Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from typing import Dict, List, Set | |
| import os | |
| import logging | |
| import inspect | |
| from huggingface_hub import login | |
| from smolagents import ToolCollection, CodeAgent, Tool, HfApiModel | |
| from dotenv import load_dotenv | |
# Local setup (load the token from a .env file when not running on Spaces):
# load_dotenv(verbose=True)
# login(token=os.getenv('HF_TOKEN'))

# Configure logging
logging.basicConfig(level=logging.INFO)
logging.warning('Starting application...')

# Global variables
publishtoken = None
# Maps a collection slug (or the literal key "base tools") to the tool names
# recorded for it; the code in this file only ever stores lists of names here.
tool_Collections: dict[str, List[str]] = {}
# Tool instances that load_collection_from_space() added to the agent.
loaded_tools: Set[Tool] = set()

# API keys for litellm providers (all blank by default; used as the initial
# values of the password fields on the "Config" tab).
litellm_api_keys = {
    'xai': '',
    'HF': '',
    'grok': '',
    'anthropic': '',
    'openAI': '',
}
def all_tools_names() -> List[str]:
    """Collect the tool names recorded in every entry of ``tool_Collections``.

    An entry that is a ``ToolCollection`` contributes the ``name`` of each of
    its tools; any other entry is treated as an iterable of names and is
    added as-is.

    Returns:
        A flat list of names, in the iteration order of ``tool_Collections``.
    """
    names = []
    for entry in tool_Collections.values():
        if not isinstance(entry, ToolCollection):
            # Already a sequence of names.
            names += entry
            continue
        names += [member.name for member in entry.tools]
    return names
def filter_tools(tools):
    """Filter out base tools from a list of tools.

    Entries are matched by name: an entry that exposes a ``name`` attribute
    (e.g. a tool object) is compared through that attribute, anything else
    (e.g. a plain string) is compared directly. Comparing tool objects
    themselves against the name list would never match, which would make the
    filter a silent no-op for the object lists this file passes in.

    Args:
        tools: Iterable of tool objects or tool names, or ``None``.

    Returns:
        A new list with the entries whose name is not a base-tool name. An
        empty list (with a logged warning) when ``tools`` is ``None``.
    """
    if tools is None:
        logging.warning("Received None for tools, defaulting to an empty list.")
        return []

    # NOTE(review): the base tool registered by the agent library may be named
    # 'web_search' rather than 'web search' -- confirm before relying on this.
    base_tools_names = ('web search',)
    return [
        tool for tool in tools
        if getattr(tool, "name", tool) not in base_tools_names
    ]
def load_collection_from_space(agent: CodeAgent, collection_slug: str) -> List[str]:
    """Register the tools of a Hugging Face collection on ``agent``.

    A slug that is already a key of ``tool_Collections`` is not loaded again.
    Otherwise the collection is fetched, its tool names are recorded under the
    slug, and each tool is stored in ``agent.tools`` under its name (replacing
    any tool already registered under that name). Tools whose name was not yet
    on the agent are also added to ``loaded_tools``.

    Args:
        agent: Agent whose ``tools`` mapping is updated in place.
        collection_slug: Slug identifying the collection to load.

    Returns:
        The result of ``all_tools_names()`` after the update.
    """
    if collection_slug in tool_Collections:
        return all_tools_names()

    # NOTE: trust_remote_code=True is passed for whatever slug the caller supplies.
    fetched = ToolCollection(
        collection_slug=collection_slug,
        trust_remote_code=True
    )
    tool_Collections[collection_slug] = [item.name for item in fetched.tools]

    for item in fetched.tools:
        previously_unknown = agent.tools.get(item.name) is None
        agent.tools[item.name] = item
        if previously_unknown:
            loaded_tools.add(item)

    return all_tools_names()
def createAgent() -> CodeAgent:
    """Create and return a CodeAgent instance.

    The agent is built from the tools in ``loaded_tools`` (passed through
    ``filter_tools``) with ``add_base_tools=True``. Every tool name found on
    the new agent that does not belong to a loaded tool is then recorded under
    the "base tools" key of ``tool_Collections``.

    Returns:
        The newly constructed agent.
    """
    agent = CodeAgent(
        tools=filter_tools(list(loaded_tools)),
        model=HfApiModel(),
        additional_authorized_imports=["smolagents", "subprocess", "typing", "os", "inspect", "open", "requests"],
        add_base_tools=True,
        planning_interval=None,
    )

    # ``agent.tools`` is used as a name -> tool mapping elsewhere in this file,
    # so iterating it yields names. Compare those against the *names* of the
    # loaded tools; testing a name against the set of tool objects never
    # matches and would classify every tool as a base tool.
    loaded_names = {tool.name for tool in loaded_tools}
    base_names = [name for name in agent.tools if name not in loaded_names]

    if base_names:
        recorded = tool_Collections.setdefault("base tools", [])
        for name in base_names:
            # Skip names already recorded so repeated calls do not duplicate them.
            if name not in recorded:
                recorded.append(name)
    return agent
