# LifeFlow-AI / src/agent/core_team.py
# Hugging Face file-view header: uploaded by Marco310, commit b7d08cf
# ("buildup agent system"), file size 9.93 kB.
from agno.db.sqlite import SqliteDb
from src.agent.base import creat_agent, creat_team
from src.infra.logger import get_logger
logger = get_logger(__name__)
TEAM_NAME = "team"
def _creat_member(name, model, tools, base_kwargs):
    """Create a single member agent and log how it was configured.

    Args:
        name: Name given to the member agent.
        model: Model object the agent runs on.
        tools: Tools handed to the agent.
        base_kwargs: Extra keyword arguments forwarded verbatim to
            ``creat_agent``.

    Returns:
        The object returned by ``creat_agent``.
    """
    # Options every member gets regardless of what the caller supplies.
    # A key repeated in ``base_kwargs`` still raises TypeError at the call.
    fixed_options = {
        "add_session_state_to_context": True,
        "add_datetime_to_context": True,
    }
    agent = creat_agent(
        name=name,
        model=model,
        tools=tools,
        **fixed_options,
        **base_kwargs,
    )
    logger.debug(f"👤 Member Agent created - {name} | Model: {model} | Tools: {tools}")
    return agent
def create_core_team(model_dict, base_kwargs=None, tools_dict=None, session_id=None):
    """Assemble the leader team plus one member agent per remaining model.

    Args:
        model_dict: Mapping of agent name to model. It must contain a
            ``TEAM_NAME`` entry, which becomes the team's own model; every
            other entry yields one member agent, in mapping order. The
            mapping is only read, never modified.
        base_kwargs: Optional extra keyword arguments forwarded to every
            member agent. ``None`` (the default) means no extras.
        tools_dict: Optional mapping of agent name to a list of tools. A
            ``TEAM_NAME`` entry, if present, is given to the team itself.
            The mapping is only read, never modified.
        session_id: Session identifier passed through to ``creat_team``.

    Returns:
        The team object returned by ``creat_team``.

    Raises:
        KeyError: If ``model_dict`` has no ``TEAM_NAME`` entry.
    """
    # Avoid a shared mutable default and avoid popping from the caller's
    # dicts, so the same mappings can be reused for another team.
    member_kwargs = {} if base_kwargs is None else base_kwargs
    tools_by_name = tools_dict or {}

    team_db = SqliteDb(db_file="tmp/team.db")
    team_model = model_dict[TEAM_NAME]
    team_tools = tools_by_name.get(TEAM_NAME, [])

    # Every entry other than the team's own becomes a member agent.
    members = [
        _creat_member(
            name=member_name,
            model=member_model,
            tools=tools_by_name.get(member_name, []),
            base_kwargs=member_kwargs,
        )
        for member_name, member_model in model_dict.items()
        if member_name != TEAM_NAME
    ]

    main_team = creat_team(
        name=TEAM_NAME,
        model=team_model,
        db=team_db,
        members=members,
        tools=team_tools,
        add_session_state_to_context=True,
        markdown=True,
        session_id=session_id,
        debug_mode=False,
        debug_level=1,
    )
    logger.info(f"🧑‍✈️ Multi-agent created - {main_team.session_id}")
    return main_team
