# gotti_signal_gen/src/news_scraper/services/calendar_processor.py
# Papaflessas's picture
# Deploy Signal Generator app
# 3fe0726
import logging
import time
from abc import ABC, abstractmethod
from datetime import date, datetime
from typing import List, Dict, Any
from news_scraper.helpers.news_db_logger import NewsDBLogger
# Module-level logger named after this module; shared by every handler below.
# Handlers/levels are only configured when the file is run as a script.
logger = logging.getLogger(__name__)
class CalendarHandler(ABC):
    """Abstract link in the calendar-event Chain of Responsibility.

    Every handler keeps an optional reference to its successor. Concrete
    subclasses implement ``process`` and decide themselves whether to
    delegate to that successor.
    """

    def __init__(self, next_handler=None):
        # Successor in the chain; None means this handler is the last link.
        self.next_handler = next_handler

    @abstractmethod
    def process(self, event: Dict[str, Any]):
        """Handle ``event``; must be overridden by concrete handlers."""
        ...

    def set_next(self, handler):
        """Install ``handler`` as the successor and return it, so calls can be chained."""
        self.next_handler = handler
        return self.next_handler
class EconomicEventsHandler(CalendarHandler):
    """
    Handler 1: intercepts United States economic events.

    An event whose type (``data_type``, falling back to ``event_type``) is
    ``economic_events`` and whose ticker is ``UNITED_STATES`` is logged and
    returned right away. Any other event is forwarded to the next handler
    when one is set, or returned unchanged otherwise.
    """

    def process(self, event: Dict[str, Any]):
        kind = event.get('data_type') or event.get('event_type')
        is_us_economic = kind == 'economic_events' and event.get('ticker') == 'UNITED_STATES'

        if not is_us_economic:
            # Not an event for this handler: delegate, or hand it back at chain end.
            return self.next_handler.process(event) if self.next_handler else event

        payload = event.get('data', {})
        title = payload.get('event', 'Unknown Event')
        actual = payload.get('actual')
        forecast = payload.get('forecast')
        logger.info(f"[CALENDAR] 🇺🇸 Economic Event: {title} | Actual: {actual} | Forecast: {forecast}")

        # Economic events don't go through ticker validation/fundamentals
        return event
class TickerEventHandler(CalendarHandler):
    """
    Handler 2: logs a calendar event that concerns a single ticker.

    Events with no ticker, or with the ``UNITED_STATES`` ticker, are not
    logged. In every case the event is then forwarded to the next handler
    when one is set, or returned unchanged otherwise.
    """

    def process(self, event: Dict[str, Any]):
        ticker = event.get('ticker')
        event_type = event.get('data_type') or event.get('event_type')

        # A missing or UNITED_STATES ticker should have been caught by the
        # previous handler (or is invalid), so only real tickers are logged.
        if ticker and ticker != 'UNITED_STATES':
            logger.info(f"[CALENDAR] Processing {event_type} for {ticker}")

        successor = self.next_handler
        return successor.process(event) if successor else event
class AvailabilityHandler(CalendarHandler):
    """
    Handler 3: lets an event continue only if its ticker is available to trade.

    Availability is decided by ``db_logger.db.is_ticker_available``. Events
    without a ticker, or with an unavailable ticker, are returned without
    reaching the next handler.
    """

    def __init__(self, db_logger, next_handler=None):
        super().__init__(next_handler)
        self.db_logger = db_logger

    def process(self, event: Dict[str, Any]):
        ticker = event.get('ticker')
        if not ticker:
            # Nothing to check; stop the chain here.
            return event

        if self.db_logger.db.is_ticker_available(ticker):
            logger.info(f"[CALENDAR] [PASS] Ticker {ticker} is available")
            return self.next_handler.process(event) if self.next_handler else event

        logger.info(f"[CALENDAR] [SKIP] Ticker {ticker} not in available_tickers")
        return event
