Spaces:
Build error
Build error
| from seleniumwire import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from fastapi import FastAPI, Request | |
| import uvicorn | |
| import time | |
| import json | |
| from urllib.parse import unquote, urlparse, quote_plus | |
# ASGI application object. The script entry point at the bottom of the file
# passes the import string "app:app" to uvicorn.run, which presumably resolves
# to this object (assumes the module is saved as app.py - TODO confirm).
app = FastAPI()
def convert_cookies_to_dict(cookies):
    """Parse a cookie header string into a ``{name: value}`` dictionary.

    The string is split on the exact delimiter ``"; "``. Each segment is then
    split on its first ``=`` only, so values may themselves contain ``=``.
    Names and values are stripped of surrounding whitespace. A segment with no
    ``=`` becomes a name mapped to the empty string.

    Segments whose name is empty (an empty input string, a trailing ``"; "``)
    are skipped instead of producing a ``""`` key.

    Args:
        cookies: Cookie string such as ``"a=1; b=2"``.

    Returns:
        dict mapping cookie names to their string values; later duplicates
        overwrite earlier ones.
    """
    parsed_cookies = {}
    if not cookies:
        return parsed_cookies
    for item in cookies.split("; "):
        # partition() splits on the first "=" once; a segment without "="
        # leaves `value` as "".
        name, _, value = item.partition("=")
        name = name.strip()
        if not name:
            # Nothing usable in this segment (blank or "=value" only).
            continue
        parsed_cookies[name] = value.strip()
    return parsed_cookies
