Spaces:
Sleeping
Sleeping
| # ========================= | |
| # Imports and Environment | |
| # ========================= | |
| import os | |
| import requests | |
| import subprocess | |
| import tempfile | |
| import base64 | |
| import io | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from typing import TypedDict, Annotated | |
| from huggingface_hub import list_models | |
| from langchain.tools import Tool | |
| from langchain_community.utilities import SerpAPIWrapper | |
| from langchain_core.messages import HumanMessage | |
| from langchain_huggingface import ChatHuggingFace | |
| from langchain_openai import ChatOpenAI | |
| import openai | |
| from pydub import AudioSegment | |
| import pandas as pd | |
| from PIL import Image | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_experimental.tools.python.tool import PythonREPLTool | |
| import uuid | |
| import pytesseract | |
| from urllib.parse import urlparse | |
# Report the working directory, then load environment variables from the
# ".env" file that sits in the same directory as this module.
print("Current working directory:", os.getcwd())
_module_dir = os.path.dirname(__file__)
load_dotenv(dotenv_path=os.path.join(_module_dir, ".env"))
| # ========================= | |
| # 1. Web Search Tools | |
| # ========================= | |
def serp_search(query: str) -> str:
    """
    Run a web search through SerpAPI.

    Args:
        query (str): The search query.
    Returns:
        str: Whatever ``SerpAPIWrapper().run(query)`` produces, or a
            "Search failed: ..." message when anything raises.
    """
    try:
        return SerpAPIWrapper().run(query)
    except Exception as exc:
        # Tools report failures as text so the calling agent can react to them.
        return f"Search failed: {exc}"
# Agent-facing wrapper around serp_search.
serp_search_tool = Tool(
    name="serp_search_tool",
    description="Searches the web using SerpAPI and returns the top result.",
    func=serp_search,
)
| # ========================= | |
| # 2. File Download/Handling Tools | |
| # ========================= | |
| # Note: File downloading is now handled in app.py via process_question_with_files() | |
| # This section is kept for reference but the download_file_tool is not exported | |
def download_file(url: str, save_path: str) -> str:
    """
    Fetch ``url`` over HTTP and write the response body to ``save_path``.

    Args:
        url (str): The URL from which to download the file.
        save_path (str): The local file path where the downloaded file will be saved.
    Returns:
        str: "File downloaded to <save_path>" on success, otherwise a
            "Failed to download: ..." message describing the error.
    """
    try:
        # 15-second timeout (previously 30).
        reply = requests.get(url, timeout=15)
        reply.raise_for_status()
        payload = reply.content
        with open(save_path, "wb") as out_file:
            out_file.write(payload)
    except Exception as exc:
        return f"Failed to download: {exc}"
    return f"File downloaded to {save_path}"
| # download_file_tool is now used internally by process_question_with_files() in app.py | |
| # and is not exported as a standalone tool for the agent | |
| # ========================= | |
| # 3. Python Execution Tools | |
| # ========================= | |
def RunPythonFileTool(file_path: str) -> str:
    """
    Execute the Python script stored at ``file_path`` and return its output.

    The script source is handed to LangChain's ``PythonInterpreterTool`` when
    that class can be imported. Otherwise the source is copied to a temporary
    ``.py`` file and run in a child interpreter with a 15-second timeout.

    Args:
        file_path (str): The full path to the python (.py) file containing the Python code.
    Returns:
        str: The output produced by the code execution, or an error message if it fails.
    """
    import sys  # local import: only the subprocess fallback needs it

    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        with open(file_path, "r") as f:
            code = f.read()
        try:
            from langchain.tools.python.tool import PythonInterpreterTool
            interpreter = PythonInterpreterTool()
            result = interpreter.run({"code": code})
            return result.get("output", "No output returned.")
        except ImportError:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp:
                temp.write(code)
                temp_path = temp.name
            try:
                # Run with the interpreter hosting this process so the script
                # sees the same installed packages; "python" is only a
                # fallback for when sys.executable is unavailable.
                result = subprocess.run(
                    [sys.executable or "python", temp_path],
                    capture_output=True,
                    text=True,
                    timeout=15
                )
            finally:
                # Always remove the temporary copy, including on timeout or
                # when the interpreter cannot be started.
                os.unlink(temp_path)
            if result.returncode == 0:
                return result.stdout.strip() or "No output returned."
            return f"Error: {result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out"
    except Exception as e:
        return f"Execution failed: {e}"
