from __future__ import annotations import ast import json import os import re import time from typing import Any, Dict, List, Optional import requests from smolagents.tools import Tool DATASETS_JSON = r'''{"amazon_product": {"dataset_id": "gd_l7q7dkf244hwjntr0", "description": "Quickly read structured amazon product data.\nRequires a valid product URL with /dp/ in it.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "amazon_product_reviews": {"dataset_id": "gd_le8e811kzy4ggddlq", "description": "Quickly read structured amazon product review data.\nRequires a valid product URL with /dp/ in it.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "amazon_product_search": {"dataset_id": "gd_lwdb4vjm1ehb499uxs", "description": "Quickly read structured amazon product search data.\nRequires a valid search keyword and amazon domain URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["keyword", "url"], "fixed_values": {"pages_to_search": "1"}}, "walmart_product": {"dataset_id": "gd_l95fol7l1ru6rlo116", "description": "Quickly read structured walmart product data.\nRequires a valid product URL with /ip/ in it.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "walmart_seller": {"dataset_id": "gd_m7ke48w81ocyu4hhz0", "description": "Quickly read structured walmart seller data.\nRequires a valid walmart seller URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "ebay_product": {"dataset_id": "gd_ltr9mjt81n0zzdk1fb", "description": "Quickly read structured ebay product data.\nRequires a valid ebay product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "homedepot_products": {"dataset_id": "gd_lmusivh019i7g97q2n", "description": "Quickly read structured homedepot product data.\nRequires a valid homedepot product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "zara_products": {"dataset_id": "gd_lct4vafw1tgx27d4o0", "description": "Quickly read structured zara product data.\nRequires a valid zara product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "etsy_products": {"dataset_id": "gd_ltppk0jdv1jqz25mz", "description": "Quickly read structured etsy product data.\nRequires a valid etsy product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "bestbuy_products": {"dataset_id": "gd_ltre1jqe1jfr7cccf", "description": "Quickly read structured bestbuy product data.\nRequires a valid bestbuy product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "linkedin_person_profile": {"dataset_id": "gd_l1viktl72bvl7bjuj0", "description": "Quickly read structured linkedin people profile data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "linkedin_company_profile": {"dataset_id": "gd_l1vikfnt1wgvvqz95w", "description": "Quickly read structured linkedin company profile data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "linkedin_job_listings": {"dataset_id": "gd_lpfll7v5hcqtkxl6l", "description": "Quickly read structured linkedin job listings data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "linkedin_posts": {"dataset_id": "gd_lyy3tktm25m4avu764", "description": "Quickly read structured linkedin posts data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "linkedin_people_search": {"dataset_id": "gd_m8d03he47z8nwb5xc", "description": "Quickly read structured linkedin people search data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "first_name", "last_name"]}, "crunchbase_company": {"dataset_id": "gd_l1vijqt9jfj7olije", "description": "Quickly read structured crunchbase company data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "zoominfo_company_profile": {"dataset_id": "gd_m0ci4a4ivx3j5l6nx", "description": "Quickly read structured ZoomInfo company profile data.\nRequires a valid ZoomInfo company URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "instagram_profiles": {"dataset_id": "gd_l1vikfch901nx3by4", "description": "Quickly read structured Instagram profile data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "instagram_posts": {"dataset_id": "gd_lk5ns7kz21pck8jpis", "description": "Quickly read structured Instagram post data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "instagram_reels": {"dataset_id": "gd_lyclm20il4r5helnj", "description": "Quickly read structured Instagram reel data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "instagram_comments": {"dataset_id": "gd_ltppn085pokosxh13", "description": "Quickly read structured Instagram comments data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "facebook_posts": {"dataset_id": "gd_lyclm1571iy3mv57zw", "description": "Quickly read structured Facebook post data.\nRequires a valid Facebook post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "facebook_marketplace_listings": {"dataset_id": "gd_lvt9iwuh6fbcwmx1a", "description": "Quickly read structured Facebook marketplace listing data.\nRequires a valid Facebook marketplace listing URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "facebook_company_reviews": {"dataset_id": "gd_m0dtqpiu1mbcyc2g86", "description": "Quickly read structured Facebook company reviews data.\nRequires a valid Facebook company URL and number of reviews.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "num_of_reviews"]}, "facebook_events": {"dataset_id": "gd_m14sd0to1jz48ppm51", "description": "Quickly read structured Facebook events data.\nRequires a valid Facebook event URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "tiktok_profiles": {"dataset_id": "gd_l1villgoiiidt09ci", "description": "Quickly read structured Tiktok profiles data.\nRequires a valid Tiktok profile URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "tiktok_posts": {"dataset_id": "gd_lu702nij2f790tmv9h", "description": "Quickly read structured Tiktok post data.\nRequires a valid Tiktok post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "tiktok_shop": {"dataset_id": "gd_m45m1u911dsa4274pi", "description": "Quickly read structured Tiktok shop data.\nRequires a valid Tiktok shop product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "tiktok_comments": {"dataset_id": "gd_lkf2st302ap89utw5k", "description": "Quickly read structured Tiktok comments data.\nRequires a valid Tiktok video URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "google_maps_reviews": {"dataset_id": "gd_luzfs1dn2oa0teb81", "description": "Quickly read structured Google maps reviews data.\nRequires a valid Google maps URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "days_limit"], "defaults": {"days_limit": "3"}}, "google_shopping": {"dataset_id": "gd_ltppk50q18kdw67omz", "description": "Quickly read structured Google shopping data.\nRequires a valid Google shopping product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "google_play_store": {"dataset_id": "gd_lsk382l8xei8vzm4u", "description": "Quickly read structured Google play store data.\nRequires a valid Google play store app URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "apple_app_store": {"dataset_id": "gd_lsk9ki3u2iishmwrui", "description": "Quickly read structured apple app store data.\nRequires a valid apple app store app URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "reuter_news": {"dataset_id": "gd_lyptx9h74wtlvpnfu", "description": "Quickly read structured reuter news data.\nRequires a valid reuter news report URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "github_repository_file": {"dataset_id": "gd_lyrexgxc24b3d4imjt", "description": "Quickly read structured github repository data.\nRequires a valid github repository file URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "yahoo_finance_business": {"dataset_id": "gd_lmrpz3vxmz972ghd7", "description": "Quickly read structured yahoo finance business data.\nRequires a valid yahoo finance business URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "x_posts": {"dataset_id": "gd_lwxkxvnf1cynvib9co", "description": "Quickly read structured X post data.\nRequires a valid X post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "zillow_properties_listing": {"dataset_id": "gd_lfqkr8wm13ixtbd8f5", "description": "Quickly read structured zillow properties listing data.\nRequires a valid zillow properties listing URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "booking_hotel_listings": {"dataset_id": "gd_m5mbdl081229ln6t4a", "description": "Quickly read structured booking hotel listings data.\nRequires a valid booking hotel listing URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "youtube_profiles": {"dataset_id": "gd_lk538t2k2p1k3oos71", "description": "Quickly read structured youtube profiles data.\nRequires a valid youtube profile URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "youtube_comments": {"dataset_id": "gd_lk9q0ew71spt1mxywf", "description": "Quickly read structured youtube comments data.\nRequires a valid youtube video URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "num_of_comments"], "defaults": {"num_of_comments": "10"}}, "reddit_posts": {"dataset_id": "gd_lvz8ah06191smkebj4", "description": "Quickly read structured reddit posts data.\nRequires a valid reddit post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, "youtube_videos": {"dataset_id": "gd_lk56epmy2i5g7lzu0k", "description": "Quickly read structured YouTube videos data.\nRequires a valid YouTube video URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}}''' DATASETS: Dict[str, Any] = json.loads(DATASETS_JSON) DATASET_FIELDS: Dict[str, List[str]] = {key: value["inputs"] for key, value in DATASETS.items()} DATASET_CHOICES = sorted(DATASETS.keys()) class BrightDataDatasetTool(Tool): name = "brightdata_dataset_fetch" description = "Trigger a Bright Data dataset collection and poll until the snapshot is ready." output_type = "string" def __init__(self, datasets: Optional[Dict[str, Any]] = None) -> None: self.datasets = datasets or DATASETS self.inputs = { "dataset": { "type": "string", "description": "Dataset key", "enum": sorted(self.datasets.keys()), }, "url": { "type": "string", "description": "URL for the dataset", "nullable": True, }, "keyword": { "type": "string", "description": "Search keyword", "nullable": True, }, "first_name": { "type": "string", "description": "First name", "nullable": True, }, "last_name": { "type": "string", "description": "Last name", "nullable": True, }, "days_limit": { "type": "string", "description": "Days limit", "nullable": True, }, "num_of_reviews": { "type": "string", "description": "Number of reviews", "nullable": True, }, "num_of_comments": { "type": "string", "description": "Number of comments", "nullable": True, }, } super().__init__() def forward( self, dataset: str, url: Optional[str] = None, keyword: Optional[str] = None, first_name: Optional[str] = None, last_name: Optional[str] = None, days_limit: Optional[str] = None, num_of_reviews: Optional[str] = None, num_of_comments: Optional[str] = None, ) -> str: try: # Debug logging import sys print(f"[DEBUG forward] Received url parameter: {url!r} (type: {type(url).__name__})", file=sys.stderr) url = self._coerce_url_input(url) print(f"[DEBUG forward] After coerce: {url!r}", file=sys.stderr) api_token = os.getenv("BRIGHT_DATA_API_TOKEN") if not api_token: raise ValueError("BRIGHT_DATA_API_TOKEN not found in environment variables") if dataset not in self.datasets: raise ValueError( f"Unknown dataset '{dataset}'. Valid options: {', '.join(sorted(self.datasets.keys()))}" ) params = self._build_params( url=url, keyword=keyword, first_name=first_name, last_name=last_name, days_limit=days_limit, num_of_reviews=num_of_reviews, num_of_comments=num_of_comments, ) payload = self._prepare_payload(dataset, params) snapshot_id = self._trigger_snapshot(dataset, payload, api_token) data = self._poll_snapshot(snapshot_id, api_token) return json.dumps(data, indent=2) except requests.exceptions.RequestException as exc: details = exc.response.text if getattr(exc, "response", None) is not None else "" return json.dumps({"error": str(exc), "details": details, "payload": payload, "coerced_url": url}) except Exception as exc: return json.dumps({"error": str(exc)}) def _build_params( self, url: Optional[str], keyword: Optional[str], first_name: Optional[str], last_name: Optional[str], days_limit: Optional[str], num_of_reviews: Optional[str], num_of_comments: Optional[str], ) -> Dict[str, str]: params: Dict[str, str] = {} if url is not None: params["url"] = url if keyword is not None: params["keyword"] = keyword if first_name is not None: params["first_name"] = first_name if last_name is not None: params["last_name"] = last_name if days_limit is not None: params["days_limit"] = days_limit if num_of_reviews is not None: params["num_of_reviews"] = num_of_reviews if num_of_comments is not None: params["num_of_comments"] = num_of_comments return params def _prepare_payload(self, dataset_key: str, params: Dict[str, str]) -> Dict[str, str]: config = self.datasets[dataset_key] payload: Dict[str, str] = {} defaults = config.get("defaults", {}) fixed_values = config.get("fixed_values", {}) for field in config["inputs"]: if field in params: payload[field] = params[field] elif field in defaults: payload[field] = defaults[field] else: raise ValueError(f"Missing required field '{field}' for dataset '{dataset_key}'") payload.update(fixed_values) return payload def _trigger_snapshot(self, dataset_key: str, payload: Dict[str, str], api_token: str) -> str: dataset_id = self.datasets[dataset_key]["dataset_id"] trigger_url = "https://api.brightdata.com/datasets/v3/trigger" response = requests.post( trigger_url, params={"dataset_id": dataset_id, "include_errors": "true"}, json=[payload], headers={ "Authorization": f"Bearer {api_token}", "Content-Type": "application/json", }, timeout=60, ) response.raise_for_status() snapshot_id = response.json().get("snapshot_id") if not snapshot_id: raise RuntimeError("No snapshot ID returned from Bright Data.") return snapshot_id def _poll_snapshot(self, snapshot_id: str, api_token: str) -> Any: snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}" max_attempts = 600 attempts = 0 while attempts < max_attempts: response = requests.get( snapshot_url, params={"format": "json"}, headers={"Authorization": f"Bearer {api_token}"}, timeout=30, ) if response.status_code == 400: response.raise_for_status() data = response.json() if isinstance(data, list): return data status = data.get("status") if isinstance(data, dict) else None if status not in {"running", "building"}: return data attempts += 1 time.sleep(1) raise TimeoutError(f"Timeout waiting for snapshot {snapshot_id} after {max_attempts} seconds") def _coerce_url_input(self, raw: Optional[Any]) -> Optional[str]: import sys print(f"[DEBUG _coerce_url_input] Input: {raw!r} (type: {type(raw).__name__})", file=sys.stderr) if raw is None: return None if isinstance(raw, str): if raw.strip().startswith("{") and "orig_name" in raw: parsed = self._parse_file_dict_string(raw) if parsed: raw = parsed else: return self._extract_url_from_text(raw) else: return self._extract_url_from_text(raw) if isinstance(raw, dict): # Check if this is a Gradio FileData with a path to read path_value = raw.get("path") if isinstance(path_value, str) and os.path.isfile(path_value): # Read the file content (smolagents writes URL as file content) file_content = self._read_text_file(path_value) import sys print(f"[DEBUG _coerce_url_input] File content from {path_value}: {file_content!r}", file=sys.stderr) if file_content: extracted = self._extract_url_from_text(file_content) print(f"[DEBUG _coerce_url_input] Extracted URL: {extracted!r}", file=sys.stderr) if extracted: return extracted # Check for direct url field (common in Gradio FileData from smolagents) url_value = raw.get("url") if isinstance(url_value, str): if url_value.startswith(("http://", "https://")): return url_value if url_value.startswith("/gradio_api/file="): # Do not parse HTML/CSS file contents; treat as no URL. return None extracted = self._extract_url_from_text(url_value) if extracted: return extracted # Fallback: check original text name fields if present for key in ("orig_name", "name"): candidate = raw.get(key) if isinstance(candidate, str) and candidate: extracted = self._extract_url_from_text(candidate) if extracted: return extracted return None return None def _ensure_scheme(self, url: str) -> str: if url.startswith(("http://", "https://")): return url return f"https://{url}" def _parse_file_dict_string(self, value: str) -> Optional[dict]: try: parsed = ast.literal_eval(value) return parsed if isinstance(parsed, dict) else None except (ValueError, SyntaxError): return None def _read_text_file(self, path: str) -> Optional[str]: if not os.path.isfile(path): return None try: with open(path, "r", encoding="utf-8", errors="ignore") as fh: return fh.read() except OSError: return None def _extract_url_from_text(self, text: str) -> Optional[str]: if not text: return None # If text looks like HTML, try to extract canonical URL first if text.strip().startswith(("]+)", text) if match: return match.group(1) # domain/path without scheme match_domain = re.search(r"\b([A-Za-z0-9.-]+\.[A-Za-z]{2,}(?:/[^\s\"'<>]*)?)", text) if match_domain: return self._ensure_scheme(match_domain.group(1)) return None def _get_gradio_app_code(self, tool_module_name: str = "tool") -> str: choices = sorted(self.datasets.keys()) dataset_fields = {key: value["inputs"] for key, value in self.datasets.items()} return f"""import gradio as gr import importlib BrightDataDatasetTool = importlib.import_module("{tool_module_name}").BrightDataDatasetTool tool = BrightDataDatasetTool() DATASET_FIELDS = {dataset_fields} CHOICES = {choices} def toggle_fields(selected): inputs = ["url", "keyword", "first_name", "last_name", "days_limit", "num_of_reviews", "num_of_comments"] wanted = set(DATASET_FIELDS.get(selected, [])) def vis(name): return gr.update(visible=name in wanted) return tuple(vis(name) for name in inputs) def run(dataset, url, keyword, first_name, last_name, days_limit, num_of_reviews, num_of_comments): return tool( dataset=dataset, url=url, keyword=keyword, first_name=first_name, last_name=last_name, days_limit=days_limit, num_of_reviews=num_of_reviews, num_of_comments=num_of_comments, ) with gr.Blocks() as demo: gr.Markdown("### Bright Data dataset fetch") dataset = gr.Dropdown(choices=CHOICES, label="Dataset", value=CHOICES[0]) url = gr.Textbox(label="URL", placeholder="https://...", visible=True) keyword = gr.Textbox(label="Keyword", visible=False) first_name = gr.Textbox(label="First name", visible=False) last_name = gr.Textbox(label="Last name", visible=False) days_limit = gr.Textbox(label="Days limit (e.g. 3)", visible=False) num_of_reviews = gr.Textbox(label="Number of reviews", visible=False) num_of_comments = gr.Textbox(label="Number of comments", visible=False) dataset.change( toggle_fields, inputs=[dataset], outputs=[url, keyword, first_name, last_name, days_limit, num_of_reviews, num_of_comments], ) run_btn = gr.Button("Run") output = gr.Textbox(label="Output", lines=12) run_btn.click( run, inputs=[dataset, url, keyword, first_name, last_name, days_limit, num_of_reviews, num_of_comments], outputs=output, ) demo.launch() """