Dmitry Beresnev committed
Commit 9a265da · 1 Parent(s): 5260ec0

add cache for the downloaded data
src/core/ticker_scanner/__init__.py CHANGED
@@ -1,6 +1,7 @@
 """
 Ticker Scanner Module
 Monitors and analyzes stock tickers for growth potential
+Includes in-memory caching with 2-hour expiry
 """
 
 from src.core.ticker_scanner.ticker_analyzer import TickerAnalyzer
@@ -8,6 +9,10 @@ from src.core.ticker_scanner.scheduler import Scheduler
 from src.core.ticker_scanner.growth_speed_analyzer import GrowthSpeedAnalyzer
 from src.core.ticker_scanner.core_enums import StockExchange, GrowthCategory
 from src.core.ticker_scanner.growth_metrics import GrowthSpeedMetrics
+from src.core.ticker_scanner.parallel_data_downloader import (
+    clear_cache,
+    get_cache_stats
+)
 
 __all__ = [
     'TickerAnalyzer',
@@ -16,4 +21,6 @@ __all__ = [
     'StockExchange',
     'GrowthCategory',
     'GrowthSpeedMetrics',
+    'clear_cache',
+    'get_cache_stats',
 ]
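
With clear_cache and get_cache_stats re-exported at the package root, callers no longer need to reach into parallel_data_downloader directly. A minimal sketch of the resulting call site (the surrounding script is hypothetical, not part of this commit):

# Hypothetical caller; only the imported names come from this commit.
from src.core.ticker_scanner import clear_cache, get_cache_stats

stats = get_cache_stats()
print(f"{stats['valid_cached']}/{stats['total_cached']} cached entries still valid")

clear_cache()  # drop everything to force fresh downloads on the next scan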
src/core/ticker_scanner/parallel_data_downloader.py CHANGED
@@ -9,7 +9,6 @@ import time
 import random
 from itertools import islice
 from typing import Any, Optional
-from datetime import datetime, timedelta
 from concurrent.futures import ProcessPoolExecutor, as_completed
 
 import yfinance as yf
@@ -17,6 +16,7 @@ import yfinance as yf
 from src.core.ticker_scanner.core_enums import StockExchange
 from src.core.ticker_scanner.tickers_provider import TickersProvider
 from src.telegram_bot.logger import main_logger as logger
+from src.core.ticker_scanner.ticker_cache import TickerCache
 
 
 MAX_WORKERS = 8  # Number of parallel processes
@@ -24,75 +24,35 @@ MAX_RETRIES = 3  # Retry count on failure
 SLEEP_BETWEEN_RETRIES = 1.0  # Seconds between retries
 BATCH_SIZE = 50  # Number of tickers per batch
 MIN_DATA_POINTS = 50  # Minimum number of price points required
-CACHE_EXPIRY_HOURS = 2  # Cache expiry time in hours
 
-# In-memory cache for ticker data
-_ticker_cache: dict[str, dict[str, Any]] = {}
-_cache_timestamps: dict[str, datetime] = {}
 
-
-def _is_cache_valid(ticker: str) -> bool:
-    """Check if cached data for ticker is still valid (not expired)"""
-    if ticker not in _cache_timestamps:
-        return False
-
-    cache_age = datetime.now() - _cache_timestamps[ticker]
-    return cache_age < timedelta(hours=CACHE_EXPIRY_HOURS)
-
-
-def _get_cached_data(ticker: str) -> Optional[dict[str, Any]]:
-    """Get cached data if valid, None otherwise"""
-    if _is_cache_valid(ticker):
-        logger.debug(f"Using cached data for {ticker}")
-        return _ticker_cache.get(ticker)
-    return None
-
-
-def _cache_data(ticker: str, data: dict[str, Any]) -> None:
-    """Cache ticker data with current timestamp"""
-    _ticker_cache[ticker] = data
-    _cache_timestamps[ticker] = datetime.now()
-    logger.debug(f"Cached data for {ticker}")
+# Global cache instance
+_cache = TickerCache()
 
 
 def clear_cache() -> None:
     """Clear all cached data (useful for testing or manual refresh)"""
-    global _ticker_cache, _cache_timestamps
-    _ticker_cache.clear()
-    _cache_timestamps.clear()
-    logger.info("Cache cleared")
+    _cache.clear()
 
 
 def get_cache_stats() -> dict[str, Any]:
     """Get cache statistics"""
-    valid_count = sum(1 for ticker in _ticker_cache.keys() if _is_cache_valid(ticker))
-    return {
-        'total_cached': len(_ticker_cache),
-        'valid_cached': valid_count,
-        'expired_cached': len(_ticker_cache) - valid_count
-    }
+    return _cache.get_stats()
 
 
-def fetch_prices(ticker: str, max_retries: int = MAX_RETRIES, use_cache: bool = True) -> dict[str, Any]:
+def fetch_prices(ticker: str, max_retries: int = MAX_RETRIES, use_cache: bool = False) -> Optional[dict[str, Any]]:
     """
     Download all-time closing prices for a single ticker safely.
-    Uses in-memory cache if available and not expired.
 
     Args:
         ticker: Stock ticker symbol
        max_retries: Maximum number of retry attempts
-        use_cache: Whether to use cached data if available
+        use_cache: Whether to use cached data (NOTE: typically False when called from subprocess)
 
     Returns:
        dict {'ticker': ticker, 'prices': ndarray, 'dates': DatetimeIndex} or None if failed
     """
-    # Check cache first
-    if use_cache:
-        cached_data = _get_cached_data(ticker)
-        if cached_data is not None:
-            return cached_data
-
-    # Download fresh data
+    # Download fresh data (cache is handled in main process)
     for attempt in range(max_retries):
         try:
             df = yf.download(ticker, period="max", progress=False, auto_adjust=True)
@@ -123,18 +83,12 @@ def fetch_prices(ticker: str, max_retries: int = MAX_RETRIES, use_cache: bool =
             if prices.ndim > 1:
                 prices = prices.flatten()
 
-            result = {
+            return {
                 "ticker": ticker,
                 "prices": prices,
                 "dates": dates
             }
 
-            # Cache the result
-            if use_cache:
-                _cache_data(ticker, result)
-
-            return result
-
         except yf.shared.YFRateLimitError:
             wait = SLEEP_BETWEEN_RETRIES + random.random()
             logger.warning(f"Rate limited for {ticker}. Waiting {wait:.1f}s and retrying...")
@@ -155,7 +109,8 @@ def batch(iterable: list[str], n: int = BATCH_SIZE):
             break
         yield chunk
 
-def download_tickers_parallel(tickers: list[str], max_workers: int = MAX_WORKERS,
+def download_tickers_parallel(tickers: list[str], exchange: str,
+                              max_workers: int = MAX_WORKERS,
                               use_cache: bool = True) -> list[dict[str, Any]]:
     """
     Download a large list of tickers in parallel batches.
@@ -163,6 +118,7 @@ def download_tickers_parallel(tickers: list[str], max_workers: int = MAX_WORKERS
 
     Args:
         tickers: List of ticker symbols to download
+        exchange: Exchange name (e.g., "NASDAQ", "NYSE")
         max_workers: Number of parallel workers
         use_cache: Whether to use cached data
 
@@ -175,7 +131,7 @@ def download_tickers_parallel(tickers: list[str], max_workers: int = MAX_WORKERS
 
     if use_cache:
         for ticker in tickers:
-            cached_data = _get_cached_data(ticker)
+            cached_data = _cache.get(exchange, ticker)
             if cached_data:
                 cached_results.append(cached_data)
             else:
@@ -194,7 +150,7 @@ def download_tickers_parallel(tickers: list[str], max_workers: int = MAX_WORKERS
     logger.info(f"Downloading {len(tickers_to_download)} tickers...")
     for batch_num, ticker_batch in enumerate(batch(tickers_to_download, BATCH_SIZE), start=1):
         logger.info(f"Processing batch {batch_num}: {len(ticker_batch)} tickers")
-        results, failed = process_batch(ticker_batch, max_workers)
+        results, failed = process_batch(ticker_batch, exchange, max_workers)
         all_results.extend(results)
         all_failed.extend(failed)
         # small sleep between batches to reduce rate-limit chance
@@ -206,20 +162,30 @@ def download_tickers_parallel(tickers: list[str], max_workers: int = MAX_WORKERS
 
     return all_results
 
-def process_batch(ticker_batch: list[str], max_workers: int) -> tuple[list[dict[str, Any]], list[Any]]:
+def process_batch(ticker_batch: list[str], exchange: str, max_workers: int) -> tuple[list[dict[str, Any]], list[Any]]:
     """
     Process a batch of tickers in parallel using multiprocessing.
     Returns tuple (successful_results, failed_tickers)
+
+    Args:
+        ticker_batch: List of ticker symbols to process
+        exchange: Exchange name for cache key
+        max_workers: Number of parallel workers
+
+    Note: Downloads always fetch fresh data (cache checked before this step)
     """
     results = []
     failed = []
     with ProcessPoolExecutor(max_workers=max_workers) as executor:
-        futures = {executor.submit(fetch_prices, t): t for t in ticker_batch}
+        # Don't use cache in subprocess - already handled in main process
+        futures = {executor.submit(fetch_prices, t, use_cache=False): t for t in ticker_batch}
        for future in as_completed(futures):
             ticker = futures[future]
             try:
                 res = future.result()
                 if res:
+                    # Cache the result in the main process after download
+                    _cache.set(exchange, res['ticker'], res)
                     results.append(res)
                 else:
                     failed.append(ticker)
@@ -249,7 +215,7 @@ def run_parallel_data_downloader(exchange: StockExchange = StockExchange.NASDAQ,
         logger.info(f"Cache stats: {cache_stats['valid_cached']} valid, {cache_stats['expired_cached']} expired")
 
     logger.info(f"Starting download for {len(tickers)} tickers from {exchange.value}...")
-    data = download_tickers_parallel(tickers, use_cache=use_cache)
+    data = download_tickers_parallel(tickers, exchange.value, use_cache=use_cache)
     logger.info(f"Retrieved {len(data)} tickers successfully")
     return data
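
The central design change in this file is that all cache reads and writes now happen in the parent process: each ProcessPoolExecutor worker runs in its own interpreter, so anything fetch_prices wrote to a module-level cache (as the old code did) vanished when the worker exited. A standalone sketch of that pitfall, independent of the repo's code:

# Standalone illustration (not from the repo): module-level state mutated
# inside a worker process never reaches the parent, which is why this
# commit moves _cache.set() into the main process after future.result().
from concurrent.futures import ProcessPoolExecutor

_cache: dict[str, int] = {}

def worker(key: str) -> int:
    _cache[key] = 42  # mutates the worker's own copy of the module
    return len(_cache)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as ex:
        print(ex.submit(worker, "AAPL").result())  # 1 - the worker saw its write
    print(len(_cache))                             # 0 - the parent's cache is untouched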
 
src/core/ticker_scanner/ticker_cache.py ADDED
@@ -0,0 +1,70 @@
+from typing import Any, Optional
+from datetime import datetime, timedelta
+
+from src.telegram_bot.logger import main_logger as logger
+
+
+CACHE_EXPIRY_HOURS = 2  # Cache expiry time in hours
+
+
+class TickerCache:
+    """
+    In-memory cache for ticker data with automatic expiry.
+    Uses exchange:ticker as key to support multiple exchanges.
+    """
+
+    def __init__(self, expiry_hours: int = CACHE_EXPIRY_HOURS):
+        self._cache: dict[str, dict[str, Any]] = {}
+        self._timestamps: dict[str, datetime] = {}
+        self._expiry_hours = expiry_hours
+
+    def _make_key(self, exchange: str, ticker: str) -> str:
+        """Create cache key from exchange and ticker"""
+        return f"{exchange}:{ticker}"
+
+    def is_valid(self, exchange: str, ticker: str) -> bool:
+        """Check if cached data is still valid (not expired)"""
+        key = self._make_key(exchange, ticker)
+        if key not in self._timestamps:
+            return False
+
+        cache_age = datetime.now() - self._timestamps[key]
+        return cache_age < timedelta(hours=self._expiry_hours)
+
+    def get(self, exchange: str, ticker: str) -> Optional[dict[str, Any]]:
+        """Get cached data if valid, None otherwise"""
+        if self.is_valid(exchange, ticker):
+            key = self._make_key(exchange, ticker)
+            logger.debug(f"Using cached data for {key}")
+            return self._cache.get(key)
+        return None
+
+    def set(self, exchange: str, ticker: str, data: dict[str, Any]) -> None:
+        """Cache ticker data with timestamp"""
+        key = self._make_key(exchange, ticker)
+        self._cache[key] = data
+        self._timestamps[key] = datetime.now()
+        logger.debug(f"Cached data for {key}")
+
+    def clear(self) -> None:
+        """Clear all cached data"""
+        self._cache.clear()
+        self._timestamps.clear()
+        logger.info("Cache cleared")
+
+    def get_stats(self) -> dict[str, Any]:
+        """Get cache statistics"""
+        valid_count = 0
+        for key in self._cache.keys():
+            # Parse key to get exchange and ticker
+            parts = key.split(':', 1)
+            if len(parts) == 2:
+                exchange, ticker = parts
+                if self.is_valid(exchange, ticker):
+                    valid_count += 1
+
+        return {
+            'total_cached': len(self._cache),
+            'valid_cached': valid_count,
+            'expired_cached': len(self._cache) - valid_count
+        }
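
A quick usage sketch of the new class (the price payload is fabricated for illustration; assumes the project's src.telegram_bot.logger import resolves):

# Hypothetical usage; the payload is a stand-in for real yfinance output.
from src.core.ticker_scanner.ticker_cache import TickerCache

cache = TickerCache(expiry_hours=2)
cache.set("NASDAQ", "AAPL", {"ticker": "AAPL", "prices": [187.4, 189.1], "dates": []})

assert cache.get("NASDAQ", "AAPL") is not None
assert cache.get("NYSE", "AAPL") is None  # exchange is part of the key

print(cache.get_stats())  # {'total_cached': 1, 'valid_cached': 1, 'expired_cached': 0}
cache.clear()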