File size: 7,401 Bytes
c9edecd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
"""
财务数据缓存管理器
用于管理 2024 FY Financial Metrics 和 Latest 3 Years Financial Metrics 的数据加载和缓存
"""
from datetime import datetime
from typing import Callable, Dict, Any
import threading
import time
class DataTask:
    """One load of a single dataset for a single company.

    The task wraps a zero-argument loader callable and records the outcome
    of calling it (result or error) together with start/end timestamps, so
    that a cache can tell how old a stored result is.
    """

    def __init__(self, company: str, data_type: str, loader_func: Callable):
        """Create a task in the ``"pending"`` state.

        Args:
            company: Company identifier; stored as given.
            data_type: Name of the dataset being loaded; stored as given.
            loader_func: Callable invoked with no arguments by :meth:`run`;
                its return value becomes ``self.result``.
        """
        self.company = company
        self.data_type = data_type
        self.loader_func = loader_func
        self.status = "pending"  # pending, running, completed, error
        self.result = None
        self.error = None
        # Always defined, so callers can read it without first checking
        # whether the task failed (it used to exist only after an error).
        self.error_trace = None
        self.start_time = None
        self.end_time = None
        self.thread = None  # filled in by whoever schedules the task

    def run(self):
        """Execute the loader and record its outcome.

        Never propagates ``Exception`` subclasses raised by the loader:
        the message is stored in ``self.error``, the formatted traceback in
        ``self.error_trace`` and ``self.status`` becomes ``"error"``.
        ``self.end_time`` is set in every case.
        """
        self.status = "running"
        self.start_time = datetime.now()
        try:
            self.result = self.loader_func()
            self.status = "completed"
        except Exception as e:
            # This is the boundary of the worker: the failure is recorded
            # on the task instead of being lost with the thread.
            import traceback

            self.status = "error"
            self.error = str(e)
            self.error_trace = traceback.format_exc()
        finally:
            self.end_time = datetime.now()

    def get_age_seconds(self):
        """Return the task's age in seconds.

        The age is measured from ``end_time`` once the task has finished,
        otherwise from ``start_time``; a task that never started reports 0.
        """
        reference = self.end_time or self.start_time
        if reference is None:
            return 0
        return (datetime.now() - reference).total_seconds()
