LifeFlow-AI / src /tools /reader_toolkit.py
Marco310's picture
feat: πŸš€ Evolve to Stateful MCP Architecture with Context Injection Middleware
529a8bd
import json
from agno.tools import Toolkit
from src.infra.poi_repository import poi_repo
from src.infra.context import get_session_id
from src.infra.logger import get_logger
logger = get_logger(__name__)
class ReaderToolkit(Toolkit):
def __init__(self):
super().__init__(name="reader_toolkit")
self.register(self.read_final_itinerary)
@staticmethod
def read_final_itinerary(ref_id: str) -> str:
"""
Retrieves the complete, enriched itinerary data for final presentation.
This tool acts as the 'Data Fetcher' for the Presenter. It loads the fully processed
trip plan (including Weather, Traffic, and Optimized Route) associated with the `ref_id`.
Args:
ref_id (str): The unique reference ID returned by the Weatherman Agent (e.g., "final_itinerary_xyz").
This ID links to the completed dataset ready for reporting.
Returns:
str: A structured JSON string containing the full trip details.
Structure:
{
"status": "COMPLETE" | "INCOMPLETE",
"global_info": { ... },
"traffic_summary": { "total_distance": ..., "total_drive_time": ... },
"schedule": [
{ "time": "10:00", "location": "...", "weather": "...", "air_quality": "..." },
...,
]
}
"""
logger.info(f"πŸ“– Presenter: QA Loading Ref {ref_id}...")
data = poi_repo.load(ref_id)
if not data:
logger.warning(f"⚠️ Warning: Ref ID '{ref_id}' not found.")
session_id = get_session_id()
if session_id:
latest_id = poi_repo.get_last_id_by_session(session_id)
if latest_id and latest_id != ref_id:
logger.warning(f"πŸ”„ Auto-Correcting: Switching to latest Session ID: {latest_id}")
data = poi_repo.load(latest_id)
if not data:
return "CRITICAL_ERROR: Ref ID not found."
if not data.get("timeline"):
return json.dumps({"status": "INCOMPLETE", "action_required": "DELEGATE_BACK_TO_WEATHERMAN"})
traffic = data.get("traffic_summary", {})
global_info = data.get("global_info", {})
timeline = data.get("timeline", [])
cleaned_timeline = []
for stop in timeline:
addr = stop.get("address", "")
if not addr:
coords = stop.get("coordinates", {})
addr = f"coords: {coords.get('lat'):.4f}, {coords.get('lng'):.4f}"
aqi_data = stop.get("aqi", {})
aqi_text = aqi_data.get("label", "N/A")
cleaned_timeline.append({
"time": stop.get("time"),
"location": stop.get("location"),
"address": addr,
"weather": stop.get("weather"),
"air_quality": aqi_text,
"travel_time_from_prev": stop.get("travel_time_from_prev", "- mins"),
"travel_mode": stop.get("travel_mode", "DRIVE")
})
summary_view = {
"status": "COMPLETE",
"global_info": global_info,
"traffic_summary": {
"total_distance": f"{traffic.get('total_distance_km', 0):.1f} km",
"total_drive_time": f"{traffic.get('total_duration_min', 0)} mins",
},
"schedule": cleaned_timeline
}
return json.dumps(summary_view, ensure_ascii=False, indent=2)