# Initialize the agent: a single module-level CodeAgent that the helper
# functions and UI callbacks in this file read through the global ``agent``.
agent = createAgent()
def dropdown_update_choices(choices):
    """Build a Gradio update that sets new ``choices`` and a ``None`` value.

    Args:
        choices: The options to assign to the target component.

    Returns:
        The object produced by ``gr.update``.
    """
    update_kwargs = {"choices": choices, "value": None}
    return gr.update(**update_kwargs)
def process_logs(agent):
    """Render the LLM outputs stored in ``agent.logs`` as text.

    Args:
        agent: Any object; only its optional ``logs`` attribute is read.

    Returns:
        One line per log entry that has an ``llm_output`` attribute (the
        ``str()`` of that attribute followed by a newline), concatenated in
        order. Returns ``"No logs available."`` when ``agent`` has no ``logs``
        attribute, and an empty string when no entry has an ``llm_output``.
    """
    if not hasattr(agent, 'logs'):
        return "No logs available."
    # Join once instead of growing a string with += inside the loop.
    return "".join(
        f"{entry.llm_output}\n"
        for entry in agent.logs
        if hasattr(entry, 'llm_output')
    )
def get_tools():
    """Summarise the tools registered on the global ``agent``.

    Returns:
        A list with one ``{"name": ..., "description": ...}`` dict per value
        of ``agent.tools``.
    """
    summaries = []
    for registered in agent.tools.values():
        summaries.append({"name": registered.name, "description": registered.description})
    return summaries
def get_functions():
    """Return the ``custom_tools`` object of the global agent's Python executor."""
    executor = agent.python_executor
    return executor.custom_tools
def get_function_code(selected_function_name):
    """Return the source code of a function known to the agent's executor.

    Args:
        selected_function_name: Key to look up in ``get_functions()``.

    Returns:
        The source text from ``inspect.getsource``;
        ``"Source code not available."`` when the source cannot be retrieved;
        ``"Function not found."`` when the lookup yields nothing (or a falsy
        value).
    """
    func = get_functions().get(selected_function_name)
    if not func:
        return "Function not found."
    try:
        return inspect.getsource(func)
    except (OSError, TypeError):
        # OSError: the source file cannot be located or read.
        # TypeError: the object is a built-in or not a kind of object that has
        # source at all (for example an instance rather than a function/class).
        return "Source code not available."
def get_tool_description(selected_tool_name):
    """Look up the description of the tool with the given name.

    Prints the requested name and the current tool list for debugging.

    Args:
        selected_tool_name: Name to match against the ``"name"`` field of the
            entries returned by ``get_tools()``.

    Returns:
        The ``"description"`` of the first matching entry, or
        ``"No description available."`` when nothing matches.
    """
    available = get_tools()
    print("Selected tool name:", selected_tool_name)
    print("Tools:", available)
    matches = (
        entry["description"]
        for entry in available
        if entry["name"] == selected_tool_name
    )
    return next(matches, "No description available.")
def refresh_ui_elements():
    """Recompute the values shown by the tool/function widgets.

    Returns:
        A 4-tuple ``(tool dropdown update, function dropdown update,
        tool description, function code)``. The description and code belong
        to the first tool and first function respectively; the code is ``""``
        when there is no first function (or its name is falsy).
    """
    print("Refreshing UI elements...")
    tool_entries = get_tools()
    function_table = get_functions()

    tool_names = [entry["name"] for entry in tool_entries]
    print("Tool names:", tool_names)
    function_names = list(function_table.keys())

    first_tool = next(iter(tool_names), None)
    first_function = next(iter(function_names), None)

    description = get_tool_description(first_tool)
    if first_function:
        source = get_function_code(first_function)
    else:
        source = ""

    return (
        dropdown_update_choices(tool_names),
        dropdown_update_choices(function_names),
        description,
        source,
    )
def update_agent(collection_slug: str):
    """Load a tool collection into the global agent, then refresh the UI values.

    Args:
        collection_slug: Slug passed to ``load_collection_from_space``.

    Returns:
        The tuple produced by ``refresh_ui_elements()``.
    """
    load_collection_from_space(agent, collection_slug=collection_slug)
    refreshed = refresh_ui_elements()
    return refreshed