# Agent-facing wrapper around RunPythonFileTool. The description states that
# the input is a file path because RunPythonFileTool returns "File not found"
# for anything that is not an existing path (for example raw source code).
python_execution_tool = Tool(
    name="python_execution_tool",
    func=RunPythonFileTool,
    description="Executes the Python script (.py file) at the given file path and returns its output. The input must be the path to an existing Python file, not source code."
)
| # ========================= | |
| # 4. Text Utilities | |
| # ========================= | |
def ReverseTextTool(text: str) -> str:
    """
    Return ``text`` with its characters in reverse order.

    Args:
        text (str): The text to reverse.
    Returns:
        str: The reversed text, or an "Error reversing text: ..." message
            when the input cannot be sliced.
    """
    try:
        flipped = text[::-1]
    except Exception as exc:
        return f"Error reversing text: {exc}"
    return flipped
# Agent-facing wrapper around ReverseTextTool.
reverse_text_tool = Tool(
    name="reverse_text_tool",
    description="Reverses the order of characters in a given text string. Use this when you need to reverse text.",
    func=ReverseTextTool,
)
| # ========================= | |
| # 5. Audio, Video, and Image Tools | |
| # ========================= | |
def process_audio(audio_file_path: str) -> str:
    """
    Transcribe an audio file with OpenAI's "whisper-1" model.

    MP3 input is decoded with pydub and re-encoded to WAV in memory before
    upload; every other supported format is uploaded as-is.

    Args:
        audio_file_path (str): Path to the audio file.
    Returns:
        str: The transcription with file details on success; otherwise a
            message containing the file path, size and extension followed by
            the reason transcription was not possible.
    """
    try:
        if not os.path.exists(audio_file_path):
            return f"Audio file not found: {audio_file_path}"
        file_extension = Path(audio_file_path).suffix.lower()
        file_size = os.path.getsize(audio_file_path)
        # Shared prefix of every diagnostic message returned below.
        file_info = f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}."
        # Check if it's an audio file we can process
        if file_extension not in ('.mp3', '.wav', '.m4a', '.flac', '.ogg'):
            return f"{file_info} Unsupported audio format for transcription."
        try:
            client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
            if file_extension == '.mp3':
                audio = AudioSegment.from_mp3(audio_file_path)
                # Export as WAV to an in-memory buffer
                wav_buffer = io.BytesIO()
                audio.export(wav_buffer, format="wav")
                wav_buffer.seek(0)
                # An in-memory buffer has no filename; give it one so the
                # upload is labelled as WAV instead of an unnamed blob.
                wav_buffer.name = "audio.wav"
                transcription = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=wav_buffer,
                    response_format="text"
                )
            else:
                # Other formats are sent directly from disk
                with open(audio_file_path, "rb") as audio_file:
                    transcription = client.audio.transcriptions.create(
                        model="whisper-1",
                        file=audio_file,
                        response_format="text"
                    )
            return f"Transcription successful!\nFile: {audio_file_path}\nSize: {file_size} bytes\nType: {file_extension}\n\nTranscription:\n{transcription}"
        except openai.AuthenticationError:
            return f"{file_info} OpenAI API key not found or invalid. Please set OPENAI_API_KEY in your environment variables."
        except openai.BadRequestError as e:
            return f"{file_info} Audio format not supported or file too large: {str(e)}"
        except Exception as e:
            return f"{file_info} Transcription error: {str(e)}"
    except Exception as e:
        return f"Error processing audio: {str(e)}"
