File size: 12,557 Bytes
33cfa2a 2207529 33cfa2a 2207529 33cfa2a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
"""
浏览器自动化获取 reCAPTCHA token
使用 Playwright 访问页面并执行 reCAPTCHA 验证
"""
import asyncio
import time
import re
from typing import Optional, Dict
from ..core.logger import debug_logger
# Conditionally import playwright
try:
from playwright.async_api import async_playwright, Browser, BrowserContext
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
    """Split a proxy URL into the pieces Playwright's ``proxy`` option expects.

    Accepted form: ``protocol://[username:password@]host:port`` where
    ``protocol`` is ``socks5``, ``http`` or ``https``. Leading and trailing
    whitespace is ignored so that a value accepted by
    ``validate_browser_proxy_url`` (which strips its input) parses the same
    way when it is passed here unstripped.

    Args:
        proxy_url: The proxy URL to parse. ``None``, non-string and blank
            values are treated as "not a valid proxy URL".

    Returns:
        A dict with a ``server`` key (``protocol://host:port``) plus
        ``username`` and ``password`` keys when both credentials are present,
        or ``None`` if the URL does not match the accepted form.
    """
    # Guard against None / non-string input instead of letting ``re`` raise.
    if not proxy_url or not isinstance(proxy_url, str):
        return None

    proxy_pattern = r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$'
    # fullmatch: with re.match, ``$`` would also accept a trailing newline.
    match = re.fullmatch(proxy_pattern, proxy_url.strip())
    if match is None:
        return None

    protocol, username, password, host, port = match.groups()
    proxy_config = {'server': f'{protocol}://{host}:{port}'}
    # Credentials are only reported when both parts are present.
    if username and password:
        proxy_config['username'] = username
        proxy_config['password'] = password
    return proxy_config
def validate_browser_proxy_url(proxy_url: str) -> tuple[bool, str]:
    """Check whether a proxy URL can be used by the browser.

    HTTP/HTTPS proxies are accepted with or without credentials; SOCKS5
    proxies are accepted only without credentials. A blank value is valid
    and means "do not use a proxy".

    Args:
        proxy_url: The proxy URL to validate.

    Returns:
        A ``(is_valid, error_message)`` tuple; ``error_message`` is the empty
        string when the URL is valid.
    """
    # Blank / missing value: no proxy requested, nothing to validate.
    candidate = proxy_url.strip() if proxy_url else ""
    if not candidate:
        return True, ""

    config = parse_proxy_url(candidate)
    if not config:
        return False, "代理URL格式错误,正确格式:http://host:port 或 socks5://host:port"

    # The scheme is whatever precedes "://" in the normalised server string.
    scheme, _, _ = config['server'].partition('://')

    if scheme == 'socks5':
        # Authenticated SOCKS5 is rejected; unauthenticated SOCKS5 is fine.
        if 'username' in config:
            return False, "浏览器不支持带认证的SOCKS5代理,请使用HTTP代理或移除SOCKS5认证"
        return True, ""

    if scheme in ('http', 'https'):
        # HTTP(S) proxies are accepted regardless of credentials.
        return True, ""

    return False, f"不支持的代理协议:{scheme}"
