thinhbtt's picture
Update app.py
b48ac2a verified
raw
history blame
9.55 kB
import os
import gradio as gr
import requests
import pandas as pd
import time
from typing import Optional, List, Dict
# Optional: import openai (pip install openai)
import openai
# Module-level configuration
# Base URL of the scoring service that serves questions and accepts submissions.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# The API key and the model name are both read from the environment; the model
# name falls back to "gpt-4o" when OPENAI_MODEL is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o")
if not OPENAI_API_KEY:
    # Only warn at import time; the agent raises later if a call is attempted without a key.
    print("WARNING: OPENAI_API_KEY not set. Set it in Space secrets before running.")
# Hand the key (possibly None) to the openai module's global configuration.
openai.api_key = OPENAI_API_KEY
# -----------------------------
# Agent implementation (OpenAI-based)
# -----------------------------
class OpenAIAgent:
    """
    Minimal agent that uses OpenAI chat completion to answer each question.

    It is tuned to return *only* the final answer (no extra commentary) so
    that it matches the EXACT-MATCH submission requirement.
    """

    def __init__(self, model: str = OPENAI_MODEL, temperature: float = 0.0):
        """Store the chat model name and sampling temperature used for every call."""
        self.model = model
        self.temperature = temperature

    def _build_prompt_messages(self, question_text: str, file_summaries: Optional[List[str]] = None) -> List[Dict]:
        """
        Build the chat messages (system + user) for one question.

        The system message instructs the model to output the answer ONLY
        (single-line), nothing else. No 'Final Answer' phrase. When
        ``file_summaries`` is non-empty, each summary is appended to the user
        message on its own line after a short header.
        """
        system = (
            "You are an assistant that MUST produce a single concise answer only. "
            "When asked a question, respond with the exact answer text only — nothing else. "
            "Do NOT include explanation, reasoning steps, or any extra punctuation beyond the answer. "
            "If the question requires a short phrase or number, output that. "
            "If you do not know, output 'I don't know'."
        )
        user_parts = [f"Question: {question_text}"]
        if file_summaries:
            # attach file summaries if provided
            user_parts.append("File summaries (use these to answer):")
            user_parts.extend(file_summaries)
        user = "\n".join(user_parts)
        return [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ]

    def _call_openai(self, messages: List[Dict], max_tokens: int = 60) -> str:
        """
        Send ``messages`` to the OpenAI chat completion API and return the reply text.

        Works with both SDK generations: when the installed ``openai`` package
        exposes the ``OpenAI`` client class (openai>=1.0) the client interface
        is used; otherwise the legacy ``openai.ChatCompletion`` interface is used.

        Returns:
            The assistant's text with surrounding whitespace stripped
            (empty string if the API returned no content).

        Raises:
            RuntimeError: if OPENAI_API_KEY is not set, or if the API call or
                response parsing fails (the original exception is chained).
        """
        if not OPENAI_API_KEY:
            raise RuntimeError("OPENAI_API_KEY not set in environment.")
        request_kwargs = {
            "model": self.model,
            "messages": messages,
            "temperature": self.temperature,
            "max_tokens": max_tokens,
            "top_p": 1.0,
            "n": 1,
        }
        try:
            if hasattr(openai, "OpenAI"):
                # openai>=1.0: the module-level ChatCompletion API was removed,
                # so go through an explicit client object.
                client = openai.OpenAI(api_key=OPENAI_API_KEY)
                response = client.chat.completions.create(**request_kwargs)
                text = response.choices[0].message.content
            else:
                # Legacy SDK (openai<1.0): dict-like response.
                response = openai.ChatCompletion.create(**request_kwargs)
                choices = response.get("choices", [])
                if choices and "message" in choices[0]:
                    text = choices[0]["message"].get("content", "")
                else:
                    # fallback for completion-style response shapes
                    text = response["choices"][0]["text"]
            # content may legitimately be None; treat that as an empty answer
            return (text or "").strip()
        except Exception as e:
            # bubble up informative exception for logging, keeping the cause
            raise RuntimeError(f"OpenAI API error: {e}") from e

    def summarize_file(self, file_url: str) -> Optional[str]:
        """
        Simple downloader + summarizer placeholder.

        For text responses (Content-Type containing "text", or a URL ending in
        .txt/.md/.csv) the first 1000 characters are returned as a one-line
        preview. For anything else only a note with the URL and content type
        is returned (could be extended).

        Returns:
            A summary string, or None if the download failed (a warning is printed).
        """
        try:
            r = requests.get(file_url, timeout=10)
            r.raise_for_status()
            content_type = r.headers.get("Content-Type", "")
            if "text" in content_type or file_url.lower().endswith((".txt", ".md", ".csv")):
                text = r.text
                # keep first 1000 chars to avoid huge prompts
                summary = text[:1000].replace("\n", " ")
                return f"[file content preview] {summary}"
            else:
                # For non-text file, just inform the model of the file location and type
                return f"[file] downloaded from {file_url} (type: {content_type})"
        except Exception as e:
            # Best-effort: a missing attachment must not abort the whole question.
            print(f"Warning: Unable to fetch or summarize file {file_url}: {e}")
            return None

    def answer(self, question_text: str, files: Optional[List[str]] = None) -> str:
        """
        Main entry: prepare prompt, call model, and return answer string.

        Each URL in ``files`` is summarized (failed downloads are skipped) and
        passed to the model along with the question. The reply is collapsed to
        a single line and one pair of wrapping quotes is removed.

        Returns:
            The cleaned answer, or "I don't know" if the model reply is empty.

        Raises:
            RuntimeError: propagated from ``_call_openai``.
        """
        file_summaries = []
        if files:
            for furl in files:
                s = self.summarize_file(furl)
                if s:
                    file_summaries.append(s)
        messages = self._build_prompt_messages(question_text, file_summaries if file_summaries else None)
        raw = self._call_openai(messages, max_tokens=80)
        # Post-process: keep single-line and strip surrounding whitespace
        ans = " ".join(raw.splitlines()).strip()
        # remove wrapping quotes
        if (ans.startswith('"') and ans.endswith('"')) or (ans.startswith("'") and ans.endswith("'")):
            ans = ans[1:-1].strip()
        # final safety: if empty, return "I don't know"
        if not ans:
            ans = "I don't know"
        return ans