# Agent-facing wrapper around process_audio.
audio_processing_tool = Tool(
    name="audio_processing_tool",
    description="Transcribes audio files (MP3, WAV, M4A, FLAC, OGG) to text using speech recognition. Use this when you need to convert speech in audio files to text.",
    func=process_audio,
)
def analyze_video(video_url: str) -> str:
    """
    Look up information about a YouTube video through a web search.

    The video ID is extracted from the URL and passed to ``serp_search``; the
    video itself is not downloaded or inspected.

    Recognised URL shapes: ``youtu.be/<id>``, ``youtube.com/watch?...v=<id>``
    (``v`` may appear anywhere in the query string), and
    ``youtube.com/shorts|embed|live/<id>``.

    Args:
        video_url (str): The video URL.
    Returns:
        str: "Video analysis for <id>: <search result>", a message explaining
            why the URL could not be handled, or an error message.
    """
    import re  # local import: used only by this function

    try:
        if 'youtube.com' not in video_url and 'youtu.be' not in video_url:
            return "Video analysis currently supports YouTube videos only"
        match = re.search(
            r"(?:youtu\.be/"
            r"|youtube\.com/(?:watch\?(?:[^#\s]*?&)?v=|shorts/|embed/|live/))"
            r"([\w-]+)",
            video_url,
        )
        if match is None:
            return "Could not extract video ID from URL"
        video_id = match.group(1)
        search_result = serp_search(
            f"youtube video {video_id} title description")
        return f"Video analysis for {video_id}: {search_result}"
    except Exception as e:
        return f"Error analyzing video: {str(e)}"
# Agent-facing wrapper around analyze_video.
video_analysis_tool = Tool(
    name="video_analysis_tool",
    description="Analyzes video content from URLs. Use this when questions involve video content or YouTube links.",
    func=analyze_video,
)
| # ========================= | |
| # 6. Image Recognition Tools | |
| # ========================= | |
def _ocr_text_or_none(img):
    """Return stripped OCR text for ``img``, or None when OCR cannot be run.

    None is returned when no image was decoded, when pytesseract cannot be
    imported, or when pytesseract raises any error.
    """
    if img is None:
        return None
    try:
        import pytesseract
        return pytesseract.image_to_string(img).strip()
    except Exception:
        # OCR is a best-effort fallback; the caller reports it as unavailable.
        return None


def image_recognition(img_path: str) -> str:
    """
    Describe an image with the "gpt-4o" vision model, falling back to OCR.

    OCR (pytesseract) is attempted when the model call raises or when the
    model's reply looks like a refusal (contains both "sorry" and
    "can't assist").

    Args:
        img_path (str): Path to the image file.
    Returns:
        str: Image dimensions followed by the model's description or the OCR
            text, or an error message.
    """
    import mimetypes  # local import: used only to label the data URL

    try:
        if not os.path.exists(img_path):
            return f"Error: Image file not found at {img_path}"
        if not os.getenv("OPENAI_API_KEY"):
            return "OpenAI API key not found. Please set OPENAI_API_KEY in your environment variables."
        # img stays None when the file cannot be decoded, so the OCR fallback
        # can tell there is nothing to read.
        img = None
        try:
            img = Image.open(img_path)
            image_info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}"
        except Exception as e:
            image_info = f"Image info error: {str(e)}"
        try:
            vision_llm = ChatOpenAI(model="gpt-4o", temperature=0)
            with open(img_path, "rb") as image_file:
                image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
            # Label the payload with the type implied by the file extension;
            # PNG is kept as the default when the extension is not an image type.
            mime_type = mimetypes.guess_type(img_path)[0] or ""
            if not mime_type.startswith("image/"):
                mime_type = "image/png"
            message = [
                HumanMessage(
                    content=[
                        {"type": "text", "text": "Describe what you see in this image in detail. If there's text, extract it. If it's a chess position, describe the board state and pieces."},
                        {"type": "image_url", "image_url": {
                            "url": f"data:{mime_type};base64,{image_base64}"}},
                    ]
                )
            ]
            response = vision_llm.invoke(message)
            vision_result = response.content.strip()
        except Exception as vision_error:
            # Fallback to OCR if vision fails
            text = _ocr_text_or_none(img)
            if text is None:
                return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR not available."
            if text:
                return f"{image_info}\n\nVision failed, OCR extracted text:\n{text}"
            return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR found no text."
        lowered = vision_result.lower()
        if not ("sorry" in lowered and "can't assist" in lowered):
            return f"{image_info}\n\nVision analysis:\n{vision_result}"
        # The reply looks like a content-policy refusal: fall back to OCR
        text = _ocr_text_or_none(img)
        if text is None:
            return f"{image_info}\n\nVision model blocked. OCR not available."
        if text:
            return f"{image_info}\n\nOCR extracted text:\n{text}"
        return f"{image_info}\n\nVision model blocked. OCR found no text."
    except Exception as e:
        return f"Error analyzing image: {str(e)}"
