# Spaces: Build error
import functools
import json
import logging
import os
import re
from typing import Any, Dict, Iterator, List, Optional, Tuple

import gradio as gr
import requests
from duckduckgo_search import DDGS
from huggingface_hub import InferenceClient
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from pydantic import BaseModel, Field
# Set up basic configuration for logging: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# Environment variables and configurations.
# Token passed to the Hugging Face InferenceClient; None when the variable is unset.
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")

# Model identifiers offered in the "Select Model" dropdown.
MODELS = [
    "mistralai/Mistral-7B-Instruct-v0.3",
    "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "mistralai/Mistral-Nemo-Instruct-2407",
    "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "meta-llama/Meta-Llama-3.1-70B-Instruct",
]

# Per-model token limit used when budgeting the completion length.
MODEL_TOKEN_LIMITS = {
    "mistralai/Mistral-7B-Instruct-v0.3": 32768,
    "mistralai/Mixtral-8x7B-Instruct-v0.1": 32768,
    "mistralai/Mistral-Nemo-Instruct-2407": 32768,
    "meta-llama/Meta-Llama-3.1-8B-Instruct": 8192,
    "meta-llama/Meta-Llama-3.1-70B-Instruct": 8192,
}
# Default system message sent with every chat completion; editable in the UI.
DEFAULT_SYSTEM_PROMPT = """You are a world-class financial AI assistant, capable of complex reasoning and reflection.
Reason through the query inside <thinking> tags, and then provide your final response inside <output> tags.
Providing comprehensive and accurate information based on web search results is essential.
Your goal is to synthesize the given context into a coherent and detailed response that directly addresses the user's query.
Please ensure that your response is well-structured, factual.
If you detect that you made a mistake in your reasoning at any point, correct yourself inside <reflection> tags."""
@functools.lru_cache(maxsize=1)
def get_embeddings() -> "HuggingFaceEmbeddings":
    """Return the sentence-embedding model used to index search results.

    The instance is cached so the model is constructed once per process
    instead of on every query that uses embeddings.
    """
    return HuggingFaceEmbeddings(model_name="sentence-transformers/stsb-roberta-large")
def duckduckgo_search(query: str) -> List[Dict[str, Any]]:
    """Run a DuckDuckGo text search and return up to five results.

    Args:
        query: Free-text search query.

    Returns:
        A list of result mappings as produced by ``DDGS.text``; empty when
        the search yields nothing.
    """
    with DDGS() as ddgs:
        results = ddgs.text(query, max_results=5)
        # Materialise while the session is still open: the result may be a
        # lazy iterator (or None), depending on the library version.
        return list(results) if results else []
class CitingSources(BaseModel):
    """Schema describing the list of sources cited in a response."""

    # Required field: one entry per cited source.
    sources: List[str] = Field(
        ...,
        description="List of sources to cite. Should be an URL of the source.",
    )
def chatbot_interface(
    message: str,
    history: List[Tuple[Optional[str], Optional[str]]],
    model: str,
    temperature: float,
    num_calls: int,
    use_embeddings: bool,
    system_prompt: str,
) -> Iterator[List[Tuple[Optional[str], Optional[str]]]]:
    """Stream the chat history while the answer to ``message`` is produced.

    Args:
        message: The user's new message.
        history: Existing ``(user, assistant)`` pairs; not mutated.
        model, temperature, num_calls, use_embeddings, system_prompt:
            Forwarded unchanged to ``respond``.

    Yields:
        The updated history after each partial response. A blank message
        yields the history once, unchanged.
    """
    history = list(history or [])

    if not message or not message.strip():
        # This is a generator: a ``return value`` would be discarded and the
        # consumer would receive nothing, so yield the unchanged history.
        yield history
        return

    history.append((message, ""))
    try:
        for response in respond(message, history, model, temperature, num_calls, use_embeddings, system_prompt):
            history[-1] = (message, response)
            yield history
    # NOTE(review): confirm `gr.CancelledError` exists in the installed Gradio version.
    except gr.CancelledError:
        yield history
    except Exception as e:
        logging.error("Unexpected error in chatbot_interface: %s", e)
        history[-1] = (message, f"An unexpected error occurred: {str(e)}")
        yield history