def get_root_domain(url):
    """Return the last two dot-separated labels of the host in ``url``.

    The host is taken from ``urlparse(url).hostname``, so any port or
    ``user:password@`` prefix is excluded. ``urlparse`` also lower-cases it.
    IP address literals are returned whole rather than cut down to their last
    two components.

    The result is purely positional: multi-label public suffixes such as
    ``co.uk`` are not recognised (``a.b.co.uk`` yields ``co.uk``).

    Args:
        url: Absolute URL string.

    Returns:
        The root domain, the bare host when it has a single label or is an IP
        address, or ``""`` when the URL has no network location.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    import ipaddress

    host = urlparse(url).hostname or ""
    try:
        ipaddress.ip_address(host)
    except ValueError:
        pass  # Not an IP literal: fall through to label handling.
    else:
        return host

    labels = host.split(".")
    if len(labels) > 1:
        return ".".join(labels[-2:])
    return host
def try_json_decode(headers):
    """Attempt to JSON-decode a value, falling back to the value itself.

    The argument is converted with ``str()`` and handed to ``json.loads``.
    If that raises for any reason, the original argument is returned
    unchanged (not its string form).

    Args:
        headers: Any value; typically a header string.

    Returns:
        The decoded JSON value on success, otherwise ``headers`` as given.
    """
    try:
        decoded = json.loads(str(headers))
    except Exception:
        # Not parseable as JSON: hand the input back untouched.
        return headers
    else:
        return decoded
def main():
    """Return the fixed success payload: code 200 with message "Success"."""
    payload = dict(code=200, msg="Success")
    return payload
def chrome(url: str = None, wait: int = 5, header: str = None,
           cookie: str = None, cookie_domain: str = None):
    """Load ``url`` in headless Chrome and report what the browser ended up with.

    The page is opened once so that cookies can be attached to the session.
    All cookies, sessionStorage, localStorage and captured requests from that
    first visit are then discarded. The supplied cookies and header overrides
    are installed, and the page is loaded a second time. After sleeping
    ``wait`` seconds the final URL, page source, cookies and a summary of the
    captured network traffic are collected.

    The browser is always shut down, including when a step raises.

    Args:
        url: Target URL (URL-decoded with ``unquote`` before use). Required.
        wait: Seconds to sleep after the second load; must be in ``0..30``.
        header: URL-encoded JSON whose content is merged into the header
            overrides (normally an object of header names to values).
        cookie: URL-encoded cookie string; takes precedence over a
            ``"cookie"`` key supplied through ``header``.
        cookie_domain: Domain for the injected cookies. Defaults to
            ``"."`` followed by the result of ``get_root_domain``.

    Returns:
        ``{"code": 200, "data": {...}}`` on success, where ``data`` holds
        ``url``, ``page_source``, ``end_cookies``, ``is_jump`` and
        ``network``. Otherwise ``{"code": 500, "msg": ...}`` for a failed
        argument validation.
    """
    # A target URL is mandatory.
    if not isinstance(url, str):
        return {"code": 500, "msg": "No target URL"}
    target_url = unquote(url)
    target_domain = get_root_domain(target_url)

    # The wait time must be between 0 and 30 seconds inclusive.
    if wait not in range(0, 31):
        return {"code": 500, "msg": "The waiting time must be between 0 and 30"}

    # Header overrides are optional, but when given they must be JSON.
    header_array = {}
    if isinstance(header, str):
        try:
            header_array.update(json.loads(unquote(header)))
        except Exception:
            return {"code": 500, "msg": "The header field is not JSON"}

    # An explicit cookie argument overrides any cookie from the header JSON.
    if isinstance(cookie, str):
        header_array["cookie"] = unquote(cookie)

    # Enable HAR capture so driver.har can be used for retrieval.
    seleniumwire_options = {'enable_har': True}
    options = Options()
    options.add_argument('--headless')
    driver = webdriver.Chrome(options=options, seleniumwire_options=seleniumwire_options)
    try:
        # A page must be open before driver.add_cookie can be used.
        driver.get(target_url)

        # Drop whatever the first visit stored or captured so that only the
        # second visit is reported.
        driver.delete_all_cookies()
        driver.execute_script("window.sessionStorage.clear();")
        driver.execute_script("window.localStorage.clear();")
        del driver.requests

        # Cookies go in through the browser's cookie jar, not as a header,
        # so remove the entry from the header overrides.
        cookie_header = header_array.pop("cookie", None)
        if isinstance(cookie_header, str):
            if isinstance(cookie_domain, str):
                domain = cookie_domain
            else:
                domain = f'.{target_domain}'
            _add_cookies(driver, convert_cookies_to_dict(cookie_header), domain)

        # Override request headers for the next visit; headers not listed
        # keep their normal values.
        # NOTE(review): selenium-wire appears to have deprecated
        # `header_overrides` in favour of `request_interceptor` - confirm the
        # installed version still honours this attribute.
        driver.header_overrides = header_array

        driver.get(target_url)

        # Give in-page scripts (dynamic requests, delayed redirects) time to
        # finish before taking the snapshot.
        if wait > 0:
            time.sleep(wait)

        current_url = driver.current_url
        data = {
            "url": current_url,
            "page_source": driver.page_source,
            "end_cookies": driver.get_cookies(),
            # True when the browser ended up on a different URL than requested.
            "is_jump": target_url != current_url,
            "network": _summarize_requests(driver.requests),
        }
    finally:
        # Always release the browser process, even if a step above failed.
        driver.quit()
    return {"code": 200, "data": data}


def _add_cookies(driver, cookie_items, domain):
    """Add each name/value pair to the browser session, printing any that are rejected."""
    for key, value in cookie_items.items():
        entry = {"name": key, "value": quote_plus(value), "domain": domain, "path": "/"}
        try:
            driver.add_cookie(entry)
        except Exception:
            # Best effort: report the offending cookie and keep going.
            print("Error Cookie:")
            print(entry)


def _summarize_requests(requests):
    """Summarise every captured request that has a response as a plain dict."""
    return [
        {
            "method": request.method,
            "status": request.response.status_code,
            "url": request.url,
            "responseheaders": {k: try_json_decode(v) for k, v in request.response.headers.items()},
            "requestheaders": {k: try_json_decode(v) for k, v in request.headers.items()},
        }
        for request in requests
        if request.response
    ]
# Script entry point: when run directly, serve the "app:app" import string
# with uvicorn on host 0.0.0.0, port 7860.
if __name__ == "__main__":
    server_settings = {"app": "app:app", "host": "0.0.0.0", "port": 7860}
    uvicorn.run(**server_settings)