Spaces:
Running
Running
| from agno.agent import RunEvent | |
| from agno.models.google import Gemini | |
| from agno.db.sqlite import SqliteDb | |
| from src.infra.logger import get_logger | |
| from src.agent.base import creat_agent, creat_team, get_context, UserState, Location | |
| logger = get_logger(__name__) | |
| class AgentManager: | |
| def __init__(self, user_state: UserState, models_dict: dict, tool_hooks_dict: dict, hooks_dict: dict): | |
| self.base_kwargs = { | |
| "additional_context": get_context(user_state), | |
| "timezone_identifier": user_state.utc_offset, | |
| "add_datetime_to_context": True, | |
| } | |
| self._planer_db = SqliteDb(db_file="tmp/agents.db") | |
| self._planer_session_state = {"task_list": None} | |
| self.planner_agent = creat_agent(name="planner", | |
| model=models_dict["planner"], | |
| add_session_state_to_context=True, | |
| db=self._planer_db, | |
| markdown=True, | |
| **self.base_kwargs) | |
| self._creat_agent_team( | |
| models=models_dict, | |
| tools=tool_hooks_dict, | |
| hooks=hooks_dict, | |
| ) | |
| def _creat_agent_team(self, | |
| models: dict[str: object], | |
| tools: dict[str: object], | |
| hooks: dict[str: object]): | |
| self._team_db = SqliteDb(db_file="tmp/team.db") | |
| self._team_session_state = { | |
| "scout_pois": {}, | |
| "optimized_route": {}, | |
| "traffic_data": [], | |
| "weather_forecasts": {}, | |
| "task_list": None, | |
| } | |
| agnent_members = ["scout", "optimizer", "traffic", "weather"] | |
| for name in agnent_members: | |
| _agent = creat_agent( | |
| name=name, | |
| model=models[name], | |
| tools=tools.get(name, []), | |
| tool_hooks=hooks.get(name, []), | |
| markdown=False, | |
| **self.base_kwargs | |
| ) | |
| setattr(self, f"{name}_agent", _agent) | |
| self.core_team = creat_team( | |
| name="team", | |
| model=models["team"], | |
| members=[getattr(self, f"{name}_agent") for name in agnent_members], | |
| tools=tools.get("team", []), | |
| tool_hooks=hooks.get("team", []), | |
| db=self._team_db, | |
| markdown=True, | |
| **self.base_kwargs | |
| ) | |
| def _planner_stream_handle(stream_item): | |
| show = True | |
| response = "" | |
| for chuck in stream_item: | |
| if chuck.event == RunEvent.run_content: | |
| content = chuck.content | |
| response += chuck.content | |
| if show: | |
| if "@@@" in response: | |
| show = False | |
| content = content.split("@@@")[0] | |
| print(content) | |
| json_data = "{" + response.split("{", maxsplit=1)[-1] | |
| return json_data, response | |
| def planner_message(self, message): | |
| planner_stream = self.planner_agent.run(f"help user to update the task_list, user's message: {message}", | |
| stream=True, stream_events=True, | |
| session_state=self._planer_session_state) | |
| self._planer_session_state["task_list"], _response = self._planner_stream_handle(planner_stream) | |
| def task_list(self): | |
| return self._planer_session_state["task_list"] | |
| def _core_team_stream_handle(stream_item): | |
| for event in stream_item: | |
| if event.event == "TeamRunContent": | |
| print(f"{event.content}", end="", flush=True) | |
| elif event.event == "TeamToolCallStarted": | |
| if event.tool.tool_name == "delegate_task_to_member": | |
| print(event.tool) | |
| # print(f"Supervisor began assigning tasks to member - {event.tool.tool_args['member_id']}...") | |
| else: | |
| print(f"Supervisor started using the tools: {event.tool.tool_name}") | |
| elif event.event == "TeamToolCallCompleted": | |
| if event.tool.tool_name == "delegate_task_to_member": | |
| print(f"{event.tool.tool_args['member_id']} has completed the task assigned by the supervisor...") | |
| # print(event.tool) | |
| else: | |
| print(f"Supervisor stop using tools:: {event.tool.tool_name}") | |
| elif event.event == "ToolCallStarted": | |
| print(f"{event.agent_id} Start using tools: {event.tool.tool_name}") | |
| elif event.event == "ToolCallCompleted": | |
| print(f"{event.agent_id} Stop using tools: {event.tool.tool_name}") | |
| elif event.event == "TeamReasoningStep": | |
| print(f"Supervisor is reasoning: {event.content}") | |
| def core_team_start(self): | |
| if not self.task_list: | |
| raise ValueError("Task list is empty, cannot start core team.") | |
| message = f""" | |
| Based on this structured task list, please coordinate with the team members to: | |
| 1. Use scout to find specific locations for each task | |
| 2. Use optimizer to optimize the route | |
| 3. Use weather to check conditions for tomorrow | |
| 4. Use traffic to calculate routes and travel times | |
| 5. Provide a comprehensive plan and route plan with all details | |
| "Once ALL scout tasks complete, IMMEDIATELY proceed to Step 3" | |
| "DO NOT wait for user input, the user info is already provided in the context" | |
| "You MUST delegate to Optimizer automatically" | |
| Here is the task list: | |
| {self.task_list} | |
| """ | |
| self._team_session_state["task_list"] = self.task_list | |
| team_stream = self.core_team.run(message, stream=True, stream_events=True, | |
| session_state=self._team_session_state,) | |
| self._core_team_stream_handle(team_stream) | |
| team_state = self.core_team.get_session_state() | |
| if __name__ == "__main__": | |
| from src.toolkits.googlemap_toolkit import GoogleMapsToolkit | |
| from src.toolkits.openweather_toolkit import OpenWeatherToolkit | |
| from src.toolkits.optimization_toolkit import OptimizationToolkit | |
| from src.infra.config import get_settings | |
| setting = get_settings() | |
| maps_kit = GoogleMapsToolkit(api_key=setting.google_maps_api_key) | |
| weather_kit = OpenWeatherToolkit(api_key=setting.openweather_api_key) | |
| opt_kit = OptimizationToolkit(api_key=setting.google_maps_api_key) | |
| state = UserState(location=Location(lat=25.058903, lng=121.549131)) | |
| settings = get_settings() | |
| model = Gemini( | |
| id="gemini-2.5-flash-lite", | |
| thinking_budget=512, | |
| api_key=settings.gemini_api_key) | |
| main_model = Gemini( | |
| id="gemini-2.5-flash", | |
| thinking_budget=1024, | |
| api_key=settings.gemini_api_key) | |
| models = {"planner": main_model, "team": main_model, | |
| "scout": model, "optimizer": model, | |
| "traffic": model, "weather": model} | |
| tools_dict = { | |
| "scout": [maps_kit.search_places], | |
| "optimizer": [opt_kit.solve_route_optimization], | |
| "traffic": [maps_kit.compute_routes], # 或 get_directions | |
| "weather": [weather_kit], | |
| "team": [] | |
| } | |
| am = AgentManager(user_state=state, | |
| models_dict=models, | |
| tool_hooks_dict=tools_dict, | |
| hooks_dict={}) | |
| am.planner_message("I'm going to San Francisco for tourism tomorrow, please help me plan a one-day itinerary.") | |
| print(f"\n[Planner Phase Complete]") | |
| print(f"Task List: {am.task_list}") | |
| #if am.task_list: | |
| # print("\n[Core Team Start]") | |
| # am.core_team_start() |