LifeFlow-AI/src/tools/scout_toolkit.py
import json
import ast
from agno.tools import Toolkit
from src.services.googlemap_api_service import GoogleMapAPIService
from src.infra.poi_repository import poi_repo
from src.infra.logger import get_logger
logger = get_logger(__name__)
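# Search budget knob: the per-run POI node budget below is derived as sqrt(MAX_SEARCH)
# inside search_and_offload's adaptive allocation.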
MAX_SEARCH = 1000
class ScoutToolkit(Toolkit):
def __init__(self, google_maps_api_key: str):
super().__init__(name="scout_toolkit")
self.gmaps = GoogleMapAPIService(api_key=google_maps_api_key)
self.register(self.search_and_offload)
def _extract_first_json_object(self, text: str) -> str:
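        """
        Extract the first balanced JSON object from `text` by counting braces.

        Example: 'plan: {"a": {"b": 1}} done' -> '{"a": {"b": 1}}'.
        Returns the text unchanged if no '{' is found, or everything from the
        first '{' onward if the braces never balance (truncated JSON).
        """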
text = text.strip()
        # 1. Find the first '{'
start_idx = text.find('{')
if start_idx == -1:
            return text  # Not found; return the text as-is and hope for the best
        # 2. Count braces to find the matching closing '}'
balance = 0
for i in range(start_idx, len(text)):
char = text[i]
if char == '{':
balance += 1
elif char == '}':
balance -= 1
            # When the balance returns to zero, we have a complete JSON object
if balance == 0:
return text[start_idx: i + 1]
        # If the loop ends with a non-zero balance, the JSON was truncated
return text[start_idx:]
def _robust_parse_json(self, text: str) -> dict:
"""
強力解析器
"""
        # 1. Strip Markdown code-block fences, if present
if "```" in text:
lines = text.split('\n')
clean_lines = []
in_code = False
for line in lines:
if "```" in line:
in_code = not in_code
continue
                if in_code:  # keep only the content inside the code block
clean_lines.append(line)
            # Use the extracted content if any; otherwise assume the whole text is JSON
if clean_lines:
text = "\n".join(clean_lines)
        # 2. Use the brace-balance extractor to pull out a clean JSON string
json_str = self._extract_first_json_object(text)
        # 3. First pass: standard json.loads
try:
return json.loads(json_str)
except json.JSONDecodeError:
pass
        # 4. Second pass: handle Python-style dicts (single quotes, True/False/None)
try:
return ast.literal_eval(json_str)
except (ValueError, SyntaxError):
pass
        # 5. Third pass: brute-force fix; swap Python's None/True/False
        #    for their JSON equivalents and retry
try:
fixed_text = json_str.replace("True", "true").replace("False", "false").replace("None", "null")
return json.loads(fixed_text)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse JSON via all methods. Raw: {text}...") from e
def search_and_offload(self, task_list_json: str) -> str:
"""
Performs a proximity search for POIs based on the provided tasks and global context, then offloads results to the DB.
CRITICAL: The input JSON **MUST** include the 'global_info' section containing 'start_location' (lat, lng) to ensure searches are performed nearby the user's starting point, not in a random location.
Args:
task_list_json (str): A JSON formatted string. The structure must be:
{
"global_info": {
"language": str,
"plan_type": str
"return_to_start": bool,
"start_location": ...,
"departure_time": str,
"deadline": str or null,
},
"tasks": [
{
"task_id": 1,
"category": "MEAL" | "LEISURE" | "ERRAND" | "SHOPPING",
"description": "Short description",
"location_hint": "Clean Keyword for Google Maps",
"priority": "HIGH" | "MEDIUM" | "LOW",
"service_duration_min": 30,
"time_window": {
"earliest_time": "ISO 8601" or null,
"latest_time": "ISO 8601" or null}
}
...,
]
}
Returns:
str: A Ref_id of DB system.
"""
try:
#print("task_list_json", task_list_json)
data = self._robust_parse_json(task_list_json)
tasks = data.get("tasks", [])
global_info = data.get("global_info", {})
except Exception as e:
logger.warning(f"❌ JSON Parse Error: {e}")
            # Return the error to the Agent so it knows the format is wrong; it usually self-corrects and retries
return f"❌ Error: Invalid JSON format. Please output RAW JSON only. Details: {e}"
logger.debug(f"🕵️ Scout: Processing Global Info & {len(tasks)} tasks...")
# ============================================================
        # 1. Process Start Location & set the anchor point (compatibility-patched)
        # ============================================================
        # Helper: extract lat/lng (accepts both lat/lng and latitude/longitude keys)
def extract_lat_lng(d):
if not isinstance(d, dict): return None, None
lat = d.get("lat") or d.get("latitude")
lng = d.get("lng") or d.get("longitude")
return lat, lng
start_loc = global_info.get("start_location")
logger.info(f"🕵️ Scout: Start Location - {start_loc}")
anchor_point = None
if isinstance(start_loc, str):
logger.debug(f"🕵️ Scout: Resolving Start Location Name '{start_loc}'...")
try:
results = self.gmaps.text_search(query=start_loc, limit=1)
if results:
loc = results[0].get("location", {})
lat = loc.get("latitude") or loc.get("lat")
lng = loc.get("longitude") or loc.get("lng")
name = results[0].get("name") or start_loc
global_info["start_location"] = {"name": name, "lat": lat, "lng": lng}
anchor_point = {"lat": lat, "lng": lng}
logger.info(f" ✅ Resolved Start: {name}")
except Exception as e:
                logger.warning(f" ❌ Error searching start location: {e}")
elif isinstance(start_loc, dict):
lat, lng = extract_lat_lng(start_loc)
if lat is not None and lng is not None:
                # Coordinates already provided
anchor_point = {"lat": lat, "lng": lng}
global_info["start_location"] = {"name": "User Location", "lat": lat, "lng": lng}
logger.info(f" ✅ Anchor Point set from input: {anchor_point}")
else:
query_name = start_loc.get("name", "Unknown Start")
logger.info(f"🕵️ Scout: Resolving Start Location Dict '{query_name}'...")
try:
results = self.gmaps.text_search(query=query_name, limit=1)
if results:
loc = results[0].get("location", {})
lat = loc.get("latitude") or loc.get("lat")
lng = loc.get("longitude") or loc.get("lng")
global_info["start_location"] = {
"name": results[0].get("name", query_name),
"lat": lat, "lng": lng
}
anchor_point = {"lat": lat, "lng": lng}
logger.info(f" ✅ Resolved Start: {global_info['start_location']}")
except Exception as e:
logger.warning(f" ❌ Error searching start location: {e}")
                    return json.dumps({
                        "status": "ERROR",
                        "scout_ref": None,
                        "note": "Failed to resolve start location. Please check the input and try again."
                    })
        total_tasks_count = len(tasks)
        total_node_budget = int(MAX_SEARCH ** 0.5)  # overall POI budget for this run
        HARD_CAP_PER_TASK = 15
        MIN_LIMIT = 3
        logger.info(f"🕵️ Scout: Starting Adaptive Search (Budget: {total_node_budget} nodes)")
enriched_tasks = []
for i, task in enumerate(tasks):
tasks_remaining = total_tasks_count - i
            if total_node_budget <= 0:
                # Budget exhausted: fall back to the minimum per-task limit
                current_limit = MIN_LIMIT
            else:
                allocation = total_node_budget // tasks_remaining
                current_limit = max(MIN_LIMIT, min(HARD_CAP_PER_TASK, allocation))
            # Consume the shared budget so later tasks adapt to what remains
            total_node_budget -= current_limit
desc = task.get("description", "")
hint = task.get("location_hint", "")
query = hint if hint else desc
if not query: query = "Unknown Location"
try:
places = self.gmaps.text_search(
query=query,
limit=current_limit,
location=anchor_point
)
except Exception as e:
logger.warning(f"⚠️ Search failed for {query}: {e}")
places = []
candidates = []
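            # Normalize candidate fields: results may use new Places API keys
            # (latitude/longitude, id, displayName) or legacy keys (lat/lng, place_id, name).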
for p in places:
loc = p.get("location", {})
lat = loc.get("latitude") if "latitude" in loc else loc.get("lat")
lng = loc.get("longitude") if "longitude" in loc else loc.get("lng")
if lat is not None and lng is not None:
candidates.append({
"poi_id": p.get("place_id") or p.get("id"),
"name": p.get("name") or p.get("displayName", {}).get("text"),
"lat": lat,
"lng": lng,
"rating": p.get("rating"),
"time_window": None
})
            # ✅ ID fallback mechanism
            raw_id = task.get("task_id") or task.get("id")  # accepts both 'task_id' and 'id'
if raw_id and str(raw_id).strip().lower() not in ["none", "", "null"]:
task_id = str(raw_id)
else:
task_id = f"task_{i + 1}"
task_entry = {
"task_id": task_id,
"priority": task.get("priority", "MEDIUM"),
"service_duration_min": task.get("service_duration_min", 60), # 兼容欄位名
"time_window": task.get("time_window"),
"candidates": candidates
}
enriched_tasks.append(task_entry)
logger.info(f" - Task {task_id}: Found {len(candidates)} POIs")
full_payload = {"global_info": global_info, "tasks": enriched_tasks}
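        # Offload the full enriched payload to the POI repository and hand back only the lightweight ref_id.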
ref_id = poi_repo.save(full_payload, data_type="scout_result")
return json.dumps({
"status": "SUCCESS",
"message_task": len(enriched_tasks),
"scout_ref": ref_id,
"note": "Please pass this scout_ref to the Optimizer immediately."
})
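

# --- Illustrative usage sketch -----------------------------------------------
# A minimal, hedged smoke test showing how this toolkit might be exercised
# directly. The GOOGLE_MAPS_API_KEY env var and the payload values are
# assumptions for illustration; in practice the toolkit is registered with an
# agent rather than run as a script.
if __name__ == "__main__":
    import os

    sample_payload = {
        "global_info": {
            "language": "en",
            "plan_type": "day_trip",  # assumed value
            "return_to_start": True,
            "start_location": {"lat": 25.0330, "lng": 121.5654},
            "departure_time": "2025-01-01T09:00:00+08:00",
            "deadline": None,
        },
        "tasks": [
            {
                "task_id": 1,
                "category": "MEAL",
                "description": "Lunch",
                "location_hint": "ramen restaurant",
                "priority": "HIGH",
                "service_duration_min": 45,
                "time_window": {"earliest_time": None, "latest_time": None},
            }
        ],
    }

    toolkit = ScoutToolkit(google_maps_api_key=os.environ.get("GOOGLE_MAPS_API_KEY", ""))
    print(toolkit.search_and_offload(json.dumps(sample_payload)))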