def retry_last_response(
    history: List[Tuple[Optional[str], Optional[str]]],
    model: str,
    temperature: float,
    num_calls: int,
    use_embeddings: bool,
    system_prompt: str,
) -> Iterator[List[Tuple[Optional[str], Optional[str]]]]:
    """Regenerate the answer to the most recent user message.

    Always behaves as a generator, so every path streams history updates
    (previously one path returned a list and the other an un-iterated
    generator object).

    Args:
        history: Existing ``(user, assistant)`` pairs; not mutated.
        model, temperature, num_calls, use_embeddings, system_prompt:
            Forwarded unchanged to ``chatbot_interface``.

    Yields:
        History updates from ``chatbot_interface``; the unchanged history
        when there is no user message to retry.
    """
    # Nothing to retry: empty history, or the last row has no user message
    # (e.g. an assistant-only greeting stored as ``(None, text)``).
    if not history or not history[-1][0]:
        yield list(history or [])
        return

    last_user_msg = history[-1][0]
    # Drop the last exchange; chatbot_interface re-appends the user message.
    yield from chatbot_interface(
        last_user_msg, history[:-1], model, temperature, num_calls, use_embeddings, system_prompt
    )
def respond(
    message: str,
    history: List[Any],
    model: str,
    temperature: float,
    num_calls: int,
    use_embeddings: bool,
    system_prompt: str,
) -> Iterator[str]:
    """Stream the assistant's answer to ``message`` as display-ready text.

    Args:
        message: The user's query.
        history: Prior conversation; accepted for the chat-callback signature
            but not used here.
        model: Model identifier passed to the search-and-generate step.
        temperature: Sampling temperature.
        num_calls: Number of completion calls to make.
        use_embeddings: Whether to rank search results with embeddings.
        system_prompt: System message for the model.

    Yields:
        The accumulated response text followed by the sources string, or a
        single error message if generation fails.
    """
    logging.info("User Query: %s", message)
    logging.info("Model Used: %s", model)
    logging.info("Use Embeddings: %s", use_embeddings)
    logging.info("System Prompt: %s", system_prompt)

    try:
        for main_content, sources in get_response_with_search(
            message,
            model,
            num_calls=num_calls,
            temperature=temperature,
            use_embeddings=use_embeddings,
            system_prompt=system_prompt,
        ):
            yield f"{main_content}\n\n{sources}"
    except Exception as e:
        # Top-level boundary for the UI: report the failure instead of crashing.
        logging.error("Error with %s: %s", model, e)
        yield f"An error occurred with the {model} model: {str(e)}. Please try again or select a different model."
def create_web_search_vectors(search_results: List[Dict[str, Any]]) -> Optional["FAISS"]:
    """Build a FAISS index over the search results that carry a body.

    Args:
        search_results: Result mappings with ``title``, ``body`` and ``href``
            keys; entries without ``body`` are skipped.

    Returns:
        The FAISS vector store, or ``None`` when no result is indexable
        (building an index from zero documents fails).
    """
    documents = []
    for result in search_results or []:
        if "body" not in result:
            continue
        title = result.get("title", "")
        href = result.get("href", "")
        content = f"{title}\n{result['body']}\nSource: {href}"
        documents.append(Document(page_content=content, metadata={"source": href}))

    if not documents:
        return None

    # Load the embedding model only once there is something to index.
    return FAISS.from_documents(documents, get_embeddings())
# Rough heuristic for English text: about four characters per token.
_CHARS_PER_TOKEN = 4
# Upper bound on tokens requested for a single completion.
_MAX_COMPLETION_TOKENS = 6500
# Token limit assumed for models missing from MODEL_TOKEN_LIMITS.
_FALLBACK_TOKEN_LIMIT = 8192
_NO_RESULTS_MESSAGE = "No web search results available. Please try again."


def _format_search_result(result: Dict[str, Any]) -> str:
    """Render one search result as title, body and source lines."""
    return (
        f"{result.get('title', '')}\n"
        f"{result.get('body', '')}\n"
        f"Source: {result.get('href', '')}"
    )


def _completion_token_budget(prompt: str, system_prompt: str, model: str) -> int:
    """Estimate how many new tokens can be requested for ``model``."""
    input_tokens = (len(system_prompt or "") + len(prompt)) // _CHARS_PER_TOKEN
    model_token_limit = MODEL_TOKEN_LIMITS.get(model, _FALLBACK_TOKEN_LIMIT)
    budget = min(model_token_limit - input_tokens, _MAX_COMPLETION_TOKENS)
    if budget < 1:
        logging.warning(
            "Estimated prompt size (%d tokens) exceeds the %d-token limit of %s",
            input_tokens, model_token_limit, model,
        )
        # Never send a zero or negative max_tokens to the API.
        return 1
    return budget


