Spaces:
Running
Running
| import threading | |
| import queue | |
| import logging | |
| from datetime import date | |
| from news_scraper.helpers.news_db_logger import NewsDBLogger | |
| from db.local_database import DatabaseEntry, DataType | |
| from fundamental_analysis.decision_maker import evaluate_stock | |
| logger = logging.getLogger(__name__) | |
| class FundamentalAnalysisService: | |
| _instance = None | |
| _lock = threading.Lock() | |
| def __new__(cls): | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = super(FundamentalAnalysisService, cls).__new__(cls) | |
| cls._instance.initialized = False | |
| return cls._instance | |
| def __init__(self): | |
| if self.initialized: | |
| return | |
| self.initialized = True | |
| self.queue = queue.Queue() | |
| self.db_logger = NewsDBLogger() | |
| self.is_running = False | |
| self.worker_thread = None | |
| self.callbacks = {} # ticker -> list of callbacks | |
| self.processing_tickers = set() | |
| def start(self): | |
| if self.is_running: | |
| return | |
| self.is_running = True | |
| self.worker_thread = threading.Thread(target=self._process_queue, daemon=True) | |
| self.worker_thread.start() | |
| logger.info("🚀 [FUNDAMENTAL SERVICE] Started") | |
| def stop(self): | |
| self.is_running = False | |
| if self.worker_thread: | |
| self.worker_thread.join() | |
| def request_analysis(self, ticker, callback=None): | |
| """ | |
| Request analysis for a ticker. | |
| callback: function(ticker, analysis_result) | |
| """ | |
| with self._lock: | |
| if callback: | |
| if ticker not in self.callbacks: | |
| self.callbacks[ticker] = [] | |
| self.callbacks[ticker].append(callback) | |
| # If already processing or queued, just add callback (done above) and return | |
| if ticker in self.processing_tickers: | |
| return | |
| self.processing_tickers.add(ticker) | |
| self.queue.put(ticker) | |
| logger.info(f"📥 [FUNDAMENTAL SERVICE] Queued analysis for {ticker}") | |
| def _process_queue(self): | |
| while self.is_running: | |
| try: | |
| ticker = self.queue.get(timeout=1) | |
| except queue.Empty: | |
| continue | |
| try: | |
| logger.info(f"⚙️ [FUNDAMENTAL SERVICE] Analyzing {ticker}...") | |
| # Run analysis | |
| # evaluate_stock returns a dict | |
| # We disable sector comparison for speed if needed, but user probably wants it. | |
| # Let's keep it enabled as default. | |
| result = evaluate_stock(ticker, compare_to_sector=True) | |
| # Save to DB | |
| entry = DatabaseEntry( | |
| date=date.today().isoformat(), | |
| data_type=DataType.FUNDAMENTAL.value, | |
| ticker=ticker, | |
| data=result | |
| ) | |
| self.db_logger.db.save(entry, expiry_days=30) | |
| logger.info(f"✅ [FUNDAMENTAL SERVICE] Saved analysis for {ticker}") | |
| # Notify callbacks | |
| with self._lock: | |
| if ticker in self.callbacks: | |
| callbacks = self.callbacks.pop(ticker) | |
| for cb in callbacks: | |
| try: | |
| cb(ticker, result) | |
| except Exception as e: | |
| logger.error(f"❌ [FUNDAMENTAL SERVICE] Callback error: {e}") | |
| self.processing_tickers.discard(ticker) | |
| except Exception as e: | |
| logger.error(f"❌ [FUNDAMENTAL SERVICE] Error analyzing {ticker}: {e}") | |
| with self._lock: | |
| self.processing_tickers.discard(ticker) | |
| finally: | |
| self.queue.task_done() | |