import os
import re
import json
import requests
import pandas as pd
from pathlib import Path
from typing import Optional, Union, Dict, Any, List
from dotenv import load_dotenv

load_dotenv()


class SimpleAgent:
    """Simple agent with tool capabilities."""

    def __init__(self, llm):
        self.llm = llm
        self.tools = {
            'search_web': self.search_web,
            'search_wikipedia': self.search_wikipedia,
            'execute_python': self.execute_python,
            'read_excel_file': self.read_excel_file,
            'read_text_file': self.read_text_file,
        }

    def search_web(self, query: str) -> str:
        """Search the web using DuckDuckGo for current information."""
        try:
            # Let requests URL-encode the query instead of concatenating it into the URL.
            response = requests.get(
                "https://api.duckduckgo.com/",
                params={"q": query, "format": "json", "no_html": 1, "skip_disambig": 1},
                timeout=10,
            )

            if response.status_code == 200:
                data = response.json()
                results = []

                if data.get("AbstractText"):
                    results.append(f"Abstract: {data['AbstractText']}")

                if data.get("RelatedTopics"):
                    for topic in data["RelatedTopics"][:3]:
                        if isinstance(topic, dict) and topic.get("Text"):
                            results.append(f"Related: {topic['Text']}")

                if results:
                    return "\n".join(results)
                else:
                    return f"Search performed for '{query}' but no specific results found."
            else:
                return f"Search failed with status code {response.status_code}"
        except Exception as e:
            return f"Search error: {str(e)}"

    def search_wikipedia(self, query: str) -> str:
        """Search Wikipedia for factual information."""
        try:
            search_url = "https://en.wikipedia.org/api/rest_v1/page/summary/" + query.replace(" ", "_")
            response = requests.get(search_url, timeout=10)

            if response.status_code == 200:
                data = response.json()
                extract = data.get("extract", "")
                if extract:
                    return f"Wikipedia: {extract[:500]}..."
                else:
                    return f"Wikipedia page found for '{query}' but no extract available."
            else:
                return f"Wikipedia search failed for '{query}'"
        except Exception as e:
            return f"Wikipedia search error: {str(e)}"

    def execute_python(self, code: str) -> str:
        """Execute Python code and return the result."""
        try:
            import io
            import sys

            # Restricted namespace: only whitelisted builtins plus math and json.
            safe_globals = {
                '__builtins__': {
                    'print': print, 'len': len, 'str': str, 'int': int, 'float': float,
                    'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple, 'set': set,
                    'range': range, 'sum': sum, 'max': max, 'min': min, 'abs': abs,
                    'round': round, 'sorted': sorted, 'enumerate': enumerate, 'zip': zip,
                },
                'math': __import__('math'),
                'json': __import__('json'),
            }

            # Capture anything the snippet prints to stdout.
            old_stdout = sys.stdout
            sys.stdout = mystdout = io.StringIO()

            try:
                exec(code, safe_globals)
                output = mystdout.getvalue()
            finally:
                sys.stdout = old_stdout

            return output if output else "Code executed successfully (no output)"
        except Exception as e:
            return f"Python execution error: {str(e)}"

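    # Note (illustrative, not from the original code): because only the whitelisted
    # builtins plus the math and json modules are exposed above, snippets that try to
    # import anything else fail inside execute_python. For example:
    #   execute_python("import os")             -> "Python execution error: ..."
    #   execute_python("print(sum(range(5)))")  -> "10\n"
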
    def read_excel_file(self, file_path: str, sheet_name: Optional[str] = None) -> str:
        """Read an Excel file and return its contents."""
        try:
            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                return f"Error: File not found at {file_path}"

            if sheet_name and sheet_name.isdigit():
                sheet_name = int(sheet_name)
            elif sheet_name is None:
                sheet_name = 0

            df = pd.read_excel(file_path_obj, sheet_name=sheet_name)

            if len(df) > 20:
                result = f"Excel file with {len(df)} rows and {len(df.columns)} columns:\n\n"
                result += "First 10 rows:\n" + df.head(10).to_string(index=False)
                result += f"\n\n... ({len(df) - 20} rows omitted) ...\n\n"
                result += "Last 10 rows:\n" + df.tail(10).to_string(index=False)
            else:
                result = f"Excel file with {len(df)} rows and {len(df.columns)} columns:\n\n"
                result += df.to_string(index=False)

            return result
        except Exception as e:
            return f"Error reading Excel file: {str(e)}"

    def read_text_file(self, file_path: str) -> str:
        """Read a text file and return its contents."""
        try:
            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                return f"Error: File not found at {file_path}"

            encodings = ['utf-8', 'utf-16', 'iso-8859-1', 'cp1252']

            for encoding in encodings:
                try:
                    with open(file_path_obj, 'r', encoding=encoding) as f:
                        content = f.read()
                        return f"File content ({encoding} encoding):\n\n{content}"
                except UnicodeDecodeError:
                    continue

            return "Error: Could not decode file with any standard encoding"
        except Exception as e:
            return f"Error reading file: {str(e)}"

    def run(self, question: str) -> str:
        """Run the agent with single-step tool usage."""
        direct_response = self.llm(f"""
Question: {question}

Think step by step. If this question requires:
- Web search for current information, say "NEED_SEARCH: <search query>"
- Mathematical calculation, say "NEED_PYTHON: <python code>"
- Wikipedia lookup, say "NEED_WIKI: <search term>"
- File analysis (if file path mentioned), say "NEED_FILE: <file_path>"
Otherwise, provide a direct answer.

Your response:""")

        # Dispatch on the tool marker (if any) in the model's reply.
        if "NEED_SEARCH:" in direct_response:
            search_query = direct_response.split("NEED_SEARCH:")[1].strip()
            search_result = self.search_web(search_query)
            return self.llm(f"Question: {question}\n\nSearch results: {search_result}\n\nFinal answer:")

        elif "NEED_PYTHON:" in direct_response:
            code = direct_response.split("NEED_PYTHON:")[1].strip()
            exec_result = self.execute_python(code)
            return self.llm(f"Question: {question}\n\nCalculation result: {exec_result}\n\nFinal answer:")

        elif "NEED_WIKI:" in direct_response:
            wiki_query = direct_response.split("NEED_WIKI:")[1].strip()
            wiki_result = self.search_wikipedia(wiki_query)
            return self.llm(f"Question: {question}\n\nWikipedia info: {wiki_result}\n\nFinal answer:")

        elif "NEED_FILE:" in direct_response:
            file_path = direct_response.split("NEED_FILE:")[1].strip()
            if file_path.endswith(('.xlsx', '.xls')):
                file_content = self.read_excel_file(file_path)
            else:
                file_content = self.read_text_file(file_path)
            return self.llm(f"Question: {question}\n\nFile content: {file_content}\n\nFinal answer:")

        else:
            return direct_response


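# Hypothetical usage sketch (not part of the original file): SimpleAgent only needs a
# callable LLM, so the dispatch in run() can be exercised with a stub before wiring up
# a real backend. The stub and sample question below are illustrative.
#
#   def stub_llm(prompt: str) -> str:
#       # First call: request a calculation; follow-up call ends with "Final answer:".
#       return "391" if prompt.rstrip().endswith("Final answer:") else "NEED_PYTHON: print(17 * 23)"
#
#   agent = SimpleAgent(stub_llm)
#   print(agent.run("What is 17 multiplied by 23?"))  # -> "391"

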
class OpenRouterLLM:
    """Simple OpenRouter LLM wrapper."""

    def __init__(self, model: str = "deepseek/deepseek-v3.1-terminus"):
        self.api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("my_key")
        self.model = model
        self.base_url = "https://openrouter.ai/api/v1/chat/completions"

    def __call__(self, prompt: str, max_tokens: int = 1500, temperature: float = 0.1) -> str:
        """Make an API call to OpenRouter."""
        if not self.api_key or not self.api_key.startswith('sk-or-v1-'):
            return "Error: Invalid OpenRouter API key"

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a helpful AI assistant. Provide direct, accurate answers. For GAIA evaluation, be precise and concise."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        try:
            response = requests.post(self.base_url, headers=headers, json=payload, timeout=30)

            if response.status_code != 200:
                return f"API Error: {response.status_code}"

            result = response.json()

            if "choices" in result and len(result["choices"]) > 0:
                answer = result["choices"][0]["message"]["content"].strip()
                return self._clean_answer(answer)
            else:
                return "Error: No response content received"
        except Exception as e:
            return f"Error: {str(e)}"

    def _clean_answer(self, answer: str) -> str:
        """Clean the answer for GAIA evaluation."""
        answer = answer.strip()

        # Strip common lead-in prefixes so only the bare answer remains.
        prefixes = [
            "Answer:", "The answer is:", "Final answer:", "Result:",
            "Solution:", "Based on", "Therefore", "In conclusion"
        ]

        for prefix in prefixes:
            if answer.lower().startswith(prefix.lower()):
                answer = answer[len(prefix):].strip()
                if answer.startswith(':'):
                    answer = answer[1:].strip()
                break

        # For very short answers, drop surrounding quotes and trailing periods.
        if len(answer.split()) <= 3:
            answer = answer.strip('"\'.')

        return answer


class GaiaAgent:
    """Simple tool-based agent for GAIA tasks."""

    def __init__(self):
        print("Initializing GaiaAgent with OpenRouter DeepSeek...")

        self.llm = OpenRouterLLM(model="deepseek/deepseek-v3.1-terminus")
        self.agent = SimpleAgent(self.llm)

        print("GaiaAgent initialized successfully!")

    def __call__(self, task_id: str, question: str) -> str:
        """Process a question and return the answer."""
        try:
            print(f"Processing task {task_id}: {question[:100]}...")

            # Flag any referenced files, run the agent, then normalize the answer.
            enhanced_question = self._enhance_question_with_file_analysis(question)
            answer = self.agent.run(enhanced_question)
            clean_answer = self._clean_final_answer(answer)

            print(f"Agent answer for {task_id}: {clean_answer}")
            return clean_answer

        except Exception as e:
            error_msg = f"Agent error: {str(e)}"
            print(f"Error processing task {task_id}: {error_msg}")
            return error_msg

    def _enhance_question_with_file_analysis(self, question: str) -> str:
        """Check whether the question mentions files and enhance it accordingly."""
        file_patterns = [
            r'/tmp/gaia_cached_files/[^\s]+',
            r'saved locally at:\s*([^\s]+)',
            r'file.*?\.xlsx?',
            r'file.*?\.csv',
            r'file.*?\.txt'
        ]

        for pattern in file_patterns:
            matches = re.findall(pattern, question, re.IGNORECASE)
            if matches:
                # A file reference was detected; the question is currently passed through
                # unchanged, and the NEED_FILE branch in SimpleAgent.run reads the file.
                break

        return question

    def _clean_final_answer(self, answer: str) -> str:
        """Final cleaning of the answer."""
        answer = answer.strip()

        # Keep only the text after a "final answer:" marker, matched case-insensitively.
        if "final answer:" in answer.lower():
            answer = re.split(r"final answer:", answer, flags=re.IGNORECASE)[-1].strip()

        # Drop common lead-in phrases.
        cleanup_phrases = [
            "based on the", "according to", "the answer is", "therefore",
            "in conclusion", "as a result", "so the answer is"
        ]

        for phrase in cleanup_phrases:
            if answer.lower().startswith(phrase):
                answer = answer[len(phrase):].strip()
                break

        # Trim stray punctuation and quotes.
        answer = answer.strip('.,;:"\'')

        return answer
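

# Minimal manual test (hypothetical, not part of the GAIA harness). Assumes an
# OPENROUTER_API_KEY (or my_key) entry in the environment / .env file; the task id
# and question below are made up for illustration.
if __name__ == "__main__":
    agent = GaiaAgent()
    sample_answer = agent("demo-task-001", "What is the capital of France?")
    print(f"Sample answer: {sample_answer}")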