# -*- coding: utf-8 -*-
"""
OpenResearcher DeepSearch Agent - Hugging Face Space
Uses ZeroGPU for efficient inference with the Nemotron model
Aligned with app_local.py frontend and logic
"""

import os
import gradio as gr
import httpx
import json
import json5
import re
import time
import html
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple, Generator
import traceback
import base64

from transformers import AutoTokenizer
from gradio_client import Client as GradioClient

try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# ============================================================
# Configuration
# ============================================================

MODEL_NAME = os.getenv("MODEL_NAME", "alias-fast")
REMOTE_API_BASE = os.getenv("REMOTE_API_BASE", "https://api.helmholtz-blablador.fz-juelich.de/v1")
BLABLADOR_API_KEY = os.getenv("BLABLADOR_API_KEY", "")
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "4096"))

# ============================================================
# System Prompt & Tools
# ============================================================

DEVELOPER_CONTENT = """
You are a helpful and harmless assistant. You will be able to use a set of browsing tools to answer user queries.

Tool for browsing.
The `cursor` appears in brackets before each browsing display: `[{cursor}]`.
Cite information from the tool using the following format: `【{cursor}†L{line_start}(-L{line_end})?】`, for example: `【6†L9-L11】` or `【8†L3】`.
Do not quote more than 10 words directly from the tool output.
sources=web
""".strip()

TOOL_CONTENT = """
[
  {
    "type": "function",
    "function": {
      "name": "browser.search",
      "description": "Searches for information related to a query and displays top N results. Returns a list of search results with titles, URLs, and summaries.",
      "parameters": {
        "type": "object",
        "properties": {
          "query": {
            "type": "string",
            "description": "The search query string"
          },
          "topn": {
            "type": "integer",
            "description": "Number of results to display",
            "default": 10
          }
        },
        "required": ["query"]
      }
    }
  },
  {
    "type": "function",
    "function": {
      "name": "browser.open",
      "description": "Opens a link from the current page or a fully qualified URL. Can scroll to a specific location and display a specific number of lines. Valid link ids are displayed with the formatting: 【{id}†.*】.",
      "parameters": {
        "type": "object",
        "properties": {
          "id": {
            "type": ["integer", "string"],
            "description": "Link id from current page (integer) or fully qualified URL (string). Default is -1 (most recent page)",
            "default": -1
          },
          "cursor": {
            "type": "integer",
            "description": "Page cursor to operate on. If not provided, the most recent page is implied",
            "default": -1
          },
          "loc": {
            "type": "integer",
            "description": "Starting line number. If not provided, viewport will be positioned at the beginning or centered on relevant passage",
            "default": -1
          },
          "num_lines": {
            "type": "integer",
            "description": "Number of lines to display",
            "default": -1
          },
          "view_source": {
            "type": "boolean",
            "description": "Whether to view page source",
            "default": false
          },
          "source": {
            "type": "string",
            "description": "The source identifier (e.g., 'web')"
          }
        },
        "required": []
      }
    }
  },
  {
    "type": "function",
    "function": {
      "name": "browser.find",
      "description": "Finds exact matches of a pattern in the current page or a specified page by cursor.",
      "parameters": {
        "type": "object",
        "properties": {
          "pattern": {
            "type": "string",
            "description": "The exact text pattern to search for"
          },
          "cursor": {
            "type": "integer",
            "description": "Page cursor to search in. If not provided, searches in the current page",
            "default": -1
          }
        },
        "required": ["pattern"]
      }
    }
  }
]
""".strip()
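# TOOL_CONTENT is kept as a raw string, but it is also valid (relaxed) JSON,
# so a structured view is available wherever one is needed. Illustrative only;
# nothing below depends on this:
#
#   tools = json5.loads(TOOL_CONTENT)
#   assert [t["function"]["name"] for t in tools] == [
#       "browser.search", "browser.open", "browser.find"
#   ]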
# ============================================================
# Browser Tool Implementation
# ============================================================

class SimpleBrowser:
    """Browser tool using the victor/websearch Gradio API."""

    def __init__(self):
        self.pages: Dict[str, Dict] = {}
        self.page_stack: List[str] = []
        self.link_map: Dict[int, Dict] = {}  # Map from cursor ID (int) to {url, title}
        self.used_citations = []  # List of cursor IDs (int) in order of first appearance
        try:
            # victor/websearch is a public space, but we can pass a token if available
            hf_token = os.getenv("HF_TOKEN", "")
            # Use 'token' instead of 'hf_token' for the Gradio Client
            self.client = GradioClient("victor/websearch", token=hf_token if hf_token else None)
        except Exception as e:
            print(f"Error initializing Gradio client: {e}")
            self.client = None

    @property
    def current_cursor(self) -> int:
        return len(self.page_stack) - 1

    def add_link(self, cursor: int, url: str, title: str = ""):
        self.link_map[cursor] = {'url': url, 'title': title}

    def get_link_info(self, cursor: int) -> Optional[dict]:
        return self.link_map.get(cursor)

    def get_citation_index(self, cursor: int) -> int:
        if cursor not in self.used_citations:
            self.used_citations.append(cursor)
        return self.used_citations.index(cursor)

    def get_page_info(self, cursor: int) -> Optional[Dict[str, str]]:
        if cursor in self.link_map:
            return self.link_map[cursor]
        if 0 <= cursor < len(self.page_stack):
            url = self.page_stack[cursor]
            page = self.pages.get(url)
            if page:
                return {'url': url, 'title': page.get('title', '')}
        return None

    def _format_line_numbers(self, text: str, offset: int = 0) -> str:
        lines = text.split('\n')
        return '\n'.join(f"L{i + offset}: {line}" for i, line in enumerate(lines))

    def _parse_websearch_output(self, output: str) -> List[Dict]:
        results = []
        # Split by the --- separator, handling potential variations in newlines
        parts = re.split(r'\n---\n|^\s*---\s*$', output, flags=re.MULTILINE)
        for part in parts:
            part = part.strip()
            if not part or "Successfully extracted content" in part:
                continue
            title_match = re.search(r'## (.*)', part)
            domain_match = re.search(r'\*\*Domain:\*\* (.*)', part)
            url_match = re.search(r'\*\*URL:\*\* (.*)', part)
            if title_match and url_match:
                title = title_match.group(1).strip()
                url = url_match.group(1).strip()
                domain = domain_match.group(1).strip() if domain_match else ""
                # Content starts after the metadata block
                metadata_end = url_match.end()
                content = part[metadata_end:].strip()
                results.append({
                    'title': title,
                    'url': url,
                    'domain': domain,
                    'content': content
                })
        return results
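    # _parse_websearch_output assumes the victor/websearch text layout, roughly
    # (illustrative sample, not a verbatim API response):
    #
    #   ## Example Page Title
    #   **Domain:** example.com
    #   **URL:** https://example.com/page
    #   Extracted body text of the page...
    #   ---
    #   ## Next Result
    #   ...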
Search client not initialized" try: # Call the Gradio API loop = asyncio.get_event_loop() result_str = await loop.run_in_executor( None, lambda: self.client.predict( query=query, search_type="search", num_results=topn, api_name="/search_web" ) ) results = self._parse_websearch_output(result_str) if not results: return f"No results found for: '{query}'" # Populate pages and link_map new_link_map = {} lines = [] for i, r in enumerate(results): title = r['title'] url = r['url'] domain = r['domain'] content = r['content'] # Create a snippet for the search result view snippet = content[:200].replace('\n', ' ') + "..." self.link_map[i] = {'url': url, 'title': title} new_link_map[i] = {'url': url, 'title': title} # Cache the full content self.pages[url] = { 'url': url, 'title': title, 'text': content } link_text = f"【{i}†{title}†{domain}】" if domain else f"【{i}†{title}】" lines.append(f"{link_text}") lines.append(f" {snippet}") lines.append("") formatted_content = '\n'.join(lines) pseudo_url = f"web-search://q={query}&ts={int(time.time())}" cursor = self.current_cursor + 1 self.pages[pseudo_url] = { 'url': pseudo_url, 'title': f"Search Results: {query}", 'text': formatted_content, 'urls': {str(k): v['url'] for k, v in new_link_map.items()} } self.page_stack.append(pseudo_url) header = f"Search Results: {query} ({pseudo_url})\n**viewing lines [0 - {len(formatted_content.split(chr(10)))-1}]**\n\n" body = self._format_line_numbers(formatted_content) return f"[{cursor}] {header}{body}" except Exception as e: return f"Error during search: {str(e)}" async def open(self, id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, **kwargs) -> str: target_url = None if isinstance(id, str) and id.startswith("http"): target_url = id elif isinstance(id, int) and id >= 0: info = self.link_map.get(id) target_url = info['url'] if info else None if not target_url: return f"Error: Invalid link id '{id}'. Available: {list(self.link_map.keys())}" elif cursor >= 0 and cursor < len(self.page_stack): page_url = self.page_stack[cursor] page = self.pages.get(page_url) if page: text = page['text'] lines = text.split('\n') start = max(0, loc) if loc >= 0 else 0 end = min(len(lines), start + num_lines) if num_lines > 0 else len(lines) header = f"{page['title']} ({page['url']})\n**viewing lines [{start} - {end-1}] of {len(lines)-1}**\n\n" body = self._format_line_numbers('\n'.join(lines[start:end]), offset=start) return f"[{cursor}] {header}{body}" else: return "Error: No valid target specified" if not target_url: return "Error: Could not determine target URL" # Check if we already have the page content cached if target_url in self.pages: page = self.pages[target_url] text = page['text'] lines = text.split('\n') new_cursor = self.current_cursor + 1 self.page_stack.append(target_url) start = max(0, loc) if loc >= 0 else 0 end = min(len(lines), start + num_lines) if num_lines > 0 else len(lines) header = f"{page['title']} ({target_url})\n**viewing lines [{start} - {end-1}] of {len(lines)-1}**\n\n" body = self._format_line_numbers('\n'.join(lines[start:end]), offset=start) return f"[{new_cursor}] {header}{body}" return f"Error: Content for {target_url} not found in search results. The current search API only provides content for pages returned in search results." 
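    # Minimal usage sketch (requires network access to the victor/websearch
    # Space; output formats are produced by the methods above):
    #
    #   browser = SimpleBrowser()
    #   print(asyncio.run(browser.search("quantum error correction", topn=4)))
    #   print(asyncio.run(browser.open(id=0)))   # open the first search result
    #   print(browser.find("surface code"))      # find within the opened page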
    def find(self, pattern: str, cursor: int = -1) -> str:
        if not self.page_stack:
            return "Error: No page open"
        page_url = self.page_stack[cursor] if 0 <= cursor < len(self.page_stack) else self.page_stack[-1]
        page = self.pages.get(page_url)
        if not page:
            return "Error: Page not found"

        text = page['text']
        lines = text.split('\n')
        matches = []
        for i, line in enumerate(lines):
            if str(pattern).lower() in line.lower():
                start = max(0, i - 1)
                end = min(len(lines), i + 3)
                context = '\n'.join(f"L{j}: {lines[j]}" for j in range(start, end))
                matches.append(f"# 【{len(matches)}†match at L{i}】\n{context}")
                if len(matches) >= 10:
                    break

        if not matches:
            return f"No matches found for: '{pattern}'"

        result_url = f"{page_url}/find?pattern={pattern}"
        new_cursor = self.current_cursor + 1
        result_content = '\n\n'.join(matches)
        page_data = {
            'url': result_url,
            'title': f"Find results for: '{pattern}'",
            'text': result_content,
            'urls': {}
        }
        self.pages[result_url] = page_data
        self.page_stack.append(result_url)
        header = f"Find results for text: `{pattern}` in `{page['title']}`\n\n"
        return f"[{new_cursor}] {header}{result_content}"

    def get_cursor_url(self, cursor: int) -> Optional[str]:
        if 0 <= cursor < len(self.page_stack):
            return self.page_stack[cursor]
        return None

# ============================================================
# Tokenizer Loading
# ============================================================

tokenizer = None

def load_tokenizer():
    global tokenizer
    if tokenizer is None:
        # We use Nemotron as a proxy tokenizer for token counting
        token_model = "OpenResearcher/Nemotron-3-Nano-30B-A3B"
        print(f"Loading tokenizer: {token_model}")
        try:
            tokenizer = AutoTokenizer.from_pretrained(
                token_model,
                trust_remote_code=True
            )
            print("Tokenizer loaded successfully!")
        except Exception as e:
            print(f"Error loading tokenizer: {e}")
            traceback.print_exc()
            raise
    return tokenizer

# ============================================================
# Text Processing
# ============================================================

def extract_thinking(text: str) -> Tuple[Optional[str], str]:
    reasoning_content = None
    content = text
    if '<think>' in content and '</think>' in content:
        match = re.search(r'<think>(.*?)</think>', content, re.DOTALL)
        if match:
            reasoning_content = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()
    elif '</think>' in content:
        # Opening tag missing (e.g., trimmed by the server): everything up to
        # the closing tag is reasoning
        match = re.search(r'^(.*?)</think>', content, re.DOTALL)
        if match:
            reasoning_content = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()
    return reasoning_content, content
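# Example (illustrative):
#   extract_thinking("<think>verify the founding date</think>Founded in 1998.")
#   -> ("verify the founding date", "Founded in 1998.")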
def parse_tool_call(text: str) -> Tuple[Optional[Dict], str]:
    tool_call_text = None
    content = text
    if '<tool_call>' in content and '</tool_call>' in content:
        match = re.search(r'<tool_call>(.*?)</tool_call>', content, re.DOTALL)
        if match:
            tool_call_text = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()
    elif '</tool_call>' in content:
        match = re.search(r'^(.*?)</tool_call>', content, re.DOTALL)
        if match:
            tool_call_text = match.group(1).strip()
            content = content.replace(match.group(0), "").strip()

    if tool_call_text:
        try:
            if "```json" in tool_call_text:
                tool_call_text = tool_call_text.split("```json")[1].split("```")[0].strip()
            elif "```" in tool_call_text:
                tool_call_text = tool_call_text.split("```")[1].split("```")[0].strip()
            parsed = json5.loads(tool_call_text)
            return parsed, content
        except Exception:
            pass

        # Fallback: XML-style calls, e.g.
        # <function=browser.search><parameter=query>foo</parameter></function>
        func_match = re.search(r'<function=([\w.]+)>', tool_call_text)
        if func_match:
            tool_name = func_match.group(1)
            tool_args = {}
            params = re.finditer(r'<parameter=(\w+)>\s*(.*?)\s*</parameter>', tool_call_text, re.DOTALL)
            for p in params:
                param_name = p.group(1)
                param_value = p.group(2).strip()
                if param_value.startswith('"') and param_value.endswith('"'):
                    param_value = param_value[1:-1]
                try:
                    if param_value.isdigit():
                        param_value = int(param_value)
                except Exception:
                    pass
                tool_args[param_name] = param_value
            return {"name": tool_name, "arguments": tool_args}, content

    return None, content

def is_final_answer(text: str) -> bool:
    t = text.lower()
    return (
        ('<answer>' in t and '</answer>' in t)
        or 'final answer:' in t
        or ('exact answer:' in t and 'confidence:' in t)
    )

# ============================================================
# HTML Rendering Helpers (From app_local.py)
# ============================================================
# NOTE: the CSS class names used below ("citation-link", "thinking-*",
# "tool-call-*") are assumed to be defined in the app's stylesheet.

def render_citations(text: str, browser: SimpleBrowser) -> str:
    """Convert citation markers to clickable HTML links."""

    def replace_citation(m):
        cursor_str = m.group(1)
        # l1 = m.group(2)
        # l2 = m.group(3)
        try:
            cursor = int(cursor_str)
            index = browser.get_citation_index(cursor)
            # Check if we have URL info
            info = browser.get_page_info(cursor)
            if info and info.get('url'):
                # Return a clickable index link pointing to the reference section.
                # Aligned with generate_html_example.py style (green via CSS class).
                url = info.get('url')
                return f'<a class="citation-link" href="#ref-{index}" title="{html.escape(url)}">[{index}]</a>'
            # Fallback if no URL
            return f'<span class="citation-link">[{index}]</span>'
        except Exception:
            # print(f"Error in replace_citation: {e}, match: {m.group(0)}")
            pass
        return m.group(0)

    # First pass: replace citations with linked citations
    result = re.sub(r'[【\[](\d+)†.*?[】\]]', replace_citation, text)

    # Second pass: deduplicate adjacent identical citations.
    # Matches: [N] followed by optional whitespace and the same link.
    # Repeat until no more changes to handle runs of duplicates.
    while True:
        new_result = re.sub(r'(<a[^>]+>\[\d+\]</a>)(\s*)\1', r'\1', result)
        if new_result == result:
            break
        result = new_result

    # Convert basic markdown to HTML
    result = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', result)
    result = re.sub(r'\*(.+?)\*', r'<em>\1</em>', result)
    result = re.sub(r'`(.+?)`', r'<code>\1</code>', result)
    result = result.replace('\n\n', '<br><br>').replace('\n', '<br>')
    if not result.startswith('<p>'):
        result = f'<p>{result}</p>'
    return result
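# Example (illustrative, assuming cursor 6 resolves to a cached URL and this
# is the first citation seen):
#   render_citations("Water boils at 100 C 【6†L9-L11】.", browser)
#   -> '<p>Water boils at 100 C <a class="citation-link" href="#ref-0" ...>[0]</a>.</p>'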
def render_thinking_streaming(text: str) -> str:
    """Render thinking content in streaming mode (visible, with animation)."""
    escaped = html.escape(text)
    return f'<div class="thinking-streaming">{escaped}</div>'

def render_thinking_collapsed(text: str) -> str:
    """Render thinking content in collapsed mode after completion."""
    escaped = html.escape(text)
    preview = text[:100] + "..." if len(text) > 100 else text
    preview_escaped = html.escape(preview)
    return f'''<details class="thinking-collapsed">
<summary>Thought process: "{preview_escaped}"</summary>
<div class="thinking-content">{escaped}</div>
</details>'''
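# html.escape above matters: model reasoning may itself contain markup, e.g.
# render_thinking_streaming("<think>raw</think>") displays the literal tags
# instead of injecting them into the page.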
def render_tool_call(fn_name: str, args: dict, browser: SimpleBrowser = None) -> str:
    """Render a tool call card with unified format and subtle distinction."""
    border_colors = {
        "browser.search": "#667eea",
        "browser.open": "#4facfe",
        "browser.find": "#fa709a"
    }
    border_color = border_colors.get(fn_name, "#9ca3af")

    if fn_name == "browser.search":
        query = str(args.get('query', ''))
        return f'''<div class="tool-call-card" style="border-left: 3px solid {border_color};">
<div class="tool-call-title">Searching the web</div>
<div class="tool-call-detail">Query: "{html.escape(query)}"</div>
</div>'''
    elif fn_name == "browser.open":
        link_id = args.get('id', '')
        url_info = ""
        if browser and isinstance(link_id, int) and link_id >= 0:
            info = browser.link_map.get(link_id)
            url = info.get('url', "") if info else ""
            if url:
                try:
                    domain = url.split('/')[2]
                    url_info = f" → {domain}"
                except Exception:
                    url_info = ""
        return f'''<div class="tool-call-card" style="border-left: 3px solid {border_color};">
<div class="tool-call-title">Opening page</div>
<div class="tool-call-detail">Link #{link_id}{url_info}</div>
</div>'''
    elif fn_name == "browser.find":
        pattern = str(args.get('pattern', ''))
        return f'''<div class="tool-call-card" style="border-left: 3px solid {border_color};">
<div class="tool-call-title">Finding in page</div>
<div class="tool-call-detail">Pattern: "{html.escape(pattern)}"</div>
</div>'''
    else:
        return f'''<div class="tool-call-card" style="border-left: 3px solid {border_color};">
<div class="tool-call-title">{html.escape(str(fn_name))}</div>
<div class="tool-call-detail">{html.escape(json.dumps(args))}</div>
</div>'''

def render_tool_result(result: str, fn_name: str) -> str:
    """Render tool result in an expanded card with direct HTML rendering."""
    import uuid
    tool_label = {
        "browser.search": "🔍 Search Results",
        "browser.open": "📄 Page Content",
        "browser.find": "🔎 Find Results"
    }.get(fn_name, "📋 Result")
    border_colors = {
        "browser.search": "#667eea",
        "browser.open": "#4facfe",
        "browser.find": "#86efac"
    }
    border_color = border_colors.get(fn_name, "#9ca3af")

    # ===== SEARCH RESULTS =====
    if fn_name == "browser.search" and "" in result and "