Spaces:
Running
Running
File size: 9,927 Bytes
b7d08cf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
from agno.db.sqlite import SqliteDb
from src.agent.base import creat_agent, creat_team
from src.infra.logger import get_logger
logger = get_logger(__name__)
TEAM_NAME = "team"
def _creat_member(name, model, tools, base_kwargs):
member = creat_agent(name=name,
model=model,
tools=tools,
add_session_state_to_context=True,
add_datetime_to_context=True,
**base_kwargs
)
logger.debug(f"👤 Member Agent created - {name} | Model: {model} | Tools: {tools}")
return member
def create_core_team(model_dict, base_kwargs={}, tools_dict=None, session_id=None):
team_db = SqliteDb(db_file="tmp/team.db")
team_model = model_dict.pop(TEAM_NAME)
team_tools = tools_dict.pop(TEAM_NAME, []) if tools_dict else []
member = []
for member_name in model_dict.keys():
member_agent = _creat_member(
name=member_name,
model=model_dict[member_name],
tools=tools_dict.get(member_name, []) if tools_dict else [],
base_kwargs=base_kwargs,
)
member.append(member_agent)
main_team = creat_team(
name=TEAM_NAME,
model=team_model,
db=team_db,
members=member,
tools=team_tools,
add_session_state_to_context=True,
markdown=True,
session_id=session_id,
debug_mode=False,
debug_level=1,
)
logger.info(f"🧑✈️ Multi-agent created - {main_team.session_id}")
return main_team
if __name__ == "__main__":
from agno.models.google import Gemini
from agno.agent import RunEvent
from src.infra.config import get_settings
from src.agent.base import UserState, Location, get_context
from src.agent.planner import create_planner_agent
import uuid, json
from src.infra.poi_repository import poi_repo
from src.infra.context import set_session_id
from agno.run.team import TeamRunEvent
from src.tools import (ScoutToolkit, OptimizationToolkit,
NavigationToolkit, WeatherToolkit, ReaderToolkit)
session_id = str(uuid.uuid4())
token = set_session_id(session_id)
print(f"🆔 Session ID: {session_id} | Token: {token}")
#user_message = "明天我需要到台大醫院看病, 而且要去郵局和 買菜"
user_message = "I'm going to San Francisco for tourism tomorrow, please help me plan a one-day itinerary."
setting = get_settings()
planner_model = Gemini(
id="gemini-2.5-flash",
thinking_budget=2048,
api_key=setting.gemini_api_key)
main_model = Gemini(
id="gemini-2.5-flash",
thinking_budget=1024,
api_key=setting.gemini_api_key)
model = Gemini(
id="gemini-2.5-flash-lite",
api_key=setting.gemini_api_key)
models_dict = {
TEAM_NAME: main_model,
"scout": main_model,
"optimizer": model,
"navigator": model,
"weatherman": model,
"presenter": main_model,
}
tools_dict = {
"scout": [ScoutToolkit()],
"optimizer": [OptimizationToolkit()],
"navigator": [NavigationToolkit()],
"weatherman": [WeatherToolkit()],
"presenter": [ReaderToolkit()],
}
use_state = UserState(location=Location(lat=25.058903, lng=121.549131))
planner_kwargs = {
"additional_context": get_context(use_state),
"timezone_identifier": use_state.utc_offset,
"debug_mode": False,
}
team_kwargs = {
"timezone_identifier": use_state.utc_offset,
}
planer_agent = create_planner_agent(planner_model, planner_kwargs, session_id=session_id)
core_team = create_core_team(models_dict, team_kwargs, tools_dict, session_id=session_id)
def planner_stream_handle(stream_item):
show = True
response = ""
for chuck in stream_item:
if chuck.event == RunEvent.run_content:
content = chuck.content
response += chuck.content
if show:
if "@@@" in response:
show = False
content = content.split("@@@")[0]
print(content)
json_data = "{" + response.split("{", maxsplit=1)[-1]
json_data = json_data.replace("`", "")
return json_data, response
def planner_message(agent, message):
stream = agent.run(f"help user to update the task_list, user's message: {message}",
stream=True, stream_events=True)
task_list, _response = planner_stream_handle(stream)
agent.update_session_state(
session_id=agent.session_id,
session_state_updates={"task_list": task_list},
)
import time
start = time.time()
planner_message(planer_agent, user_message)
print(f"\n⏱️ Total Time: {time.time() - start:.1f} seconds")
task_list_input = planer_agent.get_session_state()["task_list"]
print(task_list_input)
task_list_input = json.dumps(task_list_input, indent=2, ensure_ascii=False).replace("`", "").replace("@", "")
prompt_msg = f"""
⚠️ **PIPELINE START COMMAND** ⚠️
Here is the Task List JSON generated by the Planner.
**IMMEDIATELY** execute **Step 1** of your protocol: Send this data to **Scout**.
[DATA START]
{task_list_input}
[DATA END]
"""
start = time.time()
team_stream = core_team.run(
f"Plan this trip: {task_list_input}",
stream=True,
stream_events=True, # 確保開啟事件串流
session_id=session_id,
)
for event in team_stream:
# 1. 這裡印出 LLM 的思考過程或對話
if event.event in [TeamRunEvent.run_content]:
print(event.content, end="")
# 2. 這裡印出 LLM 決定呼叫工具的瞬間 (關鍵!)
elif event.event == "tool_call":
print(f"\n🔵 Leader 正在呼叫工具: {event.tool_call.get('function', {}).get('name')}")
# 3. 這裡印出工具執行完畢的回傳 (Scout 查完後會觸發這個)
elif event.event == "tool_output":
print(f"\n🟢 工具回傳結果 (Ref ID): {str(event.tool_output)[:50]}...")
# 4. 這裡印出錯誤 (如果有)
elif event.event == "run_failed":
print(f"\n🔴 執行失敗: {event.error}")
if event.event == TeamRunEvent.run_completed:
# Access total tokens from the completed event
print(f"\nTotal tokens: {event.metrics.total_tokens}")
print(f"Input tokens: {event.metrics.input_tokens}")
print(f"Output tokens: {event.metrics.output_tokens}")
print(f"\n⏱️ Total Time: {time.time() - start:.1f} seconds")
final_ref_id = poi_repo.get_last_id_by_session(session_id)
# ... (前略)
if final_ref_id:
print(f"\n\n🎯 Found Final Reference ID: {final_ref_id}")
# 從 DB 讀取完整的 JSON
structured_data = poi_repo.load(final_ref_id)
if structured_data:
# --- A. 提取與修正 Polyline & Legs (交通細節) ---
traffic_res = structured_data.get("precise_traffic_result", {})
raw_legs = traffic_res.get("legs", [])
# ... (中間 Polyline 處理邏輯保持不變) ...
polylines = []
segments = []
for i, leg in enumerate(raw_legs):
# ... (保留原有的 polyline 處理代碼)
p_data = leg.get("polyline")
if isinstance(p_data, dict):
p_str = p_data.get("encodedPolyline", "")
else:
p_str = str(p_data)
polylines.append(p_str)
segments.append({
"segment_index": i,
"distance_meters": leg.get("distance_meters", 0),
"duration_seconds": leg.get("duration_seconds", 0),
"description": f"Leg {i + 1}"
})
# --- B. 組裝給前端的最終 Payload ---
api_response = {
"meta": {
"status": "success",
"trip_title": "Custom Trip Plan",
# 繼承原本的交通摘要
"total_distance_km": structured_data.get("traffic_summary", {}).get("total_distance_km"),
"total_duration_min": structured_data.get("traffic_summary", {}).get("total_duration_min")
},
# ✅ [CRITICAL NEW] 這裡就是快樂表的數據!
# 前端需要這個來畫綠色的進度條
"optimization_metrics": structured_data.get("metrics"),
"route_geometry": {
"polylines": polylines
},
"route_segments": segments,
"itinerary": structured_data.get("timeline", [])
}
print("✅ Data Payload Constructed!")
# 檢查一下快樂表是否存在
if api_response.get("optimization_metrics"):
print("🎉 Optimization Metrics included in final payload!")
else:
print("⚠️ Warning: Optimization Metrics missing from payload.")
print(f"\n📦 JSON Preview :\n{json.dumps(api_response, indent=2, ensure_ascii=False)}")
# (Optional) 寫入檔案以便完整檢視
# with open("final_trip_payload.json", "w", encoding="utf-8") as f:
# json.dump(api_response, f, ensure_ascii=False, indent=2)
# print("\n💾 Full JSON saved to 'final_trip_payload.json'")
else:
print("❌ Error: ID found but data is empty.")
else:
print("⚠️ Warning: No data saved in this run.")
|