if __name__ == "__main__":
    # Manual end-to-end run: build a planner agent and the core team for a
    # fresh session. Later script sections use session_id, user_message,
    # planer_agent, core_team, json, poi_repo, RunEvent and TeamRunEvent.
    from agno.models.google import Gemini
    from agno.agent import RunEvent
    from src.infra.config import get_settings
    from src.agent.base import UserState, Location, get_context
    from src.agent.planner import create_planner_agent
    import uuid
    import json
    from src.infra.poi_repository import poi_repo
    from src.infra.context import set_session_id
    from agno.run.team import TeamRunEvent
    from src.tools import (
        ScoutToolkit,
        OptimizationToolkit,
        NavigationToolkit,
        WeatherToolkit,
        ReaderToolkit,
    )

    # A new random session id is registered before any toolkit is built.
    session_id = str(uuid.uuid4())
    context_token = set_session_id(session_id)
    print(f"🆔 Session ID: {session_id} | Token: {context_token}")

    # Alternative prompt (originally written in Traditional Chinese):
    # "Tomorrow I need to see a doctor at NTU Hospital, and I also need to
    # go to the post office and buy groceries."
    user_message = "I'm going to San Francisco for tourism tomorrow, please help me plan a one-day itinerary."

    gemini_key = get_settings().gemini_api_key

    # Three model configurations: the planner gets the largest thinking
    # budget, the leader a smaller one, and the lite model none.
    planner_llm = Gemini(id="gemini-2.5-flash", thinking_budget=2048, api_key=gemini_key)
    leader_llm = Gemini(id="gemini-2.5-flash", thinking_budget=1024, api_key=gemini_key)
    lite_llm = Gemini(id="gemini-2.5-flash-lite", api_key=gemini_key)

    # Key order determines the order in which member agents are created.
    member_models = {
        TEAM_NAME: leader_llm,
        "scout": leader_llm,
        "optimizer": lite_llm,
        "navigator": lite_llm,
        "weatherman": lite_llm,
        "presenter": leader_llm,
    }
    member_toolkits = {
        "scout": [ScoutToolkit()],
        "optimizer": [OptimizationToolkit()],
        "navigator": [NavigationToolkit()],
        "weatherman": [WeatherToolkit()],
        "presenter": [ReaderToolkit()],
    }

    # Fixed sample coordinates for the user's location.
    traveller = UserState(location=Location(lat=25.058903, lng=121.549131))

    # NOTE(review): a UTC offset is passed as ``timezone_identifier`` --
    # confirm the agent factory accepts that form.
    planner_options = dict(
        additional_context=get_context(traveller),
        timezone_identifier=traveller.utc_offset,
        debug_mode=False,
    )
    team_options = dict(timezone_identifier=traveller.utc_offset)

    planer_agent = create_planner_agent(planner_llm, planner_options, session_id=session_id)
    core_team = create_core_team(member_models, team_options, member_toolkits, session_id=session_id)
def planner_stream_handle(stream_item):
    """Consume a planner run stream, echo its visible part, extract the JSON tail.

    Only events whose ``event`` equals ``RunEvent.run_content`` are used.
    Their content is concatenated into the full response. Each chunk is
    printed until the accumulated response contains the ``@@@`` marker; the
    chunk that completes the marker is printed only up to it, and nothing
    after that is printed.

    Args:
        stream_item: Iterable of run events exposing ``event`` and
            ``content`` attributes.

    Returns:
        A ``(json_data, response)`` tuple. ``response`` is the full
        concatenated text. ``json_data`` is ``"{"`` followed by everything
        after the first ``"{"`` in ``response``, with backticks removed.
        When ``response`` contains no ``"{"``, the whole response follows
        the leading ``"{"``, so callers should not assume valid JSON.
    """
    visible = True
    response = ""
    for chunk in stream_item:
        if chunk.event != RunEvent.run_content:
            continue
        content = chunk.content
        if content is None:
            # A content event without text would make the concatenation
            # below raise TypeError; there is nothing to add or print.
            continue
        response += content
        if not visible:
            continue
        if "@@@" in response:
            visible = False
            # Only the current chunk is trimmed: a marker split across two
            # chunks leaves its first characters already printed.
            content = content.split("@@@")[0]
        print(content)

    json_data = "{" + response.split("{", maxsplit=1)[-1]
    json_data = json_data.replace("`", "")
    return json_data, response
def planner_message(agent, message):
    """Run the planner on a user message and store the extracted task list.

    The agent is run in streaming mode, the stream is passed through
    ``planner_stream_handle``, and the resulting JSON text is written to the
    agent's session state under the ``task_list`` key.

    Args:
        agent: Planner agent exposing ``run``, ``update_session_state`` and
            ``session_id``.
        message: The user's raw message.
    """
    run_stream = agent.run(
        f"help user to update the task_list, user's message: {message}",
        stream=True,
        stream_events=True,
    )
    # The full response text is returned as well but is not needed here.
    task_list_json, _ = planner_stream_handle(run_stream)
    agent.update_session_state(
        session_id=agent.session_id,
        session_state_updates=dict(task_list=task_list_json),
    )
