Marco310's picture
buildup agent system
b7d08cf
raw
history blame
8.03 kB
from agno.agent import RunEvent
from agno.models.google import Gemini
from agno.db.sqlite import SqliteDb
from src.infra.logger import get_logger
from src.agent.base import creat_agent, creat_team, get_context, UserState, Location
logger = get_logger(__name__)
class AgentManager:
    """Build and drive a planner agent plus a four-member agent team.

    The planner agent is asked to update a task list from user messages; the
    result is kept in ``_planer_session_state["task_list"]`` and exposed via
    :attr:`task_list`. The core team (scout, optimizer, traffic, weather) is
    then run against that task list. Streamed output of both runs is echoed
    to stdout.
    """

    def __init__(self, user_state: "UserState", models_dict: dict, tool_hooks_dict: dict, hooks_dict: dict) -> None:
        """Create the planner agent and the core team.

        Args:
            user_state: Source of the additional context (``get_context``) and
                of the ``utc_offset`` used as the timezone identifier.
            models_dict: Model per role; must contain ``"planner"``, ``"team"``
                and one entry per team member name.
            tool_hooks_dict: Despite its name, this is forwarded as the
                per-role *tools* mapping (role name -> list of tools).
            hooks_dict: Per-role tool hooks (role name -> list of hooks).
        """
        # Keyword arguments shared by every agent and by the team.
        self.base_kwargs = {
            "additional_context": get_context(user_state),
            "timezone_identifier": user_state.utc_offset,
            "add_datetime_to_context": True,
        }

        self._planer_db = SqliteDb(db_file="tmp/agents.db")
        self._planer_session_state = {"task_list": None}

        self.planner_agent = creat_agent(
            name="planner",
            model=models_dict["planner"],
            add_session_state_to_context=True,
            db=self._planer_db,
            markdown=True,
            **self.base_kwargs,
        )

        self._creat_agent_team(
            models=models_dict,
            tools=tool_hooks_dict,
            hooks=hooks_dict,
        )

    def _creat_agent_team(self,
                          models: dict[str, object],
                          tools: dict[str, object],
                          hooks: dict[str, object]) -> None:
        """Create the member agents and assemble them into ``self.core_team``.

        Each member is also stored on the instance as ``<name>_agent``
        (``scout_agent``, ``optimizer_agent``, ``traffic_agent``,
        ``weather_agent``). Missing ``tools``/``hooks`` entries default to an
        empty list; a missing ``models`` entry raises ``KeyError``.
        """
        self._team_db = SqliteDb(db_file="tmp/team.db")
        self._team_session_state = {
            "scout_pois": {},
            "optimized_route": {},
            "traffic_data": [],
            "weather_forecasts": {},
            "task_list": None,
        }

        member_names = ["scout", "optimizer", "traffic", "weather"]
        members = []
        for name in member_names:
            member = creat_agent(
                name=name,
                model=models[name],
                tools=tools.get(name, []),
                tool_hooks=hooks.get(name, []),
                markdown=False,
                **self.base_kwargs,
            )
            setattr(self, f"{name}_agent", member)
            members.append(member)

        self.core_team = creat_team(
            name="team",
            model=models["team"],
            members=members,
            tools=tools.get("team", []),
            tool_hooks=hooks.get("team", []),
            db=self._team_db,
            markdown=True,
            **self.base_kwargs,
        )

    @staticmethod
    def _visible_cutoff(response: str, delimiter: str = "@@@") -> tuple[int, bool]:
        """Return how much of ``response`` may be shown and whether to stop.

        Returns:
            ``(end, done)`` where ``response[:end]`` is safe to display and
            ``done`` is True once ``delimiter`` has been seen. While the
            delimiter has not been seen, a trailing partial delimiter (for
            example ``"@@"``) is held back, because the rest of it may arrive
            in the next streamed chunk.
        """
        cut = response.find(delimiter)
        if cut != -1:
            return cut, True
        for size in range(len(delimiter) - 1, 0, -1):
            if response.endswith(delimiter[:size]):
                return len(response) - size, False
        return len(response), False

    @staticmethod
    def _extract_task_json(response: str) -> str:
        """Return the text from the first ``{`` through the last ``}``.

        Returns an empty string when ``response`` contains no ``{``. When no
        closing brace follows the opening one, everything from the first
        ``{`` to the end is returned unchanged.
        """
        start = response.find("{")
        if start == -1:
            return ""
        end = response.rfind("}")
        if end < start:
            return response[start:]
        return response[start:end + 1]

    @staticmethod
    def _planner_stream_handle(stream_item):
        """Echo the planner's streamed text and collect its full response.

        Only ``RunEvent.run_content`` events are consumed. Text is printed
        as it arrives until the ``@@@`` delimiter; everything after it is
        collected but not displayed.

        Returns:
            ``(json_data, response)``: the JSON-looking part of the response
            (empty string if there is none) and the complete response text.
        """
        response = ""
        shown = 0          # number of characters of ``response`` already printed
        streaming = True   # False once the delimiter has been reached
        for chunk in stream_item:
            if chunk.event != RunEvent.run_content:
                continue
            text = chunk.content
            if not text:
                # Content events may carry no text; nothing to show or collect.
                continue
            response += text
            if not streaming:
                continue
            end, done = AgentManager._visible_cutoff(response)
            if end > shown:
                print(response[shown:end], end="", flush=True)
                shown = end
            streaming = not done

        if streaming and shown < len(response):
            # The held-back tail never became a delimiter; show it after all.
            print(response[shown:], end="", flush=True)

        return AgentManager._extract_task_json(response), response

    def planner_message(self, message) -> None:
        """Send ``message`` to the planner and store the resulting task list.

        The stored task list is only replaced when the planner's reply
        contains a JSON-looking part; otherwise the previous value is kept.
        """
        planner_stream = self.planner_agent.run(
            f"help user to update the task_list, user's message: {message}",
            stream=True,
            stream_events=True,
            session_state=self._planer_session_state,
        )
        json_data, _response = self._planner_stream_handle(planner_stream)
        if json_data:
            self._planer_session_state["task_list"] = json_data

    @property
    def task_list(self):
        """The task list last produced by the planner (``None`` until then)."""
        return self._planer_session_state["task_list"]

    @staticmethod
    def _core_team_stream_handle(stream_item) -> None:
        """Print a human-readable progress line for each streamed team event.

        Events are matched on the string value of ``event.event``; unknown
        event types are ignored.
        """
        for event in stream_item:
            kind = event.event
            if kind == "TeamRunContent":
                # Content arrives in pieces; print them back to back.
                print(f"{event.content}", end="", flush=True)
            elif kind == "TeamToolCallStarted":
                if event.tool.tool_name == "delegate_task_to_member":
                    print(event.tool)
                else:
                    print(f"Supervisor started using the tools: {event.tool.tool_name}")
            elif kind == "TeamToolCallCompleted":
                if event.tool.tool_name == "delegate_task_to_member":
                    # Display-only code: do not abort the run on a missing argument.
                    member = (event.tool.tool_args or {}).get("member_id", "A member")
                    print(f"{member} has completed the task assigned by the supervisor...")
                else:
                    print(f"Supervisor stop using tools:: {event.tool.tool_name}")
            elif kind == "ToolCallStarted":
                print(f"{event.agent_id} Start using tools: {event.tool.tool_name}")
            elif kind == "ToolCallCompleted":
                print(f"{event.agent_id} Stop using tools: {event.tool.tool_name}")
            elif kind == "TeamReasoningStep":
                print(f"Supervisor is reasoning: {event.content}")

    def core_team_start(self):
        """Run the core team on the current task list and stream its progress.

        Returns:
            The value of ``self.core_team.get_session_state()`` after the run.

        Raises:
            ValueError: If no task list has been produced yet.
        """
        if not self.task_list:
            raise ValueError("Task list is empty, cannot start core team.")

        # The prompt body is kept flush-left so the text sent to the model
        # carries no source-code indentation.
        # NOTE(review): the prompt says "proceed to Step 3" right before
        # requiring delegation to the Optimizer, which is step 2 - confirm
        # the intended step number.
        message = f"""
Based on this structured task list, please coordinate with the team members to:
1. Use scout to find specific locations for each task
2. Use optimizer to optimize the route
3. Use weather to check conditions for tomorrow
4. Use traffic to calculate routes and travel times
5. Provide a comprehensive plan and route plan with all details
"Once ALL scout tasks complete, IMMEDIATELY proceed to Step 3"
"DO NOT wait for user input, the user info is already provided in the context"
"You MUST delegate to Optimizer automatically"
Here is the task list:
{self.task_list}
"""
        self._team_session_state["task_list"] = self.task_list
        team_stream = self.core_team.run(
            message,
            stream=True,
            stream_events=True,
            session_state=self._team_session_state,
        )
        self._core_team_stream_handle(team_stream)
        return self.core_team.get_session_state()