# Agent-facing wrapper around image_recognition.
image_recognition_tool = Tool(
    name="image_recognition_tool",
    description="Analyzes and describes the content of images using AI vision. Use this when you need to understand what's in an image.",
    func=image_recognition,
)
| # ========================= | |
| # 7. File Type Detection | |
| # ========================= | |
def detect_file_type(file_path: str) -> str:
    """
    Report a file's size and a type label derived from its extension.

    Args:
        file_path (str): Path to the file.
    Returns:
        str: "File: <path>, Type: <label>, Size: <n> bytes", a
            "File not found: <path>" message, or an error message.
    """
    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        suffix = Path(file_path).suffix.lower()
        size_in_bytes = os.path.getsize(file_path)
        # Each label with the (lower-case) extensions that map to it.
        suffix_groups = (
            ('Python script', ('.py',)),
            ('Audio file', ('.mp3',)),
            ('Video file', ('.mp4',)),
            ('Image file', ('.jpg', '.jpeg', '.png')),
            ('Text file', ('.txt',)),
            ('PDF document', ('.pdf',)),
            ('Word document', ('.doc', '.docx')),
            ('Excel spreadsheet', ('.xls', '.xlsx')),
        )
        label = next(
            (name for name, suffixes in suffix_groups if suffix in suffixes),
            'Unknown file type',
        )
        return f"File: {file_path}, Type: {label}, Size: {size_in_bytes} bytes"
    except Exception as exc:
        return f"Error detecting file type: {exc}"
# Agent-facing wrapper around detect_file_type.
file_type_detection_tool = Tool(
    name="file_type_detection_tool",
    description="Detects file types and provides information about files. Use this when you need to understand what type of file you're working with.",
    func=detect_file_type,
)
| # ========================= | |
| # 8. Enhanced File Reading Tools | |
| # ========================= | |
def read_file(file_name: str) -> str:
    """
    Read a file and return its content or a summary, chosen by extension.

    ".csv" files are summarised with pandas, common image extensions go
    through OCR, and everything else is read as text.

    Args:
        file_name (str): Path to the file.
    Returns:
        str: The extracted content, "File not found", or an error message.
    """
    if not (file_name and os.path.exists(file_name)):
        return "File not found"
    try:
        suffix = os.path.splitext(file_name)[1].lower()
        if suffix == ".csv":
            reader = _read_csv_file
        elif suffix in (".png", ".jpg", ".jpeg", ".gif", ".bmp"):
            reader = _read_image_file
        else:
            # Known text extensions and unrecognised ones are both read as text.
            reader = _read_text_file
        return reader(file_name)
    except Exception as exc:
        return f"Error reading file: {exc}"


def _read_text_file(file_name: str) -> str:
    """Return the first 5000 characters of a text file (UTF-8, then Latin-1)."""
    try:
        with open(file_name, "r", encoding="utf-8") as handle:
            return handle.read()[:5000]
    except UnicodeDecodeError:
        # Not valid UTF-8: retry below with Latin-1.
        pass
    try:
        with open(file_name, "r", encoding="latin-1") as handle:
            return handle.read()[:5000]
    except Exception as exc:
        return f"Text file reading error: {exc}"


def _read_csv_file(file_name: str) -> str:
    """Summarise a CSV file: shape, column names, first rows, numeric statistics."""
    try:
        frame = pd.read_csv(file_name)
        row_count, column_count = frame.shape
        lines = [
            f"CSV file shape: {row_count} rows, {column_count} columns",
            f"Columns: {', '.join(frame.columns.tolist())}",
            "\nFirst 5 rows:",
            frame.head().to_string(),
        ]
        # Statistics are appended only when at least one numeric column exists.
        numeric_names = frame.select_dtypes(include=['number']).columns
        if len(numeric_names) > 0:
            lines.append("\nNumeric column statistics:")
            lines.append(frame[numeric_names].describe().to_string())
        return "\n".join(lines)
    except Exception as exc:
        return f"CSV reading error: {exc}"


