# -*- coding: utf-8 -*-
"""
OpenResearcher DeepSearch Agent - Hugging Face Space
Uses ZeroGPU for efficient inference with the Nemotron model
Aligned with app_local.py frontend and logic
"""
import os
import gradio as gr
import httpx
import json
import json5
import re
import time
import html
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple, Generator
import traceback
import base64
from transformers import AutoTokenizer
from gradio_client import Client as GradioClient
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass
# ============================================================
# Configuration
# ============================================================
MODEL_NAME = os.getenv("MODEL_NAME", "alias-fast")
REMOTE_API_BASE = os.getenv("REMOTE_API_BASE", "https://api.helmholtz-blablador.fz-juelich.de/v1")
BLABLADOR_API_KEY = os.getenv("BLABLADOR_API_KEY", "")
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "4096"))
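# The settings above describe an OpenAI-compatible endpoint. A minimal,
# hedged sketch of the request shape (this assumes the standard
# /chat/completions route; the payload fields mirror the OpenAI API and are
# not a verified Blablador-specific contract):
#
#   async with httpx.AsyncClient(timeout=120.0) as client:
#       resp = await client.post(
#           f"{REMOTE_API_BASE}/chat/completions",
#           headers={"Authorization": f"Bearer {BLABLADOR_API_KEY}"},
#           json={"model": MODEL_NAME,
#                 "messages": [{"role": "user", "content": "ping"}],
#                 "max_tokens": MAX_NEW_TOKENS},
#       )
#       resp.raise_for_status()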
# ============================================================
# System Prompt & Tools
# ============================================================
DEVELOPER_CONTENT = """
You are a helpful assistant and harmless assistant.
You will be able to use a set of browsering tools to answer user queries.
Tool for browsing.
The `cursor` appears in brackets before each browsing display: `[{cursor}]`.
Cite information from the tool using the following format:
`【{cursor}†L{line_start}(-L{line_end})?】`, for example: `【6†L9-L11】` or `【8†L3】`.
Do not quote more than 10 words directly from the tool output.
sources=web
""".strip()
TOOL_CONTENT = """
[
{
"type": "function",
"function": {
"name": "browser.search",
"description": "Searches for information related to a query and displays top N results. Returns a list of search results with titles, URLs, and summaries.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query string"
},
"topn": {
"type": "integer",
"description": "Number of results to display",
"default": 10
}
},
"required": [
"query"
]
}
}
},
{
"type": "function",
"function": {
"name": "browser.open",
"description": "Opens a link from the current page or a fully qualified URL. Can scroll to a specific location and display a specific number of lines. Valid link ids are displayed with the formatting: 【{id}†.*】.",
"parameters": {
"type": "object",
"properties": {
"id": {
"type": [
"integer",
"string"
],
"description": "Link id from current page (integer) or fully qualified URL (string). Default is -1 (most recent page)",
"default": -1
},
"cursor": {
"type": "integer",
"description": "Page cursor to operate on. If not provided, the most recent page is implied",
"default": -1
},
"loc": {
"type": "integer",
"description": "Starting line number. If not provided, viewport will be positioned at the beginning or centered on relevant passage",
"default": -1
},
"num_lines": {
"type": "integer",
"description": "Number of lines to display",
"default": -1
},
"view_source": {
"type": "boolean",
"description": "Whether to view page source",
"default": false
},
"source": {
"type": "string",
"description": "The source identifier (e.g., 'web')"
}
},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "browser.find",
"description": "Finds exact matches of a pattern in the current page or a specified page by cursor.",
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "The exact text pattern to search for"
},
"cursor": {
"type": "integer",
"description": "Page cursor to search in. If not provided, searches in the current page",
"default": -1
}
},
"required": [
"pattern"
]
}
}
}
]
""".strip()
# ============================================================
# Browser Tool Implementation
# ============================================================
class SimpleBrowser:
    """Browser tool using the victor/websearch Gradio API."""

    def __init__(self):
        self.pages: Dict[str, Dict] = {}
        self.page_stack: List[str] = []
        self.link_map: Dict[int, Dict] = {}  # Map from cursor ID (int) to {url, title}
        self.used_citations = []  # List of cursor IDs (int) in order of first appearance
        try:
            # victor/websearch is a public Space, but we can pass a token if available
            hf_token = os.getenv("HF_TOKEN", "")
            # Gradio Client expects 'token' rather than 'hf_token'
            self.client = GradioClient("victor/websearch", token=hf_token if hf_token else None)
        except Exception as e:
            print(f"Error initializing Gradio client: {e}")
            self.client = None
    @property
    def current_cursor(self) -> int:
        """Cursor (index) of the most recently opened page; -1 if none."""
        return len(self.page_stack) - 1

    def add_link(self, cursor: int, url: str, title: str = ""):
        self.link_map[cursor] = {'url': url, 'title': title}

    def get_link_info(self, cursor: int) -> Optional[dict]:
        return self.link_map.get(cursor)

    def get_citation_index(self, cursor: int) -> int:
        """Return a stable citation index, registering the cursor on first use."""
        if cursor not in self.used_citations:
            self.used_citations.append(cursor)
        return self.used_citations.index(cursor)

    def get_page_info(self, cursor: int) -> Optional[Dict[str, str]]:
        if cursor in self.link_map:
            return self.link_map[cursor]
        if 0 <= cursor < len(self.page_stack):
            url = self.page_stack[cursor]
            page = self.pages.get(url)
            if page:
                return {'url': url, 'title': page.get('title', '')}
        return None

    def _format_line_numbers(self, text: str, offset: int = 0) -> str:
        """Prefix every line with 'L{n}: ' so the model can cite line ranges."""
        lines = text.split('\n')
        return '\n'.join(f"L{i + offset}: {line}" for i, line in enumerate(lines))
    def _parse_websearch_output(self, output: str) -> List[Dict]:
        results = []
        # Split by the separator ---, handling potential variations in newlines
        parts = re.split(r'\n---\n|^\s*---\s*$', output, flags=re.MULTILINE)
        for part in parts:
            part = part.strip()
            if not part or "Successfully extracted content" in part:
                continue
            title_match = re.search(r'## (.*)', part)
            domain_match = re.search(r'\*\*Domain:\*\* (.*)', part)
            url_match = re.search(r'\*\*URL:\*\* (.*)', part)
            if title_match and url_match:
                title = title_match.group(1).strip()
                url = url_match.group(1).strip()
                domain = domain_match.group(1).strip() if domain_match else ""
                # Content starts after metadata
                metadata_end = url_match.end()
                content = part[metadata_end:].strip()
                results.append({
                    'title': title,
                    'url': url,
                    'domain': domain,
                    'content': content
                })
        return results
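    # For reference, _parse_websearch_output expects victor/websearch output
    # shaped roughly like the invented sample below; the '---' separator and
    # the '## title' / '**Domain:**' / '**URL:**' markers are what the
    # regexes above key on:
    #
    #   ## Example Result Title
    #   **Domain:** example.org
    #   **URL:** https://example.org/page
    #   Extracted body text of the page...
    #   ---
    #   ## Next Result ...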
    async def search(self, query: str, topn: int = 4) -> str:
        if not self.client:
            return "Error: Search client not initialized"
        try:
            # Call the Gradio API off the event loop thread
            loop = asyncio.get_event_loop()
            result_str = await loop.run_in_executor(
                None,
                lambda: self.client.predict(
                    query=query,
                    search_type="search",
                    num_results=topn,
                    api_name="/search_web"
                )
            )
            results = self._parse_websearch_output(result_str)
            if not results:
                return f"No results found for: '{query}'"
            # Populate pages and link_map
            new_link_map = {}
            lines = []
            for i, r in enumerate(results):
                title = r['title']
                url = r['url']
                domain = r['domain']
                content = r['content']
                # Create a snippet for the search result view
                snippet = content[:200].replace('\n', ' ') + "..."
                self.link_map[i] = {'url': url, 'title': title}
                new_link_map[i] = {'url': url, 'title': title}
                # Cache the full content
                self.pages[url] = {
                    'url': url,
                    'title': title,
                    'text': content
                }
                link_text = f"【{i}†{title}†{domain}】" if domain else f"【{i}†{title}】"
                lines.append(link_text)
                lines.append(f" {snippet}")
                lines.append("")
            formatted_content = '\n'.join(lines)
            pseudo_url = f"web-search://q={query}&ts={int(time.time())}"
            cursor = self.current_cursor + 1
            self.pages[pseudo_url] = {
                'url': pseudo_url,
                'title': f"Search Results: {query}",
                'text': formatted_content,
                'urls': {str(k): v['url'] for k, v in new_link_map.items()}
            }
            self.page_stack.append(pseudo_url)
            line_count = len(formatted_content.split('\n'))
            header = f"Search Results: {query} ({pseudo_url})\n**viewing lines [0 - {line_count - 1}]**\n\n"
            body = self._format_line_numbers(formatted_content)
            return f"[{cursor}] {header}{body}"
        except Exception as e:
            return f"Error during search: {str(e)}"
    async def open(self, id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, **kwargs) -> str:
        target_url = None
        if isinstance(id, str) and id.startswith("http"):
            target_url = id
        elif isinstance(id, int) and id >= 0:
            info = self.link_map.get(id)
            target_url = info['url'] if info else None
            if not target_url:
                return f"Error: Invalid link id '{id}'. Available: {list(self.link_map.keys())}"
        elif 0 <= cursor < len(self.page_stack):
            # Re-display an already-opened page identified by its cursor
            page_url = self.page_stack[cursor]
            page = self.pages.get(page_url)
            if page:
                text = page['text']
                lines = text.split('\n')
                start = max(0, loc) if loc >= 0 else 0
                end = min(len(lines), start + num_lines) if num_lines > 0 else len(lines)
                header = f"{page['title']} ({page['url']})\n**viewing lines [{start} - {end - 1}] of {len(lines) - 1}**\n\n"
                body = self._format_line_numbers('\n'.join(lines[start:end]), offset=start)
                return f"[{cursor}] {header}{body}"
        else:
            return "Error: No valid target specified"
        if not target_url:
            return "Error: Could not determine target URL"
        # Serve the page from the cache populated by search()
        if target_url in self.pages:
            page = self.pages[target_url]
            text = page['text']
            lines = text.split('\n')
            new_cursor = self.current_cursor + 1
            self.page_stack.append(target_url)
            start = max(0, loc) if loc >= 0 else 0
            end = min(len(lines), start + num_lines) if num_lines > 0 else len(lines)
            header = f"{page['title']} ({target_url})\n**viewing lines [{start} - {end - 1}] of {len(lines) - 1}**\n\n"
            body = self._format_line_numbers('\n'.join(lines[start:end]), offset=start)
            return f"[{new_cursor}] {header}{body}"
        return f"Error: Content for {target_url} not found in search results. The current search API only provides content for pages returned in search results."
    def find(self, pattern: str, cursor: int = -1) -> str:
        if not self.page_stack:
            return "Error: No page open"
        page_url = self.page_stack[cursor] if 0 <= cursor < len(self.page_stack) else self.page_stack[-1]
        page = self.pages.get(page_url)
        if not page:
            return "Error: Page not found"
        text = page['text']
        lines = text.split('\n')
        matches = []
        for i, line in enumerate(lines):
            if str(pattern).lower() in line.lower():
                start = max(0, i - 1)
                end = min(len(lines), i + 3)
                context = '\n'.join(f"L{j}: {lines[j]}" for j in range(start, end))
                matches.append(f"# 【{len(matches)}†match at L{i}】\n{context}")
                if len(matches) >= 10:
                    break
        if not matches:
            return f"No matches found for: '{pattern}'"
        result_url = f"{page_url}/find?pattern={pattern}"
        new_cursor = self.current_cursor + 1
        result_content = '\n\n'.join(matches)
        page_data = {
            'url': result_url,
            'title': f"Find results for: '{pattern}'",
            'text': result_content,
            'urls': {}
        }
        self.pages[result_url] = page_data
        self.page_stack.append(result_url)
        header = f"Find results for text: `{pattern}` in `{page['title']}`\n\n"
        return f"[{new_cursor}] {header}{result_content}"

    def get_cursor_url(self, cursor: int) -> Optional[str]:
        if 0 <= cursor < len(self.page_stack):
            return self.page_stack[cursor]
        return None
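# A hedged usage sketch of SimpleBrowser (requires network access to the
# victor/websearch Space; the query string is an arbitrary example). It is
# shown as a comment because it is not executed by the app itself:
#
#   async def _demo():
#       browser = SimpleBrowser()
#       print(await browser.search("open source deep research agents", topn=4))
#       print(await browser.open(id=0))   # open the first search result
#       print(browser.find("agent"))      # grep within the opened page
#
#   asyncio.run(_demo())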
# ============================================================
# Tokenizer Loading
# ============================================================
tokenizer = None
def load_tokenizer():
    global tokenizer
    if tokenizer is None:
        # We use Nemotron as a proxy tokenizer for token counting
        token_model = "OpenResearcher/Nemotron-3-Nano-30B-A3B"
        print(f"Loading tokenizer: {token_model}")
        try:
            tokenizer = AutoTokenizer.from_pretrained(
                token_model,
                trust_remote_code=True
            )
            print("Tokenizer loaded successfully!")
        except Exception as e:
            print(f"Error loading tokenizer: {e}")
            traceback.print_exc()
            raise
    return tokenizer
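# A hedged example of what the proxy tokenizer is used for: counting prompt
# tokens to stay inside a context budget. apply_chat_template and encode are
# standard transformers tokenizer methods; the budget figure is an arbitrary
# illustration, not a documented model limit:
#
#   tok = load_tokenizer()
#   prompt = tok.apply_chat_template(
#       [{"role": "user", "content": "hello"}],
#       tokenize=False,
#       add_generation_prompt=True,
#   )
#   n_tokens = len(tok.encode(prompt))
#   assert n_tokens < 32768  # assumed context budget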
# ============================================================
# Text Processing
# ============================================================
# NOTE: the functions in this section are reconstructions; the HTML markup
# inside their string literals was stripped in transit. CSS class names and
# the tool-call wire format are assumptions, flagged inline.

def extract_thinking(text: str) -> Tuple[Optional[str], str]:
    """Split model output into (reasoning, content) around <think> tags."""
    reasoning_content = None
    content = text
    if '</think>' in text:
        head, _, tail = text.partition('</think>')
        reasoning_content = head.replace('<think>', '').strip()
        content = tail.strip()
    return reasoning_content, content

def parse_tool_call(text: str) -> Tuple[Optional[Dict[str, Any]], str]:
    """Extract a tool call (assumed Hermes-style <tool_call>{...}</tool_call>)."""
    match = re.search(r'<tool_call>\s*(\{.*?\})\s*</tool_call>', text, re.DOTALL)
    if not match:
        return None, text
    try:
        call = json5.loads(match.group(1))  # json5 tolerates loosely formatted JSON
    except Exception:
        return None, text
    clean = (text[:match.start()] + text[match.end():]).strip()
    return call, clean

CITATION_RE = re.compile(r'【(\d+)†L(\d+)(?:-L(\d+))?】')

def render_citations(text: str, browser: SimpleBrowser) -> str:
    """Replace 【cursor†Lx-Ly】 markers with numbered superscript source links."""
    result = html.escape(text)

    def _sub(m):
        cursor = int(m.group(1))
        info = browser.get_page_info(cursor)
        idx = browser.get_citation_index(cursor) + 1
        url = info['url'] if info else ''
        if url.startswith('http'):
            return f'<sup><a href="{url}" target="_blank">[{idx}]</a></sup>'
        return f'<sup>[{idx}]</sup>'

    result = CITATION_RE.sub(_sub, result)
    result = result.replace('\n\n', '<br><br>').replace('\n', '<br>')
    if not result.startswith('<div'):
        result = f'<div class="answer-text">{result}</div>'  # class name is a placeholder
    return result

def render_thinking_streaming(text: str) -> str:
    """Render thinking content in streaming mode (visible, with animation)."""
    escaped = html.escape(text).replace('\n', '<br>')
    return f'<div class="thinking-block streaming">{escaped}</div>'

def render_thinking_collapsed(text: str) -> str:
    """Render finished thinking content inside a collapsed details block."""
    escaped = html.escape(text).replace('\n', '<br>')
    return f'<details class="thinking-block"><summary>Thinking</summary><div>{escaped}</div></details>'

def render_tool_call(fn_name: str, args: Dict[str, Any], browser: SimpleBrowser) -> str:
    """Render a tool invocation as a compact status line."""
    arg_str = html.escape(json.dumps(args, ensure_ascii=False))
    return f'<div class="tool-call">{html.escape(fn_name)}({arg_str})</div>'

def render_tool_result(result: str) -> str:
    """Render tool output, clipping very long pages for display."""
    formatted_result = html.escape(result).replace('\n', '<br>')
    max_length = 5000
    if len(result) > max_length:
        formatted_result = formatted_result[:max_length] + '...'
    if not formatted_result.startswith('<div'):
        formatted_result = f'<div class="tool-result">{formatted_result}</div>'
    return formatted_result

# ============================================================
# Agent Loop
# ============================================================
MAX_STEPS = 16  # safety cap on tool-use rounds (reconstructed value)

async def call_model(messages: List[Dict[str, str]], tools: List[Dict[str, Any]]) -> str:
    """Query the remote endpoint. The original request code was lost; this
    reconstruction assumes the OpenAI-compatible /chat/completions route."""
    async with httpx.AsyncClient(timeout=300.0) as client:
        resp = await client.post(
            f"{REMOTE_API_BASE}/chat/completions",
            headers={"Authorization": f"Bearer {BLABLADOR_API_KEY}"},
            json={
                "model": MODEL_NAME,
                "messages": messages,
                "tools": tools,  # passed through in OpenAI format (reconstructed wiring)
                "max_tokens": MAX_NEW_TOKENS,
            },
        )
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"] or ""

async def execute_tool_call(browser: SimpleBrowser, name: str, args: Dict[str, Any]) -> str:
    """Dispatch a parsed tool call to the matching SimpleBrowser method."""
    if name == "browser.search":
        return await browser.search(args.get("query", ""), int(args.get("topn", 4)))
    if name == "browser.open":
        return await browser.open(id=args.get("id", -1), cursor=int(args.get("cursor", -1)),
                                  loc=int(args.get("loc", -1)), num_lines=int(args.get("num_lines", -1)))
    if name == "browser.find":
        return browser.find(args.get("pattern", ""), int(args.get("cursor", -1)))
    return f"Error: unknown tool '{name}'"

async def run_agent(question: str):
    """Main handler: stream the agent's reasoning, tool use, and answer as HTML."""
    if not question.strip():
        yield '<div class="status-message">Please enter a question to begin.</div>'
        return
    if not BLABLADOR_API_KEY:
        yield '<div class="error">BLABLADOR_API_KEY is not set. Please configure it in the Space secrets.</div>'
        return
    # Load tokenizer for prompt formatting
    try:
        load_tokenizer()
    except Exception as e:
        yield f'<div class="error">Error loading tokenizer: {html.escape(str(e))}</div>'
        return
    browser = SimpleBrowser()
    tools = json.loads(TOOL_CONTENT)
    system_prompt = DEVELOPER_CONTENT + f"\n\nToday's date: {datetime.now().strftime('%Y-%m-%d')}"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": question}
    ]
    stop_strings = ["\n<tool_response>"]  # reconstructed; the original literal was lost
    html_parts: List[str] = []
    try:
        for _ in range(MAX_STEPS):
            try:
                generated = await call_model(messages, tools)
            except Exception as e:
                html_parts.append(f'<div class="error">Generation Error: {html.escape(str(e))}</div>')
                yield ''.join(html_parts)
                return
            for stop_str in stop_strings:
                if stop_str in generated:
                    generated = generated[:generated.find(stop_str)]
            reasoning, content = extract_thinking(generated)
            tool_call, clean_content = parse_tool_call(content)
            if reasoning:
                html_parts.append(render_thinking_collapsed(reasoning))
                yield ''.join(html_parts)
            if tool_call:
                fn_name = tool_call.get("name", "unknown")
                args = tool_call.get("arguments", {})
                html_parts.append(render_tool_call(fn_name, args, browser))
                yield ''.join(html_parts)
                result = await execute_tool_call(browser, fn_name, args)
                html_parts.append(render_tool_result(result))
                yield ''.join(html_parts)
                messages.append({"role": "assistant", "content": generated})
                # Feed the tool output back in the assumed Hermes-style format
                messages.append({"role": "user", "content": f"<tool_response>\n{result}\n</tool_response>"})
                continue
            if clean_content.strip() and not tool_call:
                rendered = render_citations(clean_content, browser)
                html_parts.append(f'<div class="final-answer">{rendered}</div>')
                yield ''.join(html_parts)
                return
        html_parts.append('<div class="error">Reached the maximum number of steps without a final answer.</div>')
        yield ''.join(html_parts)
    except Exception as e:
        tb = traceback.format_exc()
        yield ''.join(html_parts) + f'<div class="error">Error: {html.escape(str(e))}<br><pre>{html.escape(tb)}</pre></div>'

# ============================================================
# Gradio UI
# ============================================================
with gr.Blocks(title="OpenResearcher DeepSearch Agent") as demo:
    gr.Markdown(
        "# OpenResearcher DeepSearch Agent\n\n"
        "I am OpenResearcher, a leading open-source Deep Research Agent, welcome to try!\n\n"
        "Due to high traffic, if your submission has no response, please refresh the page and resubmit. Thank you!"
    )
    question_box = gr.Textbox(
        label="Question",
        placeholder="Ask any question and I'll search the web to find answers",
    )
    output_html = gr.HTML()
    question_box.submit(run_agent, inputs=question_box, outputs=output_html)

if __name__ == "__main__":
    demo.launch()