class FundamentalSignalHandler(CalendarHandler):
    """
    Handler 4: Retrieves the ticker's fundamentals to see whether it is worth buying or selling.
    Generates a signal if the recommendation is not neutral/hold.

    This handler never delegates to ``next_handler``; ``process`` always
    returns the event it was given.
    """

    def __init__(self, db_logger, fundamental_service, next_handler=None):
        super().__init__(next_handler)
        self.db_logger = db_logger
        self.fundamental_service = fundamental_service

    def process(self, event: Dict[str, Any]):
        """Look up the stored fundamental analysis for the event's ticker and try to emit a signal.

        If no analysis entry is found, one is requested from
        ``fundamental_service`` with a callback that generates the signal
        from the result.

        Args:
            event: Calendar event dict; ``ticker`` is read from it.

        Returns:
            The same ``event`` object, unchanged.
        """
        ticker = event.get('ticker')

        # Query Fundamental Analysis
        entries = self.db_logger.db.query(
            data_type='fundamental_analysis',
            ticker=ticker,
            limit=1
        )
        fundamental_entry = entries[0] if entries else None

        if not fundamental_entry:
            logger.info(f"[CALENDAR] [SKIP] No fundamental analysis found for {ticker}. Requesting analysis...")
            # The callback receives (ticker, result). No key is known yet, so
            # _generate_signal looks it up itself.
            # NOTE(review): when the service invokes the callback is not visible here — confirm.
            self.fundamental_service.request_analysis(
                ticker,
                lambda t, r: self._generate_signal(t, r, None, event)
            )
            return event

        self._generate_signal(ticker, fundamental_entry.data, fundamental_entry.generate_key(), event)
        return event

    def _generate_signal(self, ticker, fundamental_data, fundamental_key, event):
        """Save a BUY/SELL signal for ``ticker`` when the fundamental recommendation calls for one.

        Any exception is logged (with traceback) and swallowed, so a failure
        here never propagates to the caller or to the analysis callback.

        Args:
            ticker: Ticker symbol the signal is for.
            fundamental_data: Mapping holding a ``recommendation`` entry; may be
                ``None`` when the analysis produced no result.
            fundamental_key: Key of the fundamental-analysis entry, or ``None``
                to look it up from the database.
            event: The calendar event that triggered the evaluation.

        Returns:
            None.
        """
        try:
            event_type = event.get('data_type') or event.get('event_type')

            if fundamental_data is None:
                # Without this guard a missing result only shows up as an
                # opaque AttributeError in the except branch below.
                logger.warning(f"[CALENDAR] [SKIP] No fundamental data available for {ticker}")
                return

            recommendation = fundamental_data.get('recommendation', 'UNKNOWN')
            logger.info(f"[CALENDAR] Fundamental Rec for {ticker}: {recommendation}")

            # Logic: only act when it is worth buying or selling (so not neutral or hold).
            # Checked before the key lookup so skipped events cost no extra query.
            if recommendation not in ("BUY", "SELL"):
                logger.info(f"[CALENDAR] [SKIP] Recommendation is {recommendation} (Neutral/Hold)")
                return
            signal_position = recommendation

            # If key is missing (e.g. from callback), try to find it
            if fundamental_key is None:
                entries = self.db_logger.db.query(data_type='fundamental_analysis', ticker=ticker, limit=1)
                if entries:
                    fundamental_key = entries[0].generate_key()

            # Link the signal back to the originating calendar event when a key is available
            calendar_keys = []
            if hasattr(event, 'generate_key'):
                calendar_keys.append(event.generate_key())
            elif isinstance(event, dict) and 'entry_key' in event:
                calendar_keys.append(event['entry_key'])

            # Construct sentiment dictionary ('calendar_sentiment' is a fixed value here)
            sentiment = {
                'fundamental_sentiment': recommendation,
                'calendar_sentiment': 'POSITIVE'
            }

            success = self.db_logger.db.save_signal(
                ticker=ticker,
                calendar_event_keys=calendar_keys,
                news_keys=[],
                fundamental_key=fundamental_key,
                signal_position=signal_position,
                sentiment=sentiment
            )

            if success:
                logger.info(f"✅ [CALENDAR] Saved {signal_position} signal for {ticker} based on {event_type}")
            else:
                logger.error(f"❌ [CALENDAR] Failed to save signal for {ticker}")

        except Exception as e:
            # logger.exception logs at ERROR level and appends the traceback
            logger.exception(f"❌ [CALENDAR] Error in _generate_signal for {ticker}: {str(e)}")
class CalendarProcessor:
    """Builds the calendar handler chain and runs today's calendar events through it."""

    def __init__(self):
        """Create the DB logger, start the fundamental analysis service and wire the chain."""
        self.db_logger = NewsDBLogger()

        # Imported inside the constructor rather than at module top.
        # NOTE(review): presumably to avoid a circular import — confirm.
        from news_scraper.services.fundamental_analysis_service import FundamentalAnalysisService
        self.fundamental_service = FundamentalAnalysisService()
        self.fundamental_service.start()

        # Initialize Chain: economic events -> ticker logging -> availability -> fundamentals
        self.handler_chain = EconomicEventsHandler()
        ticker_handler = TickerEventHandler()
        availability_handler = AvailabilityHandler(db_logger=self.db_logger)
        signal_handler = FundamentalSignalHandler(
            db_logger=self.db_logger,
            fundamental_service=self.fundamental_service,
        )

        self.handler_chain.set_next(ticker_handler)
        ticker_handler.set_next(availability_handler)
        availability_handler.set_next(signal_handler)

    def run_daily_scan(self):
        """
        Collects all calendar events for today and processes them.

        Each event is converted to a dict, tagged with its ``entry_key`` and
        sent through the handler chain. A failure on one event is logged with
        its traceback and does not stop the remaining events.
        """
        today_str = date.today().isoformat()
        logger.info(f"📅 [CALENDAR PROCESSOR] Starting daily scan for {today_str}")

        # Query all calendar events for today, one query per event type
        event_types = ['earnings', 'ipo', 'stock_split', 'dividends', 'economic_events']
        all_events = []
        for et in event_types:
            events = self.db_logger.db.query(
                date_from=today_str,
                date_to=today_str,
                data_type=et
            )
            all_events.extend(events)

        logger.info(f"📅 [CALENDAR PROCESSOR] Found {len(all_events)} events for today")

        for event_obj in all_events:
            try:
                # Serialisation happens inside the try so a single malformed
                # event cannot abort the whole scan.
                event_dict = event_obj.to_dict()
                # Ensure entry_key is available for linking
                event_dict['entry_key'] = event_obj.generate_key()
                self.handler_chain.process(event_dict)
            except Exception as e:
                logger.exception(f"❌ [CALENDAR] Error processing event: {e}")
if __name__ == "__main__":
    # Script entry point: emit timestamped INFO-level log lines, then run one scan.
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=logging.INFO,
    )
    CalendarProcessor().run_daily_scan()