Spaces:
Running
Running
File size: 8,026 Bytes
b7d08cf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
from agno.agent import RunEvent
from agno.models.google import Gemini
from agno.db.sqlite import SqliteDb
from src.infra.logger import get_logger
from src.agent.base import creat_agent, creat_team, get_context, UserState, Location
logger = get_logger(__name__)
class AgentManager:
def __init__(self, user_state: UserState, models_dict: dict, tool_hooks_dict: dict, hooks_dict: dict):
self.base_kwargs = {
"additional_context": get_context(user_state),
"timezone_identifier": user_state.utc_offset,
"add_datetime_to_context": True,
}
self._planer_db = SqliteDb(db_file="tmp/agents.db")
self._planer_session_state = {"task_list": None}
self.planner_agent = creat_agent(name="planner",
model=models_dict["planner"],
add_session_state_to_context=True,
db=self._planer_db,
markdown=True,
**self.base_kwargs)
self._creat_agent_team(
models=models_dict,
tools=tool_hooks_dict,
hooks=hooks_dict,
)
def _creat_agent_team(self,
models: dict[str: object],
tools: dict[str: object],
hooks: dict[str: object]):
self._team_db = SqliteDb(db_file="tmp/team.db")
self._team_session_state = {
"scout_pois": {},
"optimized_route": {},
"traffic_data": [],
"weather_forecasts": {},
"task_list": None,
}
agnent_members = ["scout", "optimizer", "traffic", "weather"]
for name in agnent_members:
_agent = creat_agent(
name=name,
model=models[name],
tools=tools.get(name, []),
tool_hooks=hooks.get(name, []),
markdown=False,
**self.base_kwargs
)
setattr(self, f"{name}_agent", _agent)
self.core_team = creat_team(
name="team",
model=models["team"],
members=[getattr(self, f"{name}_agent") for name in agnent_members],
tools=tools.get("team", []),
tool_hooks=hooks.get("team", []),
db=self._team_db,
markdown=True,
**self.base_kwargs
)
@staticmethod
def _planner_stream_handle(stream_item):
show = True
response = ""
for chuck in stream_item:
if chuck.event == RunEvent.run_content:
content = chuck.content
response += chuck.content
if show:
if "@@@" in response:
show = False
content = content.split("@@@")[0]
print(content)
json_data = "{" + response.split("{", maxsplit=1)[-1]
return json_data, response
def planner_message(self, message):
planner_stream = self.planner_agent.run(f"help user to update the task_list, user's message: {message}",
stream=True, stream_events=True,
session_state=self._planer_session_state)
self._planer_session_state["task_list"], _response = self._planner_stream_handle(planner_stream)
@property
def task_list(self):
return self._planer_session_state["task_list"]
@staticmethod
def _core_team_stream_handle(stream_item):
for event in stream_item:
if event.event == "TeamRunContent":
print(f"{event.content}", end="", flush=True)
elif event.event == "TeamToolCallStarted":
if event.tool.tool_name == "delegate_task_to_member":
print(event.tool)
# print(f"Supervisor began assigning tasks to member - {event.tool.tool_args['member_id']}...")
else:
print(f"Supervisor started using the tools: {event.tool.tool_name}")
elif event.event == "TeamToolCallCompleted":
if event.tool.tool_name == "delegate_task_to_member":
print(f"{event.tool.tool_args['member_id']} has completed the task assigned by the supervisor...")
# print(event.tool)
else:
print(f"Supervisor stop using tools:: {event.tool.tool_name}")
elif event.event == "ToolCallStarted":
print(f"{event.agent_id} Start using tools: {event.tool.tool_name}")
elif event.event == "ToolCallCompleted":
print(f"{event.agent_id} Stop using tools: {event.tool.tool_name}")
elif event.event == "TeamReasoningStep":
print(f"Supervisor is reasoning: {event.content}")
def core_team_start(self):
if not self.task_list:
raise ValueError("Task list is empty, cannot start core team.")
message = f"""
Based on this structured task list, please coordinate with the team members to:
1. Use scout to find specific locations for each task
2. Use optimizer to optimize the route
3. Use weather to check conditions for tomorrow
4. Use traffic to calculate routes and travel times
5. Provide a comprehensive plan and route plan with all details
"Once ALL scout tasks complete, IMMEDIATELY proceed to Step 3"
"DO NOT wait for user input, the user info is already provided in the context"
"You MUST delegate to Optimizer automatically"
Here is the task list:
{self.task_list}
"""
self._team_session_state["task_list"] = self.task_list
team_stream = self.core_team.run(message, stream=True, stream_events=True,
session_state=self._team_session_state,)
self._core_team_stream_handle(team_stream)
team_state = self.core_team.get_session_state()
if __name__ == "__main__":
from src.toolkits.googlemap_toolkit import GoogleMapsToolkit
from src.toolkits.openweather_toolkit import OpenWeatherToolkit
from src.toolkits.optimization_toolkit import OptimizationToolkit
from src.infra.config import get_settings
setting = get_settings()
maps_kit = GoogleMapsToolkit(api_key=setting.google_maps_api_key)
weather_kit = OpenWeatherToolkit(api_key=setting.openweather_api_key)
opt_kit = OptimizationToolkit(api_key=setting.google_maps_api_key)
state = UserState(location=Location(lat=25.058903, lng=121.549131))
settings = get_settings()
model = Gemini(
id="gemini-2.5-flash-lite",
thinking_budget=512,
api_key=settings.gemini_api_key)
main_model = Gemini(
id="gemini-2.5-flash",
thinking_budget=1024,
api_key=settings.gemini_api_key)
models = {"planner": main_model, "team": main_model,
"scout": model, "optimizer": model,
"traffic": model, "weather": model}
tools_dict = {
"scout": [maps_kit.search_places],
"optimizer": [opt_kit.solve_route_optimization],
"traffic": [maps_kit.compute_routes], # 或 get_directions
"weather": [weather_kit],
"team": []
}
am = AgentManager(user_state=state,
models_dict=models,
tool_hooks_dict=tools_dict,
hooks_dict={})
am.planner_message("I'm going to San Francisco for tourism tomorrow, please help me plan a one-day itinerary.")
print(f"\n[Planner Phase Complete]")
print(f"Task List: {am.task_list}")
#if am.task_list:
# print("\n[Core Team Start]")
# am.core_team_start() |