FLOW2API / src /services /browser_captcha.py
xiaoh2018's picture
Upload browser_captcha.py
2207529 verified
"""
浏览器自动化获取 reCAPTCHA token
使用 Playwright 访问页面并执行 reCAPTCHA 验证
"""
import asyncio
import time
import re
from typing import Optional, Dict
from ..core.logger import debug_logger
# Conditionally import playwright
try:
from playwright.async_api import async_playwright, Browser, BrowserContext
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
"""解析代理URL,分离协议、主机、端口、认证信息
Args:
proxy_url: 代理URL,格式:protocol://[username:password@]host:port
Returns:
代理配置字典,包含server、username、password(如果有认证)
"""
proxy_pattern = r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$'
match = re.match(proxy_pattern, proxy_url)
if match:
protocol, username, password, host, port = match.groups()
proxy_config = {'server': f'{protocol}://{host}:{port}'}
if username and password:
proxy_config['username'] = username
proxy_config['password'] = password
return proxy_config
return None
def validate_browser_proxy_url(proxy_url: str) -> tuple[bool, str]:
"""验证浏览器代理URL格式(仅支持HTTP和无认证SOCKS5)
Args:
proxy_url: 代理URL
Returns:
(是否有效, 错误信息)
"""
if not proxy_url or not proxy_url.strip():
return True, "" # 空URL视为有效(不使用代理)
proxy_url = proxy_url.strip()
parsed = parse_proxy_url(proxy_url)
if not parsed:
return False, "代理URL格式错误,正确格式:http://host:port 或 socks5://host:port"
# 检查是否有认证信息
has_auth = 'username' in parsed
# 获取协议
protocol = parsed['server'].split('://')[0]
# SOCKS5不支持认证
if protocol == 'socks5' and has_auth:
return False, "浏览器不支持带认证的SOCKS5代理,请使用HTTP代理或移除SOCKS5认证"
# HTTP/HTTPS支持认证
if protocol in ['http', 'https']:
return True, ""
# SOCKS5无认证支持
if protocol == 'socks5' and not has_auth:
return True, ""
return False, f"不支持的代理协议:{protocol}"
class BrowserCaptchaService:
"""浏览器自动化获取 reCAPTCHA token(单例模式)"""
_instance: Optional['BrowserCaptchaService'] = None
_lock = asyncio.Lock()
def __init__(self, db=None):
"""初始化服务(始终使用无头模式)"""
self.headless = True # 始终无头
self.playwright = None
self.browser: Optional[Browser] = None
self._initialized = False
self.website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
self.db = db
@classmethod
async def get_instance(cls, db=None) -> 'BrowserCaptchaService':
"""获取单例实例"""
if cls._instance is None:
async with cls._lock:
if cls._instance is None:
cls._instance = cls(db)
await cls._instance.initialize()
return cls._instance
async def initialize(self):
"""初始化浏览器(启动一次)"""
if self._initialized:
return
try:
# 检查 Playwright 是否可用
if not PLAYWRIGHT_AVAILABLE:
debug_logger.log_error("[BrowserCaptcha] ❌ Playwright 不可用,请使用 YesCaptcha 服务")
raise ImportError("Playwright 未安装,请使用 YesCaptcha 服务")
# 获取浏览器专用代理配置
proxy_url = None
if self.db:
captcha_config = await self.db.get_captcha_config()
if captcha_config.browser_proxy_enabled and captcha_config.browser_proxy_url:
proxy_url = captcha_config.browser_proxy_url
debug_logger.log_info(f"[BrowserCaptcha] 正在启动浏览器... (proxy={proxy_url or 'None'})")
self.playwright = await async_playwright().start()
# 配置浏览器启动参数
launch_options = {
'headless': self.headless,
'args': [
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
'--no-sandbox',
'--disable-setuid-sandbox'
]
}
# 如果有代理,解析并添加代理配置
if proxy_url:
proxy_config = parse_proxy_url(proxy_url)
if proxy_config:
launch_options['proxy'] = proxy_config
auth_info = "auth=yes" if 'username' in proxy_config else "auth=no"
debug_logger.log_info(f"[BrowserCaptcha] 代理配置: {proxy_config['server']} ({auth_info})")
else:
debug_logger.log_warning(f"[BrowserCaptcha] 代理URL格式错误: {proxy_url}")
self.browser = await self.playwright.chromium.launch(**launch_options)
self._initialized = True
debug_logger.log_info(f"[BrowserCaptcha] ✅ 浏览器已启动 (headless={self.headless}, proxy={proxy_url or 'None'})")
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] ❌ 浏览器启动失败: {str(e)}")
raise
async def get_token(self, project_id: str) -> Optional[str]:
"""获取 reCAPTCHA token
Args:
project_id: Flow项目ID
Returns:
reCAPTCHA token字符串,如果获取失败返回None
"""
if not self._initialized:
await self.initialize()
start_time = time.time()
context = None
try:
# 创建新的上下文
context = await self.browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
locale='en-US',
timezone_id='America/New_York'
)
page = await context.new_page()
website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"
debug_logger.log_info(f"[BrowserCaptcha] 访问页面: {website_url}")
# 访问页面
try:
await page.goto(website_url, wait_until="domcontentloaded", timeout=30000)
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] 页面加载超时或失败: {str(e)}")
# 检查并注入 reCAPTCHA v3 脚本
debug_logger.log_info("[BrowserCaptcha] 检查并加载 reCAPTCHA v3 脚本...")
script_loaded = await page.evaluate("""
() => {
if (window.grecaptcha && typeof window.grecaptcha.execute === 'function') {
return true;
}
return false;
}
""")
if not script_loaded:
# 注入脚本
debug_logger.log_info("[BrowserCaptcha] 注入 reCAPTCHA v3 脚本...")
await page.evaluate(f"""
() => {{
return new Promise((resolve) => {{
const script = document.createElement('script');
script.src = 'https://www.google.com/recaptcha/api.js?render={self.website_key}';
script.async = true;
script.defer = true;
script.onload = () => resolve(true);
script.onerror = () => resolve(false);
document.head.appendChild(script);
}});
}}
""")
# 等待reCAPTCHA加载和初始化
debug_logger.log_info("[BrowserCaptcha] 等待reCAPTCHA初始化...")
for i in range(20):
grecaptcha_ready = await page.evaluate("""
() => {
return window.grecaptcha &&
typeof window.grecaptcha.execute === 'function';
}
""")
if grecaptcha_ready:
debug_logger.log_info(f"[BrowserCaptcha] reCAPTCHA 已准备好(等待了 {i*0.5} 秒)")
break
await asyncio.sleep(0.5)
else:
debug_logger.log_warning("[BrowserCaptcha] reCAPTCHA 初始化超时,继续尝试执行...")
# 额外等待确保完全初始化
await page.wait_for_timeout(1000)
# 执行reCAPTCHA并获取token
debug_logger.log_info("[BrowserCaptcha] 执行reCAPTCHA验证...")
token = await page.evaluate("""
async (websiteKey) => {
try {
if (!window.grecaptcha) {
console.error('[BrowserCaptcha] window.grecaptcha 不存在');
return null;
}
if (typeof window.grecaptcha.execute !== 'function') {
console.error('[BrowserCaptcha] window.grecaptcha.execute 不是函数');
return null;
}
// 确保grecaptcha已准备好
await new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('reCAPTCHA加载超时'));
}, 15000);
if (window.grecaptcha && window.grecaptcha.ready) {
window.grecaptcha.ready(() => {
clearTimeout(timeout);
resolve();
});
} else {
clearTimeout(timeout);
resolve();
}
});
// 执行reCAPTCHA v3
const token = await window.grecaptcha.execute(websiteKey, {
action: 'FLOW_GENERATION'
});
return token;
} catch (error) {
console.error('[BrowserCaptcha] reCAPTCHA执行错误:', error);
return null;
}
}
""", self.website_key)
duration_ms = (time.time() - start_time) * 1000
if token:
debug_logger.log_info(f"[BrowserCaptcha] ✅ Token获取成功(耗时 {duration_ms:.0f}ms)")
return token
else:
debug_logger.log_error("[BrowserCaptcha] Token获取失败(返回null)")
return None
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] 获取token异常: {str(e)}")
return None
finally:
# 关闭上下文
if context:
try:
await context.close()
except:
pass
async def close(self):
"""关闭浏览器"""
try:
if self.browser:
try:
await self.browser.close()
except Exception as e:
# 忽略连接关闭错误(正常关闭场景)
if "Connection closed" not in str(e):
debug_logger.log_warning(f"[BrowserCaptcha] 关闭浏览器时出现异常: {str(e)}")
finally:
self.browser = None
if self.playwright:
try:
await self.playwright.stop()
except Exception:
pass # 静默处理 playwright 停止异常
finally:
self.playwright = None
self._initialized = False
debug_logger.log_info("[BrowserCaptcha] 浏览器已关闭")
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] 关闭浏览器异常: {str(e)}")