""" 浏览器自动化获取 reCAPTCHA token 使用 Playwright 访问页面并执行 reCAPTCHA 验证 """ import asyncio import time import re from typing import Optional, Dict from ..core.logger import debug_logger # Conditionally import playwright try: from playwright.async_api import async_playwright, Browser, BrowserContext PLAYWRIGHT_AVAILABLE = True except ImportError: PLAYWRIGHT_AVAILABLE = False def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]: """解析代理URL,分离协议、主机、端口、认证信息 Args: proxy_url: 代理URL,格式:protocol://[username:password@]host:port Returns: 代理配置字典,包含server、username、password(如果有认证) """ proxy_pattern = r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$' match = re.match(proxy_pattern, proxy_url) if match: protocol, username, password, host, port = match.groups() proxy_config = {'server': f'{protocol}://{host}:{port}'} if username and password: proxy_config['username'] = username proxy_config['password'] = password return proxy_config return None def validate_browser_proxy_url(proxy_url: str) -> tuple[bool, str]: """验证浏览器代理URL格式(仅支持HTTP和无认证SOCKS5) Args: proxy_url: 代理URL Returns: (是否有效, 错误信息) """ if not proxy_url or not proxy_url.strip(): return True, "" # 空URL视为有效(不使用代理) proxy_url = proxy_url.strip() parsed = parse_proxy_url(proxy_url) if not parsed: return False, "代理URL格式错误,正确格式:http://host:port 或 socks5://host:port" # 检查是否有认证信息 has_auth = 'username' in parsed # 获取协议 protocol = parsed['server'].split('://')[0] # SOCKS5不支持认证 if protocol == 'socks5' and has_auth: return False, "浏览器不支持带认证的SOCKS5代理,请使用HTTP代理或移除SOCKS5认证" # HTTP/HTTPS支持认证 if protocol in ['http', 'https']: return True, "" # SOCKS5无认证支持 if protocol == 'socks5' and not has_auth: return True, "" return False, f"不支持的代理协议:{protocol}" class BrowserCaptchaService: """浏览器自动化获取 reCAPTCHA token(单例模式)""" _instance: Optional['BrowserCaptchaService'] = None _lock = asyncio.Lock() def __init__(self, db=None): """初始化服务(始终使用无头模式)""" self.headless = True # 始终无头 self.playwright = None self.browser: Optional[Browser] = None self._initialized = False self.website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV" self.db = db @classmethod async def get_instance(cls, db=None) -> 'BrowserCaptchaService': """获取单例实例""" if cls._instance is None: async with cls._lock: if cls._instance is None: cls._instance = cls(db) await cls._instance.initialize() return cls._instance async def initialize(self): """初始化浏览器(启动一次)""" if self._initialized: return try: # 检查 Playwright 是否可用 if not PLAYWRIGHT_AVAILABLE: debug_logger.log_error("[BrowserCaptcha] ❌ Playwright 不可用,请使用 YesCaptcha 服务") raise ImportError("Playwright 未安装,请使用 YesCaptcha 服务") # 获取浏览器专用代理配置 proxy_url = None if self.db: captcha_config = await self.db.get_captcha_config() if captcha_config.browser_proxy_enabled and captcha_config.browser_proxy_url: proxy_url = captcha_config.browser_proxy_url debug_logger.log_info(f"[BrowserCaptcha] 正在启动浏览器... (proxy={proxy_url or 'None'})") self.playwright = await async_playwright().start() # 配置浏览器启动参数 launch_options = { 'headless': self.headless, 'args': [ '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', '--no-sandbox', '--disable-setuid-sandbox' ] } # 如果有代理,解析并添加代理配置 if proxy_url: proxy_config = parse_proxy_url(proxy_url) if proxy_config: launch_options['proxy'] = proxy_config auth_info = "auth=yes" if 'username' in proxy_config else "auth=no" debug_logger.log_info(f"[BrowserCaptcha] 代理配置: {proxy_config['server']} ({auth_info})") else: debug_logger.log_warning(f"[BrowserCaptcha] 代理URL格式错误: {proxy_url}") self.browser = await self.playwright.chromium.launch(**launch_options) self._initialized = True debug_logger.log_info(f"[BrowserCaptcha] ✅ 浏览器已启动 (headless={self.headless}, proxy={proxy_url or 'None'})") except Exception as e: debug_logger.log_error(f"[BrowserCaptcha] ❌ 浏览器启动失败: {str(e)}") raise async def get_token(self, project_id: str) -> Optional[str]: """获取 reCAPTCHA token Args: project_id: Flow项目ID Returns: reCAPTCHA token字符串,如果获取失败返回None """ if not self._initialized: await self.initialize() start_time = time.time() context = None try: # 创建新的上下文 context = await self.browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', locale='en-US', timezone_id='America/New_York' ) page = await context.new_page() website_url = f"https://labs.google/fx/tools/flow/project/{project_id}" debug_logger.log_info(f"[BrowserCaptcha] 访问页面: {website_url}") # 访问页面 try: await page.goto(website_url, wait_until="domcontentloaded", timeout=30000) except Exception as e: debug_logger.log_warning(f"[BrowserCaptcha] 页面加载超时或失败: {str(e)}") # 检查并注入 reCAPTCHA v3 脚本 debug_logger.log_info("[BrowserCaptcha] 检查并加载 reCAPTCHA v3 脚本...") script_loaded = await page.evaluate(""" () => { if (window.grecaptcha && typeof window.grecaptcha.execute === 'function') { return true; } return false; } """) if not script_loaded: # 注入脚本 debug_logger.log_info("[BrowserCaptcha] 注入 reCAPTCHA v3 脚本...") await page.evaluate(f""" () => {{ return new Promise((resolve) => {{ const script = document.createElement('script'); script.src = 'https://www.google.com/recaptcha/api.js?render={self.website_key}'; script.async = true; script.defer = true; script.onload = () => resolve(true); script.onerror = () => resolve(false); document.head.appendChild(script); }}); }} """) # 等待reCAPTCHA加载和初始化 debug_logger.log_info("[BrowserCaptcha] 等待reCAPTCHA初始化...") for i in range(20): grecaptcha_ready = await page.evaluate(""" () => { return window.grecaptcha && typeof window.grecaptcha.execute === 'function'; } """) if grecaptcha_ready: debug_logger.log_info(f"[BrowserCaptcha] reCAPTCHA 已准备好(等待了 {i*0.5} 秒)") break await asyncio.sleep(0.5) else: debug_logger.log_warning("[BrowserCaptcha] reCAPTCHA 初始化超时,继续尝试执行...") # 额外等待确保完全初始化 await page.wait_for_timeout(1000) # 执行reCAPTCHA并获取token debug_logger.log_info("[BrowserCaptcha] 执行reCAPTCHA验证...") token = await page.evaluate(""" async (websiteKey) => { try { if (!window.grecaptcha) { console.error('[BrowserCaptcha] window.grecaptcha 不存在'); return null; } if (typeof window.grecaptcha.execute !== 'function') { console.error('[BrowserCaptcha] window.grecaptcha.execute 不是函数'); return null; } // 确保grecaptcha已准备好 await new Promise((resolve, reject) => { const timeout = setTimeout(() => { reject(new Error('reCAPTCHA加载超时')); }, 15000); if (window.grecaptcha && window.grecaptcha.ready) { window.grecaptcha.ready(() => { clearTimeout(timeout); resolve(); }); } else { clearTimeout(timeout); resolve(); } }); // 执行reCAPTCHA v3 const token = await window.grecaptcha.execute(websiteKey, { action: 'FLOW_GENERATION' }); return token; } catch (error) { console.error('[BrowserCaptcha] reCAPTCHA执行错误:', error); return null; } } """, self.website_key) duration_ms = (time.time() - start_time) * 1000 if token: debug_logger.log_info(f"[BrowserCaptcha] ✅ Token获取成功(耗时 {duration_ms:.0f}ms)") return token else: debug_logger.log_error("[BrowserCaptcha] Token获取失败(返回null)") return None except Exception as e: debug_logger.log_error(f"[BrowserCaptcha] 获取token异常: {str(e)}") return None finally: # 关闭上下文 if context: try: await context.close() except: pass async def close(self): """关闭浏览器""" try: if self.browser: try: await self.browser.close() except Exception as e: # 忽略连接关闭错误(正常关闭场景) if "Connection closed" not in str(e): debug_logger.log_warning(f"[BrowserCaptcha] 关闭浏览器时出现异常: {str(e)}") finally: self.browser = None if self.playwright: try: await self.playwright.stop() except Exception: pass # 静默处理 playwright 停止异常 finally: self.playwright = None self._initialized = False debug_logger.log_info("[BrowserCaptcha] 浏览器已关闭") except Exception as e: debug_logger.log_error(f"[BrowserCaptcha] 关闭浏览器异常: {str(e)}")