File size: 5,317 Bytes
82bf89e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
import asyncio
import inspect
import time
from datetime import datetime, timedelta
from typing import Optional, Callable, Dict, Any

from services.logging_service import get_logger
class LongRunningTaskMonitor:
    """
    Monitor for long-running tasks that emits periodic heartbeats while MCP tools run.

    Tasks are registered with ``start_monitoring`` and removed with
    ``stop_monitoring``. ``monitor_all_tasks`` polls every ``check_interval``
    seconds. For each task whose last heartbeat is at least
    ``heartbeat_interval`` seconds old, it logs a heartbeat and invokes the
    task's optional callback.
    """

    def __init__(self, heartbeat_interval: int = 300,
                 check_interval: Optional[float] = None):
        """
        Args:
            heartbeat_interval: Minimum number of seconds between two heartbeats
                for the same task (default 300 = 5 minutes).
            check_interval: Seconds between polling passes in
                ``monitor_all_tasks``. Defaults to ``min(60, heartbeat_interval)``,
                floored at 1 s. This keeps a heartbeat interval shorter than the
                old fixed 60 s poll period from being silently stretched to 60 s.
        """
        self.heartbeat_interval = heartbeat_interval
        if check_interval is None:
            check_interval = max(1, min(60, heartbeat_interval))
        self.check_interval = check_interval
        self.logger = get_logger()
        # task_id -> bookkeeping dict (keys are set in start_monitoring).
        self.active_tasks: Dict[str, Dict[str, Any]] = {}

    def start_monitoring(self, task_id: str, task_name: str, chat_id: Optional[str] = None,
                         heartbeat_callback: Optional[Callable] = None):
        """
        Start monitoring a long-running task.

        Registering an id that is already monitored replaces its entry, which
        resets its timers and heartbeat count.

        Args:
            task_id: Unique task identifier.
            task_name: Task name, used in log messages.
            chat_id: Optional chat ID associated with the task.
            heartbeat_callback: Optional callable invoked as
                ``callback(task_id, task_info)`` on each heartbeat. It may be a
                plain function or a coroutine function.
        """
        # Take a single timestamp so start_time and last_heartbeat agree exactly.
        now = time.time()
        self.active_tasks[task_id] = {
            'task_name': task_name,
            'chat_id': chat_id,
            'start_time': now,
            'heartbeat_callback': heartbeat_callback,
            'last_heartbeat': now,
            'heartbeat_count': 0
        }
        self.logger.log_system_status(
            f"Started monitoring long-running task: {task_name}",
            {'task_id': task_id, 'chat_id': chat_id}
        )

    def stop_monitoring(self, task_id: str):
        """
        Stop monitoring a task and log its total run time.

        Unknown task ids are ignored.

        Args:
            task_id: Unique task identifier.
        """
        # Remove first so the task stops being monitored even if logging fails.
        task_info = self.active_tasks.pop(task_id, None)
        if task_info is None:
            return
        duration = time.time() - task_info['start_time']
        self.logger.log_long_running_task(
            task_info['task_name'],
            duration,
            task_info['chat_id']
        )

    async def send_heartbeat(self, task_id: str):
        """
        Emit a heartbeat for one task if its heartbeat interval has elapsed.

        Does nothing if the task is unknown or its last heartbeat is more recent
        than ``heartbeat_interval`` seconds. Errors raised by the task's callback
        are logged as ``HeartbeatCallbackError`` and are not propagated.

        Args:
            task_id: Unique task identifier.
        """
        task_info = self.active_tasks.get(task_id)
        if task_info is None:
            return

        current_time = time.time()
        if current_time - task_info['last_heartbeat'] < self.heartbeat_interval:
            return

        task_info['last_heartbeat'] = current_time
        task_info['heartbeat_count'] += 1
        duration = current_time - task_info['start_time']

        self.logger.log_system_status(
            f"Heartbeat for long-running task: {task_info['task_name']}",
            {
                'task_id': task_id,
                'chat_id': task_info['chat_id'],
                'duration_seconds': duration,
                'heartbeat_count': task_info['heartbeat_count']
            }
        )

        callback = task_info['heartbeat_callback']
        if callback:
            try:
                # Support both sync callbacks and coroutine functions. Blindly
                # awaiting a sync callback's return value raises TypeError.
                result = callback(task_id, task_info)
                if inspect.isawaitable(result):
                    await result
            except asyncio.CancelledError:
                # Never swallow cancellation (it subclasses Exception before 3.8).
                raise
            except Exception as e:
                self.logger.log_error(
                    "HeartbeatCallbackError",
                    str(e),
                    {'task_id': task_id, 'task_name': task_info['task_name']}
                )

    async def monitor_all_tasks(self):
        """
        Poll all active tasks forever, emitting heartbeats as they come due.

        Sleeps ``check_interval`` seconds between passes. A failure while
        handling one task is logged as ``TaskMonitorError`` and does not stop
        the remaining tasks in the same pass from getting their heartbeat.
        """
        while True:
            # Snapshot the ids, because tasks may start or stop while we await.
            for task_id in list(self.active_tasks):
                try:
                    await self.send_heartbeat(task_id)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    self.logger.log_error(
                        "TaskMonitorError",
                        str(e),
                        {'active_tasks_count': len(self.active_tasks), 'task_id': task_id}
                    )
            await asyncio.sleep(self.check_interval)

    def get_active_tasks_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Return a snapshot of every active task.

        Returns:
            Mapping of task_id to a dict with task_name, chat_id,
            duration_seconds, heartbeat_count and last_heartbeat_seconds_ago.
        """
        current_time = time.time()
        return {
            task_id: {
                'task_name': info['task_name'],
                'chat_id': info['chat_id'],
                'duration_seconds': current_time - info['start_time'],
                'heartbeat_count': info['heartbeat_count'],
                'last_heartbeat_seconds_ago': current_time - info['last_heartbeat']
            }
            for task_id, info in self.active_tasks.items()
        }
# Process-wide monitor instance shared by every caller of get_task_monitor().
task_monitor: "LongRunningTaskMonitor" = LongRunningTaskMonitor()
def get_task_monitor():
    """Return the module-level ``task_monitor`` instance."""
    shared_monitor = task_monitor
    return shared_monitor
async def start_task_monitoring():
    """Run the shared monitor's polling loop (``monitor_all_tasks``) on the current event loop."""
    await get_task_monitor().monitor_all_tasks()