def _read_image_file(file_name: str) -> str:
    """Return an image's dimensions plus any text OCR can extract from it."""
    try:
        try:
            import pytesseract
            picture = Image.open(file_name)
            header = f"Image: {picture.size[0]}x{picture.size[1]} pixels, mode: {picture.mode}"
            found_text = pytesseract.image_to_string(picture).strip()
            if not found_text:
                return f"{header}\n\nNo text detected in image."
            return f"{header}\n\nExtracted text:\n{found_text}"
        except ImportError:
            # OCR not available, just return image info
            picture = Image.open(file_name)
            return f"Image: {picture.size[0]}x{picture.size[1]} pixels, mode: {picture.mode}\n(OCR not available - install pytesseract for text extraction)"
    except Exception as exc:
        return f"Image reading error: {exc}"
# Agent-facing wrapper around read_file.
read_file_tool = Tool(
    name="read_file_tool",
    description="Reads and processes different file types including text files, CSV files, and images. Use this when you need to extract content from files.",
    func=read_file,
)
| # ========================= | |
| # 9. Code Execution and Math Tools | |
| # ========================= | |
def execute_code(code: str, timeout: int = 5) -> str:
    """
    Run a Python snippet in a child interpreter and return its output.

    The snippet is rejected up front if it contains any of a short list of
    substrings. NOTE: that check is a heuristic, not a sandbox. It matches
    plain substrings of the lower-cased code, so it also rejects harmless
    text containing e.g. "eval", and it does not cover every way of reaching
    the filesystem or spawning processes. Do not rely on it for untrusted code.

    Args:
        code (str): Python source passed to the interpreter via ``-c``.
        timeout (int): Seconds to wait before giving up. Defaults to 5.
    Returns:
        str: The snippet's stripped stdout, a fixed message when it printed
            nothing, or a message describing the block, error or timeout.
    """
    import sys  # local import: used only to locate the running interpreter

    try:
        # Basic security check - prevent dangerous operations
        dangerous_keywords = (
            "import os", "import subprocess", "__import__", "exec", "eval", "open(")
        lowered = code.lower()
        if any(keyword in lowered for keyword in dangerous_keywords):
            return "Code execution blocked: potentially unsafe operations detected"
        result = subprocess.run(
            # Use the interpreter hosting this process so the snippet sees the
            # same installed packages; "python3" is only a fallback.
            [sys.executable or "python3", "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout,
            # Run from the platform's temp directory rather than the project
            # directory ("/tmp" does not exist on every platform).
            cwd=tempfile.gettempdir()
        )
        if result.returncode == 0:
            return result.stdout.strip() if result.stdout else "Code executed successfully (no output)"
        return f"Code execution error: {result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return "Code execution timeout"
    except Exception as e:
        return f"Code execution error: {str(e)}"
def calculate_simple_math(expression: str) -> str:
    """
    Evaluate a basic arithmetic expression without using ``eval``.

    Only digits, ``+ - * / . ( )`` and spaces are accepted, which allows the
    operators ``+``, ``-``, ``*``, ``/``, ``//`` and ``**``. The expression is
    parsed with ``ast`` and evaluated node by node, and integer powers whose
    result would be extremely large are refused so that inputs such as
    ``9**9**9**9`` cannot hang the process.

    Args:
        expression (str): The arithmetic expression.
    Returns:
        str: The result as text, "Invalid mathematical expression" when the
            input contains other characters, or a
            "Math calculation error: ..." message.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Upper bound on the estimated bit size of an integer power result.
    max_power_bits = 100_000

    def evaluate(node):
        """Recursively evaluate one node of the parsed expression."""
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > max_power_bits
            ):
                raise ValueError("result too large")
            return binary_ops[type(node.op)](left, right)
        raise ValueError("unsupported expression")

    try:
        # Only allow basic math characters
        allowed_chars = set("0123456789+-*/.() ")
        if not all(c in allowed_chars for c in expression):
            return "Invalid mathematical expression"
        # Surrounding spaces are stripped because ast.parse rejects leading
        # indentation that eval() used to ignore.
        tree = ast.parse(expression.strip(), mode="eval")
        return str(evaluate(tree.body))
    except Exception as e:
        return f"Math calculation error: {str(e)}"
# Agent-facing wrapper around execute_code.
code_execution_tool = Tool(
    name="code_execution_tool",
    description="Executes Python code safely with timeout and security checks. Use this when you need to run small Python code snippets.",
    func=execute_code,
)
# Agent-facing wrapper around calculate_simple_math.
math_calculation_tool = Tool(
    name="math_calculation_tool",
    description="Safely evaluates simple mathematical expressions. Use this when you need to perform basic math calculations.",
    func=calculate_simple_math,
)
def wiki_search(query: str) -> str:
    """
    Search Wikipedia for a query and return maximum 2 results.

    Args:
        query (str): The search query.
    Returns:
        str: The matching documents, each wrapped in a
            ``<Document source=... page=...>`` element and separated by
            "---" lines; a message when nothing was found; or a
            "Wikipedia search failed: ..." message when the lookup raises.
    """
    try:
        search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
        if not search_docs:
            return f"No Wikipedia results found for query: {query}"
        return "\n\n---\n\n".join(
            # .get() keeps a document with missing metadata from aborting the
            # whole result.
            f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}">'
            f"\n{doc.page_content}\n</Document>"
            for doc in search_docs
        )
    except Exception as e:
        # Report failures as text, like the other tools in this module.
        return f"Wikipedia search failed: {e}"
# Agent-facing wrapper around wiki_search.
wiki_search_tool = Tool(
    name="wiki_search_tool",
    description="Search Wikipedia for a query and return up to 2 results. Use this for factual or historical questions.",
    func=wiki_search,
)
# REPL tool instance from langchain_experimental, re-exposed under a
# project-specific name and description.
python_tool = PythonREPLTool()
python_repl_tool = Tool(
    name="python_repl_tool",
    # Pass the bound run method: Tool expects a plain callable, and calling a
    # tool object directly relies on the deprecated BaseTool.__call__.
    func=python_tool.run,
    description="Executes Python code in a REPL environment. Use this for running Python code snippets interactively."
)
| # --- New Tools --- | |
def extract_text_from_image(file_path: str) -> str:
    """
    Extract text from an image file using OCR (pytesseract).

    Args:
        file_path (str): Path to the image file.
    Returns:
        str: "Extracted text from image:" followed by the OCR output, or an
            "Error extracting text from image: ..." message.
    """
    try:
        # The context manager closes the underlying file once OCR is done.
        with Image.open(file_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
# Agent-facing wrapper around extract_text_from_image.
extract_text_from_image_tool = Tool(
    name="extract_text_from_image_tool",
    description="Extract text from an image using OCR (pytesseract).",
    func=extract_text_from_image,
)
def analyze_csv_file_simple(file_path: str) -> str:
    """
    Load a CSV file with pandas and describe it.

    Args:
        file_path (str): Path to the CSV file.
    Returns:
        str: Row/column counts, the column names and ``DataFrame.describe()``
            output, or an "Error analyzing CSV file: ..." message.
    """
    try:
        frame = pd.read_csv(file_path)
        row_count, column_count = len(frame), len(frame.columns)
        sections = [
            f"CSV file loaded with {row_count} rows and {column_count} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(sections)
    except Exception as exc:
        return f"Error analyzing CSV file: {exc}"
# Agent-facing wrapper around analyze_csv_file_simple.
analyze_csv_file_tool = Tool(
    name="analyze_csv_file_tool",
    description="Analyze a CSV file using pandas and answer a question about it.",
    func=analyze_csv_file_simple,
)
def analyze_excel_file_simple(file_path: str) -> str:
    """
    Load an Excel file with pandas and describe it.

    Args:
        file_path (str): Path to the Excel file.
    Returns:
        str: Row/column counts, the column names and ``DataFrame.describe()``
            output, or an "Error analyzing Excel file: ..." message.
    """
    try:
        df = pd.read_excel(file_path)
        # Header cells may be numbers or dates rather than text, so convert
        # each label before joining.
        column_names = ', '.join(str(column) for column in df.columns)
        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
# Agent-facing wrapper around analyze_excel_file_simple.
analyze_excel_file_tool = Tool(
    name="analyze_excel_file_tool",
    description="Analyze an Excel file using pandas and answer a question about it.",
    func=analyze_excel_file_simple,
)
| # | |