# Easy-Financial-Report — service/financial_data_cache_manager.py
"""
财务数据缓存管理器
用于管理 2024 FY Financial Metrics 和 Latest 3 Years Financial Metrics 的数据加载和缓存
"""
from datetime import datetime
from typing import Callable, Dict, Any
import threading
import time
class DataTask:
    """A single background data-loading task.

    Wraps a ``loader_func`` callable and records its lifecycle: status,
    result or error, and start/end timestamps. ``run()`` is intended to be
    executed on a worker thread and never raises.
    """

    def __init__(self, company: str, data_type: str, loader_func: Callable):
        self.company = company
        self.data_type = data_type
        self.loader_func = loader_func
        self.status = "pending"  # pending | running | completed | error
        self.result = None       # value returned by loader_func on success
        self.error = None        # str(exception) on failure
        # Initialized here so reading it on a successful/pending task is safe
        # (previously it was only created inside the except branch of run(),
        # raising AttributeError everywhere else).
        self.error_trace = None  # full traceback text on failure
        self.start_time = None
        self.end_time = None
        self.thread = None       # worker thread, assigned by the cache manager

    def run(self):
        """Execute the loader and record the outcome.

        Sets ``status`` to "completed" or "error"; exceptions are captured,
        never propagated.
        """
        self.status = "running"
        self.start_time = datetime.now()
        try:
            self.result = self.loader_func()
            self.status = "completed"
        except Exception as e:
            self.status = "error"
            self.error = str(e)
            import traceback  # local import keeps the success path import-free
            self.error_trace = traceback.format_exc()
        finally:
            self.end_time = datetime.now()

    def get_age_seconds(self):
        """Return seconds since the task finished (or started, if unfinished).

        Returns 0 when the task has not started yet.
        """
        if self.end_time:
            return (datetime.now() - self.end_time).total_seconds()
        if self.start_time:
            return (datetime.now() - self.start_time).total_seconds()
        return 0
class FinancialDataCacheManager:
    """Thread-safe cache of financial-data loading tasks.

    Each ``(company, data_type)`` pair maps to a :class:`DataTask` that loads
    the data on a daemon thread. Completed results are served from cache
    until ``cache_ttl`` expires; failed tasks are retried on the next
    request; the cache is capped at ``max_cache_size`` completed entries.
    """

    def __init__(self, cache_ttl_seconds=1800, max_cache_size=100):
        self.cache_ttl = cache_ttl_seconds    # seconds a completed result stays valid
        self.max_cache_size = max_cache_size  # soft cap on number of cached tasks
        self.tasks: Dict[tuple, DataTask] = {}  # (company, data_type) -> DataTask
        self.lock = threading.Lock()          # guards self.tasks and self.stats
        self.stats = {
            "cache_hits": 0,
            "cache_misses": 0,
            "background_completions": 0,
            "total_requests": 0
        }

    def get_or_load_data(self, company: str, data_type: str, loader_func: Callable) -> Any:
        """Return cached data for ``(company, data_type)``, loading it if needed.

        Blocks until the data is available (at most 30s).

        Raises:
            Exception: if the loader fails or the wait times out.
        """
        key = (company, data_type)
        # Bookkeeping and task lookup/creation happen under the lock; the
        # (possibly long) polling wait below must NOT hold it — the previous
        # version slept with the lock held, so one slow load blocked every
        # other request and get_stats() for up to 30 seconds.
        with self.lock:
            self.stats["total_requests"] += 1
            self._cleanup_expired_cache()
            task = self.tasks.get(key)
            waiting_on_existing = False
            if task:
                if task.status == "completed":
                    if task.get_age_seconds() < self.cache_ttl:
                        # Scenario 1: fresh completed result -> cache hit.
                        self.stats["cache_hits"] += 1
                        print(f"✅ [Data Cache HIT] {company} - {data_type} (age: {task.get_age_seconds():.1f}s)")
                        return task.result
                    print(f"⏰ [Data Cache EXPIRED] {company} - {data_type}")
                    del self.tasks[key]
                    task = None
                elif task.status in ("pending", "running"):
                    # Scenario 2: another request already started this load.
                    self.stats["cache_hits"] += 1
                    waiting_on_existing = True
                    print(f"🔄 [Data Cache WAIT] {company} - {data_type} (running for {task.get_age_seconds():.1f}s)")
                elif task.status == "error":
                    # Scenario 3: previous attempt failed -> retry from scratch.
                    print(f"🔄 [Data Retry after ERROR] {company} - {data_type}")
                    del self.tasks[key]
                    task = None
            if task is None:
                # Scenario 4: no usable entry -> start a new background load.
                self.stats["cache_misses"] += 1
                print(f"🆕 [Data Cache MISS] {company} - {data_type} - Starting background loading")
                task = DataTask(company, data_type, loader_func)
                self.tasks[key] = task
                task.thread = threading.Thread(target=task.run, daemon=True)
                task.thread.start()
        # Poll outside the lock. "pending" is included because the worker
        # thread may not have flipped the status to "running" yet; the old
        # loop tested only "running" and could exit immediately after
        # Thread.start(), spuriously reporting a timeout.
        max_wait = 30
        waited = 0.0
        while task.status in ("pending", "running") and waited < max_wait:
            time.sleep(0.5)
            waited += 0.5
        if task.status == "completed":
            if waiting_on_existing:
                with self.lock:
                    self.stats["background_completions"] += 1
                print(f"✅ [Data Background COMPLETED] {company} - {data_type}")
            else:
                print(f"✅ [Data NEW COMPLETED] {company} - {data_type}")
            return task.result
        if task.status == "error":
            tag = "Background" if waiting_on_existing else "NEW"
            print(f"❌ [Data {tag} ERROR] {company} - {data_type}: {task.error}")
            raise Exception(f"Data loading failed: {task.error}")
        print(f"⏱️ [Data TIMEOUT] {company} - {data_type}")
        raise Exception(f"Data loading timeout after {max_wait}s")

    def _cleanup_expired_cache(self):
        """Evict expired results and enforce the cache size cap.

        Caller must already hold ``self.lock``.
        """
        expired = [key for key, task in self.tasks.items()
                   if task.status == "completed" and task.get_age_seconds() > self.cache_ttl]
        for key in expired:
            company, data_type = key
            print(f"🗑️ [Data Cache CLEANUP] {company} - {data_type}")
            del self.tasks[key]
        # Enforce the size cap by evicting the oldest completed entries.
        overflow = len(self.tasks) - self.max_cache_size
        if overflow > 0:
            completed = [(k, t) for k, t in self.tasks.items() if t.status == "completed"]
            completed.sort(key=lambda item: item[1].end_time or datetime.min)
            # Only completed entries are evictable; slicing caps the removals
            # at len(completed) (the old range(to_remove) loop indexed past
            # the end of the list when most tasks were still running).
            for key, _task in completed[:overflow]:
                company, data_type = key
                print(f"🗑️ [Data Cache SIZE LIMIT] {company} - {data_type}")
                del self.tasks[key]

    def get_stats(self):
        """Return a snapshot of cache statistics, including the hit rate."""
        with self.lock:
            total = self.stats["total_requests"]
            hit_rate = (self.stats["cache_hits"] / total * 100) if total > 0 else 0
            return {
                **self.stats,
                "hit_rate": f"{hit_rate:.1f}%",
                "active_tasks": len([t for t in self.tasks.values() if t.status == "running"]),
                "cached_data": len([t for t in self.tasks.values() if t.status == "completed"])
            }