def get_response_with_search(
    query,
    model,
    num_calls=3,
    temperature=0.2,
    use_embeddings=True,
    system_prompt=DEFAULT_SYSTEM_PROMPT,
):
    """Search the web for ``query`` and stream a model-written report.

    Args:
        query: The user's request.
        model: Model identifier for the Hugging Face Inference API.
        num_calls: How many completion calls to make; each call's text is
            appended to the running response.
        temperature: Sampling temperature.
        use_embeddings: When true, rank results through a FAISS retriever;
            otherwise concatenate all results as context.
        system_prompt: System message sent with every call.

    Yields:
        ``(main_content, sources)`` tuples. ``sources`` is always an empty
        string here; on failure ``main_content`` carries an error message.
    """
    search_results = duckduckgo_search(query)
    if not search_results:
        yield _NO_RESULTS_MESSAGE, ""
        return

    if use_embeddings:
        web_search_database = create_web_search_vectors(search_results)
        if not web_search_database:
            yield _NO_RESULTS_MESSAGE, ""
            return
        retriever = web_search_database.as_retriever(search_kwargs={"k": 5})
        # `invoke` replaces the deprecated `get_relevant_documents`.
        relevant_docs = retriever.invoke(query)
        context = "\n".join(doc.page_content for doc in relevant_docs)
    else:
        context = "\n".join(_format_search_result(result) for result in search_results)

    prompt = (
        "Using the following context from web search results:\n"
        f"{context}\n"
        "Write a detailed and complete research document that fulfills "
        f"the following user request: '{query}'\n"
        "After writing the document, please provide a list of sources "
        "with their URLs used in your response."
    )

    # Use Hugging Face API
    client = InferenceClient(model, token=huggingface_token)
    max_new_tokens = _completion_token_budget(prompt, system_prompt, model)

    main_content = ""
    for _ in range(int(num_calls)):
        try:
            response = client.chat_completion(
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=max_new_tokens,
                temperature=temperature,
                stream=False,
                top_p=0.8,
            )
            # Log the raw response for debugging
            logging.info("Raw API response: %s", response)

            # A plain string in place of a response object is treated as an error message.
            if isinstance(response, str):
                logging.error("API returned an unexpected string response: %s", response)
                yield f"An error occurred: {response}", ""
                return

            choices = getattr(response, "choices", None)
            if not choices:
                logging.error("Unexpected response structure: %s", response)
                yield "An unexpected error occurred. Please try again.", ""
                continue

            for choice in choices:
                message = getattr(choice, "message", None)
                if message is None or not hasattr(message, "content"):
                    continue
                # Content may be None; treat it as empty text.
                main_content += message.content or ""
                yield main_content, ""  # Yield partial main content without sources
        except Exception as e:
            logging.error("Error in API call: %s", e)
            yield f"An error occurred: {str(e)}", ""
            return
def vote(data: gr.LikeData) -> None:
    """Print whether the user liked or disliked a chatbot response.

    Args:
        data: Like-event payload; ``liked`` selects the wording and
            ``value`` is echoed in the message.
    """
    verdict = "upvoted" if data.liked else "downvoted"
    print(f"You {verdict} this response: {data.value}")
# Custom CSS passed to the chat interface; currently only a placeholder comment.
css = """
/* Fine-tune chatbox size */
"""
def initial_conversation() -> List[Tuple[None, str]]:
    """Return the seed chat history: one assistant-only welcome message.

    The user slot of the pair is ``None`` so only the assistant text is set.
    """
    welcome = (
        "Welcome! I'm your AI assistant for web search. Here's how you can use me:\n\n"
        "1. Ask me any question, and I'll search the web for information.\n"
        "2. You can adjust the system prompt for fine-tuned responses, whether to use embeddings, and the temperature.\n"
        "To get started, ask me a question!"
    )
    return [(None, welcome)]
# Chat UI definition. The additional inputs are passed to `respond` after
# (message, history), in this order: model, temperature, num_calls,
# use_embeddings, system_prompt.
demo = gr.ChatInterface(
    respond,
    additional_inputs_accordion=gr.Accordion(label="⚙️ Parameters", open=True, render=False),
    additional_inputs=[
        gr.Dropdown(choices=MODELS, label="Select Model", value=MODELS[3]),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.2, step=0.1, label="Temperature"),
        gr.Slider(minimum=1, maximum=5, value=1, step=1, label="Number of API Calls"),
        gr.Checkbox(label="Use Embeddings", value=False),
        gr.Textbox(label="System Prompt", lines=5, value=DEFAULT_SYSTEM_PROMPT),
    ],
    title="AI-powered Web Search Assistant",
    description="Ask questions and get answers from web search results.",
    theme=gr.Theme.from_hub("allenai/gradio-theme"),
    css=css,
    examples=[
        ["What are the latest developments in artificial intelligence?"],
        ["Can you explain the basics of quantum computing?"],
        ["What are the current global economic trends?"],
    ],
    cache_examples=False,
    analytics_enabled=False,
    textbox=gr.Textbox(placeholder="Ask a question", container=False, scale=7),
    chatbot=gr.Chatbot(
        show_copy_button=True,
        likeable=True,
        layout="bubble",
        height=400,
        # Seed the chat with the welcome message.
        value=initial_conversation(),
    ),
)
# Launch the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch(share=True)