if __name__ == "__main__":
    # Manual smoke test: build the toolkits and models, create an
    # AgentManager and run the planner phase for one sample request.
    from src.toolkits.googlemap_toolkit import GoogleMapsToolkit
    from src.toolkits.openweather_toolkit import OpenWeatherToolkit
    from src.toolkits.optimization_toolkit import OptimizationToolkit
    from src.infra.config import get_settings

    # Load the settings once and reuse them for every API key below.
    settings = get_settings()

    maps_kit = GoogleMapsToolkit(api_key=settings.google_maps_api_key)
    weather_kit = OpenWeatherToolkit(api_key=settings.openweather_api_key)
    opt_kit = OptimizationToolkit(api_key=settings.google_maps_api_key)

    state = UserState(location=Location(lat=25.058903, lng=121.549131))

    # Smaller model for the team members, larger one for planner and team lead.
    model = Gemini(
        id="gemini-2.5-flash-lite",
        thinking_budget=512,
        api_key=settings.gemini_api_key,
    )
    main_model = Gemini(
        id="gemini-2.5-flash",
        thinking_budget=1024,
        api_key=settings.gemini_api_key,
    )
    models = {
        "planner": main_model,
        "team": main_model,
        "scout": model,
        "optimizer": model,
        "traffic": model,
        "weather": model,
    }

    tools_dict = {
        "scout": [maps_kit.search_places],
        "optimizer": [opt_kit.solve_route_optimization],
        "traffic": [maps_kit.compute_routes],  # or get_directions
        "weather": [weather_kit],
        "team": [],
    }

    am = AgentManager(
        user_state=state,
        models_dict=models,
        tool_hooks_dict=tools_dict,
        hooks_dict={},
    )

    am.planner_message("I'm going to San Francisco for tourism tomorrow, please help me plan a one-day itinerary.")
    print("\n[Planner Phase Complete]")
    print(f"Task List: {am.task_list}")

    # The core-team phase is currently disabled; uncomment to run it.
    # if am.task_list:
    #     print("\n[Core Team Start]")
    #     am.core_team_start()