class BrowserCaptchaService:
    """Obtain reCAPTCHA v3 tokens by driving a headless Chromium (singleton).

    The browser is launched once by ``initialize()`` and reused; every
    ``get_token()`` call runs in its own short-lived browser context.
    """

    # Shared instance, published by get_instance() only after it initialised.
    _instance: Optional['BrowserCaptchaService'] = None
    # Serialises first-time construction in get_instance().
    _lock = asyncio.Lock()

    def __init__(self, db=None):
        """Set up service state; the browser itself is started by initialize().

        Args:
            db: Optional object exposing an async ``get_captcha_config()``,
                used to look up the browser proxy settings.
        """
        self.headless = True  # always run headless
        self.playwright = None
        self.browser: Optional[Browser] = None
        self._initialized = False
        self.website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
        self.db = db

    @staticmethod
    def _mask_proxy_credentials(proxy_url: Optional[str]) -> str:
        """Return ``proxy_url`` in a form that is safe to write to logs.

        Everything between ``://`` and the last ``@`` (the user-info part) is
        replaced by ``***``. A missing or empty URL is rendered as ``'None'``.
        """
        if not proxy_url:
            return 'None'
        return re.sub(r'://.*@', '://***@', proxy_url, count=1)

    @classmethod
    async def get_instance(cls, db=None) -> 'BrowserCaptchaService':
        """Return the shared service, creating and initialising it on first use.

        Args:
            db: Passed to the constructor when the instance is first created;
                ignored on later calls.

        Returns:
            The initialised shared instance.

        Raises:
            Exception: Whatever ``initialize()`` raises. In that case no
                instance is cached, so the next call retries from scratch.
        """
        if cls._instance is None:
            async with cls._lock:
                if cls._instance is None:
                    # Publish only after initialize() succeeded: callers that
                    # skip the lock must never observe a half-started service,
                    # and a failed start must not be cached.
                    instance = cls(db)
                    await instance.initialize()
                    cls._instance = instance
        return cls._instance

    async def initialize(self):
        """Start Playwright and launch Chromium; a no-op once initialised.

        The proxy is taken from ``db.get_captcha_config()`` when a ``db`` was
        supplied and the browser proxy is enabled there.

        Raises:
            ImportError: Playwright is not installed.
            Exception: Any other startup error is logged and re-raised after
                the Playwright driver (if already started) has been stopped.
        """
        if self._initialized:
            return
        try:
            # Playwright is an optional dependency.
            if not PLAYWRIGHT_AVAILABLE:
                debug_logger.log_error("[BrowserCaptcha] ❌ Playwright 不可用,请使用 YesCaptcha 服务")
                raise ImportError("Playwright 未安装,请使用 YesCaptcha 服务")

            # Browser-specific proxy settings.
            proxy_url = None
            if self.db:
                captcha_config = await self.db.get_captcha_config()
                if captcha_config.browser_proxy_enabled and captcha_config.browser_proxy_url:
                    # Strip like validate_browser_proxy_url does, so a value
                    # that validated also parses here.
                    proxy_url = captcha_config.browser_proxy_url.strip() or None

            # Never log the raw URL: it may carry user:password.
            safe_proxy = self._mask_proxy_credentials(proxy_url)
            debug_logger.log_info(f"[BrowserCaptcha] 正在启动浏览器... (proxy={safe_proxy})")

            self.playwright = await async_playwright().start()

            launch_options = {
                'headless': self.headless,
                'args': [
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                ],
            }

            if proxy_url:
                proxy_config = parse_proxy_url(proxy_url)
                if proxy_config:
                    launch_options['proxy'] = proxy_config
                    auth_info = "auth=yes" if 'username' in proxy_config else "auth=no"
                    debug_logger.log_info(f"[BrowserCaptcha] 代理配置: {proxy_config['server']} ({auth_info})")
                else:
                    # Malformed URL: the browser is launched without a proxy.
                    debug_logger.log_warning(f"[BrowserCaptcha] 代理URL格式错误: {safe_proxy}")

            self.browser = await self.playwright.chromium.launch(**launch_options)
            self._initialized = True
            debug_logger.log_info(f"[BrowserCaptcha] ✅ 浏览器已启动 (headless={self.headless}, proxy={safe_proxy})")
        except Exception as e:
            debug_logger.log_error(f"[BrowserCaptcha] ❌ 浏览器启动失败: {str(e)}")
            # Do not leak the Playwright driver when startup fails after
            # async_playwright().start() succeeded.
            if self.playwright is not None:
                try:
                    await self.playwright.stop()
                except Exception as stop_error:
                    debug_logger.log_warning(f"[BrowserCaptcha] 停止 Playwright 时出现异常: {str(stop_error)}")
                finally:
                    self.playwright = None
            self.browser = None
            raise

    async def get_token(self, project_id: str) -> Optional[str]:
        """Fetch a reCAPTCHA v3 token for a Flow project page.

        Opens the project page in a fresh browser context, makes sure the
        reCAPTCHA script is present (injecting it if needed) and executes it
        with the action ``FLOW_GENERATION``.

        Args:
            project_id: Flow project ID, used to build the page URL.

        Returns:
            The token string, or ``None`` if it could not be obtained. Errors
            raised while driving the page are logged, not propagated.

        Raises:
            Exception: Only from ``initialize()`` when the browser has not
                been started yet and starting it fails.
        """
        if not self._initialized:
            await self.initialize()

        # Monotonic clock: the measured duration is immune to wall-clock jumps.
        start_time = time.monotonic()
        context = None
        try:
            # A fresh context per request keeps cookies/storage isolated.
            context = await self.browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                locale='en-US',
                timezone_id='America/New_York'
            )
            page = await context.new_page()
            website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"
            debug_logger.log_info(f"[BrowserCaptcha] 访问页面: {website_url}")

            # Navigation failure is tolerated on purpose: the script can
            # still be injected into whatever document did load.
            try:
                await page.goto(website_url, wait_until="domcontentloaded", timeout=30000)
            except Exception as e:
                debug_logger.log_warning(f"[BrowserCaptcha] 页面加载超时或失败: {str(e)}")

            # Check for the reCAPTCHA v3 script and inject it when missing.
            debug_logger.log_info("[BrowserCaptcha] 检查并加载 reCAPTCHA v3 脚本...")
            script_loaded = await page.evaluate("""
                () => {
                    if (window.grecaptcha && typeof window.grecaptcha.execute === 'function') {
                        return true;
                    }
                    return false;
                }
            """)
            if not script_loaded:
                debug_logger.log_info("[BrowserCaptcha] 注入 reCAPTCHA v3 脚本...")
                injected = await page.evaluate(f"""
                    () => {{
                        return new Promise((resolve) => {{
                            const script = document.createElement('script');
                            script.src = 'https://www.google.com/recaptcha/api.js?render={self.website_key}';
                            script.async = true;
                            script.defer = true;
                            script.onload = () => resolve(true);
                            script.onerror = () => resolve(false);
                            document.head.appendChild(script);
                        }});
                    }}
                """)
                if not injected:
                    # Surface the load failure; execution is still attempted
                    # below and will report a null token if it cannot run.
                    debug_logger.log_warning("[BrowserCaptcha] reCAPTCHA 脚本加载失败,继续尝试执行...")

            # Poll up to 20 x 0.5s for grecaptcha.execute to become available.
            debug_logger.log_info("[BrowserCaptcha] 等待reCAPTCHA初始化...")
            for i in range(20):
                grecaptcha_ready = await page.evaluate("""
                    () => {
                        return window.grecaptcha &&
                            typeof window.grecaptcha.execute === 'function';
                    }
                """)
                if grecaptcha_ready:
                    debug_logger.log_info(f"[BrowserCaptcha] reCAPTCHA 已准备好(等待了 {i*0.5} 秒)")
                    break
                await asyncio.sleep(0.5)
            else:
                # Loop ended without break: never became ready.
                debug_logger.log_warning("[BrowserCaptcha] reCAPTCHA 初始化超时,继续尝试执行...")

            # Extra grace period so initialisation can fully settle.
            await page.wait_for_timeout(1000)

            # Execute reCAPTCHA in the page and read back the token.
            debug_logger.log_info("[BrowserCaptcha] 执行reCAPTCHA验证...")
            token = await page.evaluate("""
                async (websiteKey) => {
                    try {
                        if (!window.grecaptcha) {
                            console.error('[BrowserCaptcha] window.grecaptcha 不存在');
                            return null;
                        }
                        if (typeof window.grecaptcha.execute !== 'function') {
                            console.error('[BrowserCaptcha] window.grecaptcha.execute 不是函数');
                            return null;
                        }
                        // 确保grecaptcha已准备好
                        await new Promise((resolve, reject) => {
                            const timeout = setTimeout(() => {
                                reject(new Error('reCAPTCHA加载超时'));
                            }, 15000);
                            if (window.grecaptcha && window.grecaptcha.ready) {
                                window.grecaptcha.ready(() => {
                                    clearTimeout(timeout);
                                    resolve();
                                });
                            } else {
                                clearTimeout(timeout);
                                resolve();
                            }
                        });
                        // 执行reCAPTCHA v3
                        const token = await window.grecaptcha.execute(websiteKey, {
                            action: 'FLOW_GENERATION'
                        });
                        return token;
                    } catch (error) {
                        console.error('[BrowserCaptcha] reCAPTCHA执行错误:', error);
                        return null;
                    }
                }
            """, self.website_key)

            duration_ms = (time.monotonic() - start_time) * 1000
            if token:
                debug_logger.log_info(f"[BrowserCaptcha] ✅ Token获取成功(耗时 {duration_ms:.0f}ms)")
                return token
            debug_logger.log_error("[BrowserCaptcha] Token获取失败(返回null)")
            return None
        except Exception as e:
            debug_logger.log_error(f"[BrowserCaptcha] 获取token异常: {str(e)}")
            return None
        finally:
            # Always release the context. Catch Exception only, so task
            # cancellation and KeyboardInterrupt are not swallowed here.
            if context:
                try:
                    await context.close()
                except Exception as e:
                    debug_logger.log_warning(f"[BrowserCaptcha] 关闭浏览器上下文时出现异常: {str(e)}")

    async def close(self):
        """Close the browser and stop Playwright (best effort, never raises).

        Afterwards the service is marked uninitialised, so a later
        ``get_token()`` call starts a new browser.
        """
        try:
            if self.browser:
                try:
                    await self.browser.close()
                except Exception as e:
                    # "Connection closed" is expected during normal shutdown.
                    if "Connection closed" not in str(e):
                        debug_logger.log_warning(f"[BrowserCaptcha] 关闭浏览器时出现异常: {str(e)}")
                finally:
                    self.browser = None
            if self.playwright:
                try:
                    await self.playwright.stop()
                except Exception:
                    pass  # deliberately silent: stop errors are not actionable
                finally:
                    self.playwright = None
            self._initialized = False
            debug_logger.log_info("[BrowserCaptcha] 浏览器已关闭")
        except Exception as e:
            debug_logger.log_error(f"[BrowserCaptcha] 关闭浏览器异常: {str(e)}")
|