# ADMP-LS/client/services/task_monitor.py
# Page metadata from the repository web view: jackkuo - "reinit repo" - 82bf89e
import asyncio
import inspect
import time
from datetime import datetime, timedelta
from typing import Optional, Callable, Dict, Any

from services.logging_service import get_logger
class LongRunningTaskMonitor:
    """Monitor long-running tasks and emit periodic heartbeats for them.

    Intended to send heartbeats at regular intervals while an MCP tool call
    is executing. Tasks are registered with :meth:`start_monitoring` and
    removed with :meth:`stop_monitoring`; :meth:`monitor_all_tasks` polls the
    registry and sends a heartbeat for every task whose previous heartbeat is
    at least ``heartbeat_interval`` seconds old.
    """

    def __init__(self, heartbeat_interval: int = 300, check_interval: int = 60):
        """Create a monitor with an empty task registry.

        Args:
            heartbeat_interval: Minimum number of seconds between two
                heartbeats of the same task (default 300, i.e. 5 minutes).
            check_interval: Number of seconds :meth:`monitor_all_tasks`
                sleeps between polling passes (default 60).
        """
        self.heartbeat_interval = heartbeat_interval
        self.check_interval = check_interval
        self.logger = get_logger()
        # task_id -> bookkeeping record for that task
        self.active_tasks: Dict[str, Dict[str, Any]] = {}

    def start_monitoring(self, task_id: str, task_name: str, chat_id: Optional[str] = None,
                         heartbeat_callback: Optional[Callable] = None):
        """Register a long-running task for monitoring.

        Registering a ``task_id`` that is already present replaces its record.

        Args:
            task_id: Unique identifier of the task.
            task_name: Human-readable task name used in log messages.
            chat_id: Identifier of the chat the task belongs to, if any.
            heartbeat_callback: Optional callable invoked on every heartbeat
                as ``heartbeat_callback(task_id, task_info)``, where
                ``task_info`` is the live bookkeeping dict of the task. It
                may be a plain function or return an awaitable.
        """
        # One timestamp so start_time and last_heartbeat agree exactly.
        now = time.time()
        self.active_tasks[task_id] = {
            'task_name': task_name,
            'chat_id': chat_id,
            'start_time': now,
            'heartbeat_callback': heartbeat_callback,
            'last_heartbeat': now,
            'heartbeat_count': 0,
        }
        self.logger.log_system_status(
            f"Started monitoring long-running task: {task_name}",
            {'task_id': task_id, 'chat_id': chat_id}
        )

    def stop_monitoring(self, task_id: str):
        """Stop monitoring a task and log its total duration.

        Unknown task ids are ignored.

        Args:
            task_id: Unique identifier of the task.
        """
        # Remove the record first so a failure while logging cannot leave the
        # task registered (and heartbeating) indefinitely.
        task_info = self.active_tasks.pop(task_id, None)
        if task_info is None:
            return
        duration = time.time() - task_info['start_time']
        self.logger.log_long_running_task(
            task_info['task_name'],
            duration,
            task_info['chat_id']
        )

    async def send_heartbeat(self, task_id: str):
        """Send a heartbeat for one task if its heartbeat interval has elapsed.

        Does nothing when the task is not registered or when fewer than
        ``heartbeat_interval`` seconds have passed since its last heartbeat.
        Exceptions raised by the heartbeat callback are logged, not propagated.

        Args:
            task_id: Unique identifier of the task.
        """
        task_info = self.active_tasks.get(task_id)
        if task_info is None:
            return

        current_time = time.time()
        if current_time - task_info['last_heartbeat'] < self.heartbeat_interval:
            # Not due yet.
            return

        task_info['last_heartbeat'] = current_time
        task_info['heartbeat_count'] += 1
        duration = current_time - task_info['start_time']

        self.logger.log_system_status(
            f"Heartbeat for long-running task: {task_info['task_name']}",
            {
                'task_id': task_id,
                'chat_id': task_info['chat_id'],
                'duration_seconds': duration,
                'heartbeat_count': task_info['heartbeat_count']
            }
        )

        callback = task_info['heartbeat_callback']
        if callback:
            try:
                outcome = callback(task_id, task_info)
                # Accept both synchronous and asynchronous callbacks: only
                # await when the callback actually returned an awaitable.
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                self.logger.log_error(
                    "HeartbeatCallbackError",
                    str(e),
                    {'task_id': task_id, 'task_name': task_info['task_name']}
                )

    async def monitor_all_tasks(self):
        """Poll all active tasks forever, sending heartbeats that are due.

        Runs until the surrounding asyncio task is cancelled. Errors raised
        during a polling pass are logged and the loop continues after the
        usual ``check_interval`` pause.
        """
        while True:
            try:
                # Iterate over a snapshot: the registry may change while a
                # heartbeat callback is being awaited.
                for task_id in list(self.active_tasks):
                    await self.send_heartbeat(task_id)
            except asyncio.CancelledError:
                # On Python < 3.8 CancelledError derives from Exception; make
                # sure cancellation is never swallowed by the handler below.
                raise
            except Exception as e:
                self.logger.log_error(
                    "TaskMonitorError",
                    str(e),
                    {'active_tasks_count': len(self.active_tasks)}
                )
            # Wait before the next pass, whether or not this one failed.
            await asyncio.sleep(self.check_interval)

    def get_active_tasks_info(self) -> Dict[str, Dict[str, Any]]:
        """Return a snapshot describing every active task.

        Returns:
            A dict mapping task id to a dict with the keys ``task_name``,
            ``chat_id``, ``duration_seconds``, ``heartbeat_count`` and
            ``last_heartbeat_seconds_ago``.
        """
        current_time = time.time()
        return {
            task_id: {
                'task_name': task_info['task_name'],
                'chat_id': task_info['chat_id'],
                'duration_seconds': current_time - task_info['start_time'],
                'heartbeat_count': task_info['heartbeat_count'],
                'last_heartbeat_seconds_ago': current_time - task_info['last_heartbeat'],
            }
            for task_id, task_info in self.active_tasks.items()
        }
# Global task monitor instance, created at import time with default settings.
task_monitor = LongRunningTaskMonitor()
def get_task_monitor():
    """Return the module-level ``task_monitor`` instance."""
    return task_monitor
async def start_task_monitoring():
    """Run the global monitor's polling loop.

    Awaits ``monitor_all_tasks()`` on the monitor returned by
    ``get_task_monitor()``.
    """
    await get_task_monitor().monitor_all_tasks()