# -----------------------------
# Runner / UI glue (kept similar to original)
# -----------------------------
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, run the OpenAIAgent on them, and submit the answers.

    Args:
        profile: the logged-in Hugging Face profile, or None when the user is
            not logged in (in which case nothing is fetched or submitted).

    Returns:
        A ``(status, results)`` tuple: ``status`` is a human-readable message;
        ``results`` is a DataFrame of per-question answers, or None when the
        run stopped before any question was processed.
    """
    space_id = os.getenv("SPACE_ID")
    if profile:
        username = f"{profile.username}"
    else:
        return "Please Login to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    # instantiate agent
    try:
        agent = OpenAIAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None
    # agent_code repo URL (empty when not running inside a Space)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else ""
    # fetch questions
    try:
        resp = requests.get(questions_url, timeout=15)
        resp.raise_for_status()
        questions_data = resp.json()
    except Exception as e:
        return f"Error fetching questions: {e}", None
    if not questions_data:
        return "No questions returned from server.", None
    results_log = []
    answers_payload = []
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        files = item.get("files") or []
        if not task_id or question_text is None:
            # skip malformed entries
            continue
        try:
            ans = agent.answer(question_text, files)
        except Exception as e:
            # Still submit a placeholder answer, but show the error in the results table.
            ans = "I don't know"
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
        else:
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": ans})
        answers_payload.append({"task_id": task_id, "submitted_answer": ans})
        # small sleep to avoid rate limits
        time.sleep(0.5)
    if not answers_payload:
        return "Agent produced no answers.", pd.DataFrame(results_log)
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    # submit
    try:
        r = requests.post(submit_url, json=submission_data, timeout=60)
        r.raise_for_status()
        result_data = r.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        # Prefer the server's response body; compare against None explicitly
        # because a Response for a 4xx/5xx status is itself falsy.
        text = e.response.text if e.response is not None else str(e)
        return f"Submission failed: {text}", pd.DataFrame(results_log)
    except Exception as e:
        return f"Submission failed: {e}", pd.DataFrame(results_log)
# -----------------------------
# Build Gradio Interface
# -----------------------------
with gr.Blocks() as demo:
    # Page title followed by the setup instructions.
    gr.Markdown("# Basic Agent Evaluation Runner (OpenAI-based)")
    gr.Markdown(
        "\nInstructions:\n"
        "1. Add your OpenAI key as a secret named `OPENAI_API_KEY` in this Space.\n"
        "2. Ensure requirements.txt contains `openai`.\n"
        "3. Login, then click 'Run Evaluation & Submit All Answers'.\n"
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    # Output widgets: a status text area and a table of per-question answers.
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    results_table = gr.DataFrame(
        label="Questions and Agent Answers",
        wrap=True,
    )
    # No explicit inputs are wired here; the handler's profile argument is
    # presumably supplied by Gradio's login integration (verify against Gradio docs).
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )

# Launch the app only when executed as a script, not on import.
if __name__ == "__main__":
    demo.launch(debug=True, share=False)