File size: 7,401 Bytes
c9edecd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""

财务数据缓存管理器

用于管理 2024 FY Financial Metrics 和 Latest 3 Years Financial Metrics 的数据加载和缓存

"""

from datetime import datetime
from typing import Callable, Dict, Any
import threading
import time


class DataTask:
    """A single data-loading job together with its outcome.

    The task wraps a zero-argument ``loader_func`` and records its lifecycle
    in ``status`` ("pending" -> "running" -> "completed" | "error"), the
    returned value in ``result``, and failure details in ``error`` /
    ``error_trace``.  ``thread`` is a slot for whoever schedules the task; this
    class never starts a thread itself.
    """

    def __init__(self, company: str, data_type: str, loader_func: Callable):
        """Create a pending task.

        Args:
            company: Company identifier stored on the task.
            data_type: Data category identifier stored on the task.
            loader_func: Callable invoked with no arguments by ``run``.
        """
        self.company = company
        self.data_type = data_type
        self.loader_func = loader_func
        self.status = "pending"  # pending, running, completed, error
        self.result = None
        self.error = None
        # Defined up front so it can be read on any task, not only failed ones.
        self.error_trace = None
        self.start_time = None
        self.end_time = None
        self.thread = None

    def run(self):
        """Execute ``loader_func`` and record the outcome on this task.

        Exceptions derived from ``Exception`` are captured (message in
        ``error``, formatted traceback in ``error_trace``) rather than
        propagated.  ``end_time`` is always set when the call finishes.
        """
        import traceback

        self.status = "running"
        self.start_time = datetime.now()

        # Stays "running" if something outside Exception escapes the loader.
        final_status = "running"
        try:
            outcome = self.loader_func()
        except Exception as exc:
            self.error = str(exc)
            self.error_trace = traceback.format_exc()
            final_status = "error"
        else:
            self.result = outcome
            final_status = "completed"
        finally:
            # Publish the terminal status last: a thread polling ``status``
            # must never observe "completed"/"error" before the result,
            # error text and end time are in place.
            self.end_time = datetime.now()
            self.status = final_status

    def get_age_seconds(self):
        """Return the task's age in seconds.

        Measured from ``end_time`` when the task has finished, otherwise from
        ``start_time``; ``0`` when the task has not started.
        """
        reference = self.end_time or self.start_time
        if reference is None:
            return 0
        return (datetime.now() - reference).total_seconds()


class FinancialDataCacheManager:
    """Cache of financial data loads keyed by ``(company, data_type)``.

    Each load runs in a daemon thread wrapped in a ``DataTask``.  Completed
    results are reused until they are older than ``cache_ttl`` seconds, and
    concurrent requests for the same key wait on the load already in flight
    instead of starting another one.  ``self.lock`` protects ``tasks`` and
    ``stats``; it is NOT held while a caller waits for a load to finish.
    """

    def __init__(self, cache_ttl_seconds=1800, max_cache_size=100, max_wait_seconds=30):
        """Create an empty cache.

        Args:
            cache_ttl_seconds: Maximum age of a completed result before it is
                treated as expired.
            max_cache_size: Entry count above which the oldest completed
                entries are evicted during cleanup.
            max_wait_seconds: Longest time a caller waits for a load before
                ``get_or_load_data`` raises a timeout error.
        """
        self.cache_ttl = cache_ttl_seconds
        self.max_cache_size = max_cache_size
        self.max_wait = max_wait_seconds
        self.tasks: Dict[tuple, DataTask] = {}
        self.lock = threading.Lock()
        self.stats = {
            "cache_hits": 0,
            "cache_misses": 0,
            "background_completions": 0,
            "total_requests": 0
        }

    def get_or_load_data(self, company: str, data_type: str, loader_func: Callable) -> Any:
        """Return cached data for the key, loading it if necessary (blocking).

        Args:
            company: First half of the cache key.
            data_type: Second half of the cache key.
            loader_func: Zero-argument callable that produces the data; only
                invoked when no usable entry or in-flight load exists.

        Returns:
            The value produced by the loader (fresh or cached).

        Raises:
            Exception: If the load failed, or did not finish within
                ``max_wait`` seconds.
        """
        key = (company, data_type)
        started_here = False

        # Phase 1 (locked): bookkeeping only - decide whether to serve from
        # cache, join an in-flight load, or start a new one.
        with self.lock:
            self.stats["total_requests"] += 1
            self._cleanup_expired_cache()
            task = self.tasks.get(key)

            if task is not None and task.status == "completed":
                age = task.get_age_seconds()
                if age < self.cache_ttl:
                    self.stats["cache_hits"] += 1
                    print(f"✅ [Data Cache HIT] {company} - {data_type} (age: {age:.1f}s)")
                    return task.result
                print(f"⏰ [Data Cache EXPIRED] {company} - {data_type}")
                del self.tasks[key]
                task = None
            elif task is not None and task.status == "error":
                # A previous attempt failed: drop it and retry below.
                print(f"🔄 [Data Retry after ERROR] {company} - {data_type}")
                del self.tasks[key]
                task = None

            if task is None:
                self.stats["cache_misses"] += 1
                print(f"🆕 [Data Cache MISS] {company} - {data_type} - Starting background loading")
                task = DataTask(company, data_type, loader_func)
                task.thread = threading.Thread(target=task.run, daemon=True)
                # Start before publishing so other callers never see a task
                # whose thread cannot be joined.
                task.thread.start()
                self.tasks[key] = task
                started_here = True
            else:
                # Pending or running load started by another caller.
                self.stats["cache_hits"] += 1
                print(f"🔄 [Data Cache WAIT] {company} - {data_type} (running for {task.get_age_seconds():.1f}s)")

        # Phase 2 (unlocked): wait for the load so that requests for other
        # keys, and get_stats(), are not blocked behind this one.
        self._wait_for_task(task)

        status = task.status
        if status == "completed":
            if started_here:
                print(f"✅ [Data NEW COMPLETED] {company} - {data_type}")
            else:
                with self.lock:
                    self.stats["background_completions"] += 1
                print(f"✅ [Data Background COMPLETED] {company} - {data_type}")
            return task.result

        if status == "error":
            origin = "NEW" if started_here else "Background"
            print(f"❌ [Data {origin} ERROR] {company} - {data_type}: {task.error}")
            raise Exception(f"Data loading failed: {task.error}")

        # Still pending/running after the wait: report it rather than
        # returning None.
        print(f"⏱️ [Data TIMEOUT] {company} - {data_type}")
        raise Exception(f"Data loading timeout after {self.max_wait}s")

    def _wait_for_task(self, task):
        """Block until ``task`` finishes or ``max_wait`` seconds have passed."""
        worker = task.thread
        if worker is not None:
            # Joining avoids the race where the thread has been started but
            # has not yet flipped the status from "pending" to "running".
            worker.join(timeout=self.max_wait)
            return

        # No thread handle available: fall back to polling the status.
        deadline = time.monotonic() + self.max_wait
        while task.status in ("pending", "running") and time.monotonic() < deadline:
            time.sleep(0.05)

    def _cleanup_expired_cache(self):
        """Drop expired entries, then enforce ``max_cache_size``.

        Only completed entries are ever removed.  The caller must hold
        ``self.lock``.
        """
        expired_keys = [
            key for key, task in self.tasks.items()
            if task.status == "completed" and task.get_age_seconds() > self.cache_ttl
        ]
        for key in expired_keys:
            company, data_type = key
            print(f"🗑️ [Data Cache CLEANUP] {company} - {data_type}")
            del self.tasks[key]

        # Enforce the size limit by evicting the oldest completed entries.
        overflow = len(self.tasks) - self.max_cache_size
        if overflow > 0:
            completed = sorted(
                ((key, task) for key, task in self.tasks.items() if task.status == "completed"),
                key=lambda item: item[1].end_time or datetime.min,
            )
            # Slicing (not indexing) tolerates having fewer completed entries
            # than the overflow, e.g. when most entries are still running.
            for key, _task in completed[:overflow]:
                company, data_type = key
                print(f"🗑️ [Data Cache SIZE LIMIT] {company} - {data_type}")
                del self.tasks[key]

    def get_stats(self):
        """Return a snapshot of the counters plus derived figures.

        Returns:
            A dict with every key of ``self.stats`` and additionally
            ``hit_rate`` (percentage string), ``active_tasks`` (entries with
            status "running") and ``cached_data`` (entries with status
            "completed").
        """
        with self.lock:
            total = self.stats["total_requests"]
            hits = self.stats["cache_hits"]
            hit_rate = (hits / total * 100) if total > 0 else 0
            statuses = [task.status for task in self.tasks.values()]

            return {
                **self.stats,
                "hit_rate": f"{hit_rate:.1f}%",
                "active_tasks": statuses.count("running"),
                "cached_data": statuses.count("completed")
            }