gotti_signal_gen / src /news_scraper /services /fundamental_analysis_service.py
Papaflessas's picture
Deploy Signal Generator app
3fe0726
import threading
import queue
import logging
from datetime import date
from news_scraper.helpers.news_db_logger import NewsDBLogger
from db.local_database import DatabaseEntry, DataType
from fundamental_analysis.decision_maker import evaluate_stock
logger = logging.getLogger(__name__)
class FundamentalAnalysisService:
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(FundamentalAnalysisService, cls).__new__(cls)
cls._instance.initialized = False
return cls._instance
def __init__(self):
if self.initialized:
return
self.initialized = True
self.queue = queue.Queue()
self.db_logger = NewsDBLogger()
self.is_running = False
self.worker_thread = None
self.callbacks = {} # ticker -> list of callbacks
self.processing_tickers = set()
def start(self):
if self.is_running:
return
self.is_running = True
self.worker_thread = threading.Thread(target=self._process_queue, daemon=True)
self.worker_thread.start()
logger.info("🚀 [FUNDAMENTAL SERVICE] Started")
def stop(self):
self.is_running = False
if self.worker_thread:
self.worker_thread.join()
def request_analysis(self, ticker, callback=None):
"""
Request analysis for a ticker.
callback: function(ticker, analysis_result)
"""
with self._lock:
if callback:
if ticker not in self.callbacks:
self.callbacks[ticker] = []
self.callbacks[ticker].append(callback)
# If already processing or queued, just add callback (done above) and return
if ticker in self.processing_tickers:
return
self.processing_tickers.add(ticker)
self.queue.put(ticker)
logger.info(f"📥 [FUNDAMENTAL SERVICE] Queued analysis for {ticker}")
def _process_queue(self):
while self.is_running:
try:
ticker = self.queue.get(timeout=1)
except queue.Empty:
continue
try:
logger.info(f"⚙️ [FUNDAMENTAL SERVICE] Analyzing {ticker}...")
# Run analysis
# evaluate_stock returns a dict
# We disable sector comparison for speed if needed, but user probably wants it.
# Let's keep it enabled as default.
result = evaluate_stock(ticker, compare_to_sector=True)
# Save to DB
entry = DatabaseEntry(
date=date.today().isoformat(),
data_type=DataType.FUNDAMENTAL.value,
ticker=ticker,
data=result
)
self.db_logger.db.save(entry, expiry_days=30)
logger.info(f"✅ [FUNDAMENTAL SERVICE] Saved analysis for {ticker}")
# Notify callbacks
with self._lock:
if ticker in self.callbacks:
callbacks = self.callbacks.pop(ticker)
for cb in callbacks:
try:
cb(ticker, result)
except Exception as e:
logger.error(f"❌ [FUNDAMENTAL SERVICE] Callback error: {e}")
self.processing_tickers.discard(ticker)
except Exception as e:
logger.error(f"❌ [FUNDAMENTAL SERVICE] Error analyzing {ticker}: {e}")
with self._lock:
self.processing_tickers.discard(ticker)
finally:
self.queue.task_done()