from typing import Any, Optional
from smolagents.tools import Tool
import json
import time
import os
import requests


class BrightDataDatasetTool(Tool):
    """Tool that triggers a Bright Data dataset collection and returns its snapshot.

    The dataset catalogue (``self.datasets``) maps a dataset key to a config
    dict with ``dataset_id``, ``description``, ``inputs`` (required field
    names) and, optionally, ``defaults`` and ``fixed_values``.
    """

    name = "brightdata_dataset_fetch"
    description = (
        "Trigger a Bright Data dataset collection and poll until the snapshot is ready. "
        "Choose a dataset key (e.g., amazon_product, linkedin_company_profile, google_maps_reviews). "
        "For most datasets, you only need to provide the URL parameter. "
        "For example: brightdata_dataset_fetch(dataset='linkedin_person_profile', url='https://linkedin.com/in/...')"
    )
    output_type = "string"

    def __init__(self):
        """Load the dataset catalogue and build the ``inputs`` schema from it.

        A module-level ``DATASETS`` mapping is used when one exists; otherwise
        the embedded JSON catalogue below is parsed.
        """
        # Keep dataset catalogue on the instance and build the inputs schema dynamically to satisfy tool validation.
        self.datasets = globals().get("DATASETS")
        if not self.datasets:
            # NOTE(review): duplicates the module-level import; presumably kept so the
            # method stays self-contained when the tool is serialized - confirm before removing.
            import json

            # One raw-string fragment per dataset; adjacent literals are concatenated
            # by the parser into a single JSON document. The strings are raw so the
            # ``\n`` sequences reach json.loads as JSON escapes.
            fallback_json = (
                r'{"amazon_product": {"dataset_id": "gd_l7q7dkf244hwjntr0", "description": "Quickly read structured amazon product data.\nRequires a valid product URL with /dp/ in it.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"amazon_product_reviews": {"dataset_id": "gd_le8e811kzy4ggddlq", "description": "Quickly read structured amazon product review data.\nRequires a valid product URL with /dp/ in it.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"amazon_product_search": {"dataset_id": "gd_lwdb4vjm1ehb499uxs", "description": "Quickly read structured amazon product search data.\nRequires a valid search keyword and amazon domain URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["keyword", "url"], "fixed_values": {"pages_to_search": "1"}}, '
                r'"walmart_product": {"dataset_id": "gd_l95fol7l1ru6rlo116", "description": "Quickly read structured walmart product data.\nRequires a valid product URL with /ip/ in it.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"walmart_seller": {"dataset_id": "gd_m7ke48w81ocyu4hhz0", "description": "Quickly read structured walmart seller data.\nRequires a valid walmart seller URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"ebay_product": {"dataset_id": "gd_ltr9mjt81n0zzdk1fb", "description": "Quickly read structured ebay product data.\nRequires a valid ebay product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"homedepot_products": {"dataset_id": "gd_lmusivh019i7g97q2n", "description": "Quickly read structured homedepot product data.\nRequires a valid homedepot product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"zara_products": {"dataset_id": "gd_lct4vafw1tgx27d4o0", "description": "Quickly read structured zara product data.\nRequires a valid zara product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"etsy_products": {"dataset_id": "gd_ltppk0jdv1jqz25mz", "description": "Quickly read structured etsy product data.\nRequires a valid etsy product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"bestbuy_products": {"dataset_id": "gd_ltre1jqe1jfr7cccf", "description": "Quickly read structured bestbuy product data.\nRequires a valid bestbuy product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"linkedin_person_profile": {"dataset_id": "gd_l1viktl72bvl7bjuj0", "description": "Quickly read structured linkedin people profile data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"linkedin_company_profile": {"dataset_id": "gd_l1vikfnt1wgvvqz95w", "description": "Quickly read structured linkedin company profile data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"linkedin_job_listings": {"dataset_id": "gd_lpfll7v5hcqtkxl6l", "description": "Quickly read structured linkedin job listings data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"linkedin_posts": {"dataset_id": "gd_lyy3tktm25m4avu764", "description": "Quickly read structured linkedin posts data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"linkedin_people_search": {"dataset_id": "gd_m8d03he47z8nwb5xc", "description": "Quickly read structured linkedin people search data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "first_name", "last_name"]}, '
                r'"crunchbase_company": {"dataset_id": "gd_l1vijqt9jfj7olije", "description": "Quickly read structured crunchbase company data.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"zoominfo_company_profile": {"dataset_id": "gd_m0ci4a4ivx3j5l6nx", "description": "Quickly read structured ZoomInfo company profile data.\nRequires a valid ZoomInfo company URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"instagram_profiles": {"dataset_id": "gd_l1vikfch901nx3by4", "description": "Quickly read structured Instagram profile data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"instagram_posts": {"dataset_id": "gd_lk5ns7kz21pck8jpis", "description": "Quickly read structured Instagram post data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"instagram_reels": {"dataset_id": "gd_lyclm20il4r5helnj", "description": "Quickly read structured Instagram reel data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"instagram_comments": {"dataset_id": "gd_ltppn085pokosxh13", "description": "Quickly read structured Instagram comments data.\nRequires a valid Instagram URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"facebook_posts": {"dataset_id": "gd_lyclm1571iy3mv57zw", "description": "Quickly read structured Facebook post data.\nRequires a valid Facebook post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"facebook_marketplace_listings": {"dataset_id": "gd_lvt9iwuh6fbcwmx1a", "description": "Quickly read structured Facebook marketplace listing data.\nRequires a valid Facebook marketplace listing URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"facebook_company_reviews": {"dataset_id": "gd_m0dtqpiu1mbcyc2g86", "description": "Quickly read structured Facebook company reviews data.\nRequires a valid Facebook company URL and number of reviews.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "num_of_reviews"]}, '
                r'"facebook_events": {"dataset_id": "gd_m14sd0to1jz48ppm51", "description": "Quickly read structured Facebook events data.\nRequires a valid Facebook event URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"tiktok_profiles": {"dataset_id": "gd_l1villgoiiidt09ci", "description": "Quickly read structured Tiktok profiles data.\nRequires a valid Tiktok profile URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"tiktok_posts": {"dataset_id": "gd_lu702nij2f790tmv9h", "description": "Quickly read structured Tiktok post data.\nRequires a valid Tiktok post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"tiktok_shop": {"dataset_id": "gd_m45m1u911dsa4274pi", "description": "Quickly read structured Tiktok shop data.\nRequires a valid Tiktok shop product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"tiktok_comments": {"dataset_id": "gd_lkf2st302ap89utw5k", "description": "Quickly read structured Tiktok comments data.\nRequires a valid Tiktok video URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"google_maps_reviews": {"dataset_id": "gd_luzfs1dn2oa0teb81", "description": "Quickly read structured Google maps reviews data.\nRequires a valid Google maps URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "days_limit"], "defaults": {"days_limit": "3"}}, '
                r'"google_shopping": {"dataset_id": "gd_ltppk50q18kdw67omz", "description": "Quickly read structured Google shopping data.\nRequires a valid Google shopping product URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"google_play_store": {"dataset_id": "gd_lsk382l8xei8vzm4u", "description": "Quickly read structured Google play store data.\nRequires a valid Google play store app URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"apple_app_store": {"dataset_id": "gd_lsk9ki3u2iishmwrui", "description": "Quickly read structured apple app store data.\nRequires a valid apple app store app URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"reuter_news": {"dataset_id": "gd_lyptx9h74wtlvpnfu", "description": "Quickly read structured reuter news data.\nRequires a valid reuter news report URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"github_repository_file": {"dataset_id": "gd_lyrexgxc24b3d4imjt", "description": "Quickly read structured github repository data.\nRequires a valid github repository file URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"yahoo_finance_business": {"dataset_id": "gd_lmrpz3vxmz972ghd7", "description": "Quickly read structured yahoo finance business data.\nRequires a valid yahoo finance business URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"x_posts": {"dataset_id": "gd_lwxkxvnf1cynvib9co", "description": "Quickly read structured X post data.\nRequires a valid X post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"zillow_properties_listing": {"dataset_id": "gd_lfqkr8wm13ixtbd8f5", "description": "Quickly read structured zillow properties listing data.\nRequires a valid zillow properties listing URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"booking_hotel_listings": {"dataset_id": "gd_m5mbdl081229ln6t4a", "description": "Quickly read structured booking hotel listings data.\nRequires a valid booking hotel listing URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"youtube_profiles": {"dataset_id": "gd_lk538t2k2p1k3oos71", "description": "Quickly read structured youtube profiles data.\nRequires a valid youtube profile URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"youtube_comments": {"dataset_id": "gd_lk9q0ew71spt1mxywf", "description": "Quickly read structured youtube comments data.\nRequires a valid youtube video URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url", "num_of_comments"], "defaults": {"num_of_comments": "10"}}, '
                r'"reddit_posts": {"dataset_id": "gd_lvz8ah06191smkebj4", "description": "Quickly read structured reddit posts data.\nRequires a valid reddit post URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}, '
                r'"youtube_videos": {"dataset_id": "gd_lk56epmy2i5g7lzu0k", "description": "Quickly read structured YouTube videos data.\nRequires a valid YouTube video URL.\nThis can be a cache lookup, so it can be more reliable than scraping.", "inputs": ["url"]}}'
            )
            self.datasets = json.loads(fallback_json)

        self.inputs = {
            "dataset": {
                "type": "string",
                "description": "Dataset key",
                # Provide choices so UI renders a dropdown instead of a long list.
                "enum": sorted(self.datasets.keys()),
            },
            "url": {
                "type": "string",
                "description": "URL for the dataset (required for most datasets)",
                "nullable": True,
            },
            "keyword": {
                "type": "string",
                "description": "Search keyword (for search datasets like amazon_product_search)",
                "nullable": True,
            },
            "first_name": {
                "type": "string",
                "description": "First name (for datasets like linkedin_people_search)",
                "nullable": True,
            },
            "last_name": {
                "type": "string",
                "description": "Last name (for datasets like linkedin_people_search)",
                "nullable": True,
            },
            "days_limit": {
                "type": "string",
                "description": "Days limit (for datasets like google_maps_reviews, default: 3)",
                "nullable": True,
            },
            "num_of_reviews": {
                "type": "string",
                "description": "Number of reviews (for datasets like facebook_company_reviews)",
                "nullable": True,
            },
            "num_of_comments": {
                "type": "string",
                "description": "Number of comments (for datasets like youtube_comments, default: 10)",
                "nullable": True,
            },
        }
        # self.inputs must exist before the base initializer runs (see the
        # "tool validation" note at the top of this method).
        super().__init__()

    def _prepare_payload(self, dataset_key: str, params: dict) -> dict:
        """Validate required fields, apply defaults, and merge fixed values.

        Args:
            dataset_key: Key into ``self.datasets``.
            params: Caller-supplied field values. A value of ``None`` is
                treated the same as an absent key.

        Returns:
            The request payload: one entry per field listed in the dataset's
            ``inputs``, overlaid with the dataset's ``fixed_values``.

        Raises:
            KeyError: If ``dataset_key`` is not in the catalogue.
            ValueError: If a required field has neither a value nor a default.
        """
        config = self.datasets[dataset_key]
        defaults = config.get("defaults", {})
        fixed_values = config.get("fixed_values", {})

        payload = {}
        for field in config["inputs"]:
            value = params.get(field)
            if value is not None:
                payload[field] = value
            elif field in defaults:
                payload[field] = defaults[field]
            else:
                raise ValueError(f"Missing required field '{field}' for dataset '{dataset_key}'")

        # Apply fixed values that should always be sent
        payload.update(fixed_values)
        return payload

    def forward(
        self,
        dataset: str,
        url: Optional[str] = None,
        keyword: Optional[str] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        days_limit: Optional[str] = None,
        num_of_reviews: Optional[str] = None,
        num_of_comments: Optional[str] = None,
    ) -> str:
        """
        Trigger a dataset run and poll until results are ready.

        Args:
            dataset: The dataset key from DATASETS.
            url: URL for the dataset (required for most datasets).
            keyword: Search keyword (for search datasets).
            first_name: First name (for people search datasets).
            last_name: Last name (for people search datasets).
            days_limit: Days limit (for time-based datasets).
            num_of_reviews: Number of reviews to fetch.
            num_of_comments: Number of comments to fetch.

        Returns:
            JSON string of the snapshot data once ready. Any response whose
            ``status`` is not "running"/"building" is returned as-is, so this
            may also be a JSON-encoded status or error object.

        Raises:
            ValueError: If BRIGHT_DATA_API_TOKEN is unset, the dataset key is
                unknown, or a required field is missing.
            RuntimeError: If the trigger response carries no snapshot ID.
            requests.exceptions.RequestException: If the trigger request fails
                or the snapshot endpoint answers with HTTP 400.
            TimeoutError: If the snapshot is not ready within the polling window.
        """
        # NOTE(review): these duplicate the module-level imports; presumably kept so
        # the method stays self-contained when the tool is serialized - confirm
        # before removing.
        import os
        import json
        import time
        import requests

        api_token = os.getenv("BRIGHT_DATA_API_TOKEN")
        if not api_token:
            raise ValueError("BRIGHT_DATA_API_TOKEN not found in environment variables")

        if dataset not in self.datasets:
            raise ValueError(f"Unknown dataset '{dataset}'. Valid options: {', '.join(sorted(self.datasets.keys()))}")

        # Build params dict from provided arguments (None means "not provided")
        candidate_params = {
            "url": url,
            "keyword": keyword,
            "first_name": first_name,
            "last_name": last_name,
            "days_limit": days_limit,
            "num_of_reviews": num_of_reviews,
            "num_of_comments": num_of_comments,
        }
        params = {field: value for field, value in candidate_params.items() if value is not None}

        payload = self._prepare_payload(dataset, params)
        dataset_id = self.datasets[dataset]["dataset_id"]

        trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
        trigger_headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        }
        trigger_response = requests.post(
            trigger_url,
            params={"dataset_id": dataset_id, "include_errors": "true"},
            json=[payload],
            headers=trigger_headers,
            timeout=60,
        )
        trigger_response.raise_for_status()

        snapshot_id = trigger_response.json().get("snapshot_id")
        if not snapshot_id:
            raise RuntimeError("No snapshot ID returned from Bright Data.")

        # Poll for completion (up to 10 minutes, matching MCP logic). The limit is a
        # wall-clock deadline rather than an attempt count: a single attempt can
        # block for the full request timeout, so counting attempts could stretch
        # the wait far beyond the advertised 600 seconds.
        snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}"
        poll_headers = {"Authorization": f"Bearer {api_token}"}
        poll_timeout_seconds = 600
        poll_interval_seconds = 1
        pending_statuses = {"running", "building"}
        deadline = time.monotonic() + poll_timeout_seconds

        while time.monotonic() < deadline:
            try:
                response = requests.get(
                    snapshot_url,
                    params={"format": "json"},
                    headers=poll_headers,
                    timeout=30,
                )
                # If Bright Data returns an error response we don't want to loop forever
                if response.status_code == 400:
                    response.raise_for_status()
                data = response.json()
            except (requests.exceptions.RequestException, ValueError) as exc:
                # Mirror JS logic: tolerate transient failures, but break on 400.
                # ValueError covers a non-JSON body on requests versions whose
                # JSON decode error is not a RequestException.
                if getattr(getattr(exc, "response", None), "status_code", None) == 400:
                    raise
            else:
                still_pending = isinstance(data, dict) and data.get("status") in pending_statuses
                if not still_pending:
                    # Lists (the snapshot rows) and any non-pending object are final.
                    return json.dumps(data, indent=2)
            time.sleep(poll_interval_seconds)

        raise TimeoutError(f"Timeout waiting for snapshot {snapshot_id} after {poll_timeout_seconds} seconds")