class FinancialDataCacheManager:
    """Cache of financial-data load results keyed by ``(company, data_type)``.

    Each entry is a task object that runs the loader on a daemon thread.
    ``self.lock`` guards ``self.tasks`` and ``self.stats``; it is never held
    while waiting for a loader, so one slow load does not stall other
    callers or :meth:`get_stats`.
    """

    def __init__(self, cache_ttl_seconds=1800, max_cache_size=100, max_wait_seconds=30):
        """Create an empty cache.

        Args:
            cache_ttl_seconds: Age in seconds after which a completed entry
                is no longer served from the cache.
            max_cache_size: Entry count above which the oldest completed
                entries are evicted.
            max_wait_seconds: Longest time a caller waits for a load before
                a timeout error is raised.
        """
        self.cache_ttl = cache_ttl_seconds
        self.max_cache_size = max_cache_size
        self.max_wait = max_wait_seconds
        self.tasks: Dict[tuple, DataTask] = {}
        self.lock = threading.Lock()
        self.stats = {
            "cache_hits": 0,
            "cache_misses": 0,
            "background_completions": 0,
            "total_requests": 0
        }

    def get_or_load_data(self, company: str, data_type: str, loader_func: Callable) -> Any:
        """Return the data for ``(company, data_type)``, loading it if needed.

        A fresh completed entry is returned directly. An entry that is still
        loading is waited for. An expired or failed entry is discarded and a
        new load is started with ``loader_func``.

        Args:
            company: Company identifier (first half of the cache key).
            data_type: Dataset name (second half of the cache key).
            loader_func: Zero-argument callable used when a load is needed.

        Returns:
            The value produced by the loader.

        Raises:
            Exception: If the load failed (``"Data loading failed: ..."``)
                or did not finish within the configured wait
                (``"Data loading timeout after ...s"``).
        """
        key = (company, data_type)
        joined_existing = False

        with self.lock:
            self.stats["total_requests"] += 1
            # Drop expired / surplus entries before looking anything up.
            self._cleanup_expired_cache()
            task = self.tasks.get(key)

            if task is not None:
                if task.status == "completed":
                    # Case 1: a finished entry -- serve it if still fresh.
                    age = task.get_age_seconds()
                    if age < self.cache_ttl:
                        self.stats["cache_hits"] += 1
                        print(f"✅ [Data Cache HIT] {company} - {data_type} (age: {age:.1f}s)")
                        return task.result
                    print(f"⏰ [Data Cache EXPIRED] {company} - {data_type}")
                    del self.tasks[key]
                    task = None
                elif task.status == "error":
                    # Case 3: the previous attempt failed -- retry.
                    print(f"🔄 [Data Retry after ERROR] {company} - {data_type}")
                    del self.tasks[key]
                    task = None
                else:
                    # Case 2: a load is in flight. "pending" is treated like
                    # "running": the worker may not have flipped the status
                    # yet even though its thread has been started.
                    self.stats["cache_hits"] += 1
                    print(f"🔄 [Data Cache WAIT] {company} - {data_type} (running for {task.get_age_seconds():.1f}s)")
                    joined_existing = True

            if task is None:
                # Case 4: nothing usable cached -- start a new load.
                self.stats["cache_misses"] += 1
                print(f"🆕 [Data Cache MISS] {company} - {data_type} - Starting background loading")
                task = DataTask(company, data_type, loader_func)
                task.thread = threading.Thread(target=task.run, daemon=True)
                # Registered and started under the lock, so any other caller
                # that finds this entry sees a thread it can join.
                self.tasks[key] = task
                task.thread.start()

        # The lock is released here: waiting must not block other keys.
        return self._await_task(task, company, data_type, joined_existing)

    def _await_task(self, task, company, data_type, joined_existing):
        """Wait for ``task`` to finish and turn its outcome into a result or error."""
        worker = task.thread
        if worker is not None:
            # join() returns as soon as the worker ends, or after the
            # timeout; the status check below tells the two apart.
            worker.join(timeout=self.max_wait)

        if task.status == "completed":
            if joined_existing:
                with self.lock:
                    self.stats["background_completions"] += 1
                print(f"✅ [Data Background COMPLETED] {company} - {data_type}")
            else:
                print(f"✅ [Data NEW COMPLETED] {company} - {data_type}")
            return task.result

        if task.status == "error":
            if joined_existing:
                print(f"❌ [Data Background ERROR] {company} - {data_type}: {task.error}")
            else:
                print(f"❌ [Data NEW ERROR] {company} - {data_type}: {task.error}")
            raise Exception(f"Data loading failed: {task.error}")

        print(f"⏱️ [Data TIMEOUT] {company} - {data_type}")
        raise Exception(f"Data loading timeout after {self.max_wait}s")

    def _cleanup_expired_cache(self):
        """Evict expired entries, then enforce the size limit.

        Must be called with ``self.lock`` held. Only completed entries are
        ever evicted; running and failed entries are left in place, so the
        cache can stay above ``max_cache_size`` when too few completed
        entries exist.
        """
        expired_keys = [
            key
            for key, task in self.tasks.items()
            if task.status == "completed" and task.get_age_seconds() > self.cache_ttl
        ]
        for key in expired_keys:
            company, data_type = key
            print(f"🗑️ [Data Cache CLEANUP] {company} - {data_type}")
            del self.tasks[key]

        # Enforce the size limit, oldest completed entries first.
        overflow = len(self.tasks) - self.max_cache_size
        if overflow > 0:
            completed = sorted(
                (item for item in self.tasks.items() if item[1].status == "completed"),
                key=lambda item: item[1].end_time or datetime.min,
            )
            # Slicing (not indexing) tolerates having fewer completed
            # entries than the overflow.
            for key, _ in completed[:overflow]:
                company, data_type = key
                print(f"🗑️ [Data Cache SIZE LIMIT] {company} - {data_type}")
                del self.tasks[key]

    def get_stats(self):
        """Return the counters plus hit rate and current entry counts.

        Returns:
            A dict with every key of ``self.stats`` and additionally
            ``"hit_rate"`` (percentage string with one decimal),
            ``"active_tasks"`` (entries with status ``"running"``) and
            ``"cached_data"`` (entries with status ``"completed"``).
        """
        with self.lock:
            total = self.stats["total_requests"]
            hits = self.stats["cache_hits"]
            hit_rate = (hits / total * 100) if total > 0 else 0
            entries = list(self.tasks.values())
            return {
                **self.stats,
                "hit_rate": f"{hit_rate:.1f}%",
                "active_tasks": sum(1 for t in entries if t.status == "running"),
                "cached_data": sum(1 for t in entries if t.status == "completed")
            }
|