def respond(message, console_output, chat_history):
    """Run the agent on a chat message and compute the new UI state.

    Args:
        message: Text submitted by the user.
        console_output: Current content of the console textbox (may be
            ``None``/empty).
        chat_history: Current chatbot history as a list of
            ``{"role", "content"}`` dicts (may be ``None``).

    Returns:
        A 7-tuple matching the outputs wired to ``msg.submit``:
        ``(message box text, console text, chat history, tool dropdown update,
        function dropdown update, tool description, function code)``.
        On success the message box is cleared. On failure it carries the error
        text, the console is returned unchanged, and the four tool/function
        widgets are left as they are.
    """
    if chat_history is None:
        chat_history = []
    try:
        bot_message = agent.run(message)
        new_console_output = process_logs(agent)
        # The agent's answer is not guaranteed to be a string here; the chat
        # history stores text content, so stringify anything else.
        if not isinstance(bot_message, str):
            bot_message = str(bot_message)
        chat_history.extend([
            {"role": "user", "content": message},
            {"role": "assistant", "content": bot_message}
        ])
        updated_console = (console_output or "") + f"\nQuery: {message}\nLogs: {new_console_output}"
        ui_updates = refresh_ui_elements()
        return "", updated_console, chat_history, *ui_updates
    except Exception as e:
        # Top-level UI boundary: log with traceback and report to the user.
        logging.exception("Error in respond function: %s", e)
        # An empty update leaves a component untouched; returning None would
        # blank the tool description and function code on every error.
        unchanged = gr.update()
        return (
            f"An error occurred: {str(e)}",
            console_output,
            chat_history,
            unchanged,
            unchanged,
            unchanged,
            unchanged,
        )
# Build the Gradio interface: a chat/console/config column next to a
# tool/function browser column, followed by the event wiring.
# NOTE(review): the nesting below was reconstructed from a listing that had
# lost its indentation -- confirm the layout against the running app.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            with gr.Tab("Chat"):
                gr.Markdown("<center><h1>smolAgent Chat</h1></center>")
                chatbot = gr.Chatbot(type="messages")
                msg = gr.Textbox()
                # The clear button is attached to the message box and the chat.
                clear = gr.ClearButton([msg, chatbot])
            with gr.Tab("Console"):
                # Read-only textbox that respond() appends queries and logs to.
                outputbox = gr.Textbox(lines=25, scale=1, interactive=False)
            with gr.Tab("Config"):
                gr.Markdown("## Configure litellm API Keys")
                # One password field per provider in litellm_api_keys. No event
                # handler in this block reads these inputs.
                api_key_inputs = {
                    provider: gr.Textbox(
                        label=f'{provider} API Key',
                        placeholder='Enter key',
                        type='password',
                        value=litellm_api_keys[provider]
                    ) for provider in litellm_api_keys
                }
        with gr.Column():
            gr.Markdown("<center><h1>Tool Collection</h1></center>")
            tools = get_tools()
            # Pre-select the first tool, if there is one.
            tool_dropdown = gr.Dropdown(
                show_label=False,
                choices=[tool["name"] for tool in tools],
                value=tools[0]["name"] if tools else None,
                type="value",
                allow_custom_value=False,
                scale=3
            )
            description_textbox = gr.Textbox(
                label="Tool Description",
                value=get_tool_description(tool_dropdown.value),
                interactive=False,
            )
            slug = gr.Textbox(label="Collection Slug", value="Mightypeacock/agent-tools-6777c9699c231b7a1e87fa31")
            greet_btn = gr.Button("Load")
            gr.Markdown("<center><h2>Functions</h2></center>")
            functions = get_functions()
            # Pre-select the first function, if there is one.
            function_dropdown = gr.Dropdown(
                label="Select Function",
                choices=list(functions.keys()),
                value=None if not functions.keys() else list(functions.keys())[0],
                type="value",
            )
            code = gr.Code(label="Function Code", language="python")

    # Show the description of whichever tool is selected.
    tool_dropdown.change(
        fn=get_tool_description,
        inputs=[tool_dropdown],
        outputs=description_textbox,
    )
    # Show the source of whichever function is selected.
    function_dropdown.change(
        fn=get_function_code,
        inputs=function_dropdown,
        outputs=code,
    )
    # "Load" button: load the collection named by the slug, then refresh the
    # dropdowns, description and code pane.
    greet_btn.click(
        fn=update_agent,
        inputs=slug,
        outputs=[tool_dropdown, function_dropdown, description_textbox, code],
        api_name="load_HF_Collection"
    )
    # Submitting the message box runs respond(); its 7 return values map onto
    # these 7 outputs in order.
    msg.submit(
        respond,
        inputs=[msg, outputbox, chatbot],
        outputs=[msg, outputbox, chatbot, tool_dropdown, function_dropdown, description_textbox, code]
    )
# Launch the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(show_error=True, debug=True)