if __name__ == "__main__":
    import time

    # --- Planner phase: turn the user message into a task list ---
    start = time.time()
    planner_message(planer_agent, user_message)
    print(f"\n⏱️ Total Time: {time.time() - start:.1f} seconds")

    stored_tasks = planer_agent.get_session_state()["task_list"]
    print(stored_tasks)

    # Serialise the stored task list and strip backticks and '@' characters.
    tasks_json = json.dumps(stored_tasks, indent=2, ensure_ascii=False)
    for unwanted in ("`", "@"):
        tasks_json = tasks_json.replace(unwanted, "")

    # NOTE(review): this prompt is built but never sent; the run below uses
    # a shorter "Plan this trip" message instead. Confirm which is intended.
    prompt_msg = f"""
⚠️ **PIPELINE START COMMAND** ⚠️
Here is the Task List JSON generated by the Planner.
**IMMEDIATELY** execute **Step 1** of your protocol: Send this data to **Scout**.
[DATA START]
{tasks_json}
[DATA END]
"""

    # --- Team phase: stream the team run and echo its events ---
    start = time.time()
    team_stream = core_team.run(
        f"Plan this trip: {tasks_json}",
        stream=True,
        stream_events=True,  # make sure event streaming is enabled
        session_id=session_id,
    )
    for team_event in team_stream:
        kind = team_event.event
        # NOTE(review): the three string comparisons below use plain string
        # names; confirm they match what the installed agno version emits.
        if kind == TeamRunEvent.run_content:
            # 1. The LLM's reasoning or dialogue text.
            print(team_event.content, end="")
        elif kind == "tool_call":
            # 2. The moment the LLM decides to call a tool (the key signal).
            print(f"\n🔵 Leader 正在呼叫工具: {team_event.tool_call.get('function', {}).get('name')}")
        elif kind == "tool_output":
            # 3. What a tool returned once it finished (fires after Scout's lookup).
            print(f"\n🟢 工具回傳結果 (Ref ID): {str(team_event.tool_output)[:50]}...")
        elif kind == "run_failed":
            # 4. Errors, if any.
            print(f"\n🔴 執行失敗: {team_event.error}")

        if kind == TeamRunEvent.run_completed:
            # Token usage is read from the completed event's metrics.
            usage = team_event.metrics
            print(f"\nTotal tokens: {usage.total_tokens}")
            print(f"Input tokens: {usage.input_tokens}")
            print(f"Output tokens: {usage.output_tokens}")
            print(f"\n⏱️ Total Time: {time.time() - start:.1f} seconds")
def build_trip_payload(structured_data):
    """Build the front-end payload from a stored trip result.

    Args:
        structured_data: Mapping loaded from the POI repository. The keys
            read are ``precise_traffic_result`` (with a ``legs`` list),
            ``traffic_summary``, ``metrics`` and ``timeline``; all are
            optional.

    Returns:
        A dict with ``meta``, ``optimization_metrics``, ``route_geometry``,
        ``route_segments`` and ``itinerary`` entries. ``polylines`` and
        ``route_segments`` have one item per leg, in leg order.
    """
    # ``or {}`` / ``or []`` also covers keys that are present but None.
    traffic_result = structured_data.get("precise_traffic_result") or {}
    traffic_summary = structured_data.get("traffic_summary") or {}

    polylines = []
    segments = []
    for index, leg in enumerate(traffic_result.get("legs") or []):
        raw_polyline = leg.get("polyline")
        if isinstance(raw_polyline, dict):
            encoded = raw_polyline.get("encodedPolyline", "")
        elif raw_polyline is None:
            # A leg without geometry contributes an empty string, not the
            # literal text "None", and keeps polylines aligned with segments.
            encoded = ""
        else:
            encoded = str(raw_polyline)
        polylines.append(encoded)
        segments.append({
            "segment_index": index,
            "distance_meters": leg.get("distance_meters", 0),
            "duration_seconds": leg.get("duration_seconds", 0),
            "description": f"Leg {index + 1}",
        })

    return {
        "meta": {
            "status": "success",
            "trip_title": "Custom Trip Plan",
            # Carried over from the stored traffic summary.
            "total_distance_km": traffic_summary.get("total_distance_km"),
            "total_duration_min": traffic_summary.get("total_duration_min"),
        },
        # Optimization metrics; per the original author's note, the front
        # end uses them to draw its progress bar.
        "optimization_metrics": structured_data.get("metrics"),
        "route_geometry": {
            "polylines": polylines,
        },
        "route_segments": segments,
        "itinerary": structured_data.get("timeline", []),
    }


if __name__ == "__main__":
    # Look up the last record saved for this session and preview its payload.
    final_ref_id = poi_repo.get_last_id_by_session(session_id)
    if final_ref_id:
        print(f"\n\n🎯 Found Final Reference ID: {final_ref_id}")
        # Load the complete JSON document from the repository.
        structured_data = poi_repo.load(final_ref_id)
        if structured_data:
            api_response = build_trip_payload(structured_data)
            print("✅ Data Payload Constructed!")
            # Report whether the optimization metrics made it into the payload.
            if api_response.get("optimization_metrics"):
                print("🎉 Optimization Metrics included in final payload!")
            else:
                print("⚠️ Warning: Optimization Metrics missing from payload.")
            print(f"\n📦 JSON Preview :\n{json.dumps(api_response, indent=2, ensure_ascii=False)}")
            # Optional: dump api_response to a JSON file here to inspect the
            # full payload.
        else:
            print("❌ Error: ID found but data is empty.")
    else:
        print("⚠️ Warning: No data saved in this run.")