Spaces:
Running
Running
| import logging | |
| import time | |
| from abc import ABC, abstractmethod | |
| from datetime import date, datetime | |
| from typing import List, Dict, Any | |
| from news_scraper.helpers.news_db_logger import NewsDBLogger | |
| # Configure logger | |
| logger = logging.getLogger(__name__) | |
| class CalendarHandler(ABC): | |
| """Base handler class for the Chain of Responsibility pattern.""" | |
| def __init__(self, next_handler=None): | |
| self.next_handler = next_handler | |
| def process(self, event: Dict[str, Any]): | |
| pass | |
| def set_next(self, handler): | |
| self.next_handler = handler | |
| return handler | |
| class EconomicEventsHandler(CalendarHandler): | |
| """ | |
| Handler 1: Retrieve/Process all calendar events with event_type economic_events and ticker UNITED_STATES. | |
| """ | |
| def process(self, event: Dict[str, Any]): | |
| event_type = event.get('data_type') or event.get('event_type') | |
| ticker = event.get('ticker') | |
| if event_type == 'economic_events' and ticker == 'UNITED_STATES': | |
| title = event.get('data', {}).get('event', 'Unknown Event') | |
| actual = event.get('data', {}).get('actual') | |
| forecast = event.get('data', {}).get('forecast') | |
| logger.info(f"[CALENDAR] 🇺🇸 Economic Event: {title} | Actual: {actual} | Forecast: {forecast}") | |
| # Economic events don't go through ticker validation/fundamentals | |
| return event | |
| if self.next_handler: | |
| return self.next_handler.process(event) | |
| return event | |
| class TickerEventHandler(CalendarHandler): | |
| """ | |
| Handler 2: Processes a single calendar event regarding a ticker. | |
| Basic formatting and logging. | |
| """ | |
| def process(self, event: Dict[str, Any]): | |
| ticker = event.get('ticker') | |
| event_type = event.get('data_type') or event.get('event_type') | |
| if not ticker or ticker == 'UNITED_STATES': | |
| # Should have been caught by previous handler or is invalid | |
| if self.next_handler: | |
| return self.next_handler.process(event) | |
| return event | |
| logger.info(f"[CALENDAR] Processing {event_type} for {ticker}") | |
| if self.next_handler: | |
| return self.next_handler.process(event) | |
| return event | |
| class AvailabilityHandler(CalendarHandler): | |
| """ | |
| Handler 3: Checks if the ticker is available to trade. | |
| """ | |
| def __init__(self, db_logger, next_handler=None): | |
| super().__init__(next_handler) | |
| self.db_logger = db_logger | |
| def process(self, event: Dict[str, Any]): | |
| ticker = event.get('ticker') | |
| if not ticker: | |
| return event | |
| if not self.db_logger.db.is_ticker_available(ticker): | |
| logger.info(f"[CALENDAR] [SKIP] Ticker {ticker} not in available_tickers") | |
| return event | |
| logger.info(f"[CALENDAR] [PASS] Ticker {ticker} is available") | |
| if self.next_handler: | |
| return self.next_handler.process(event) | |
| return event | |
| class FundamentalSignalHandler(CalendarHandler): | |
| """ | |
| Handler 4: Retrieves the ticker's fundamentals to see it is worth buying or selling. | |
| Generates signal if recommendation is not neutral/hold. | |
| """ | |
| def __init__(self, db_logger, fundamental_service, next_handler=None): | |
| super().__init__(next_handler) | |
| self.db_logger = db_logger | |
| self.fundamental_service = fundamental_service | |
| def process(self, event: Dict[str, Any]): | |
| ticker = event.get('ticker') | |
| event_type = event.get('data_type') or event.get('event_type') | |
| # Query Fundamental Analysis | |
| entries = self.db_logger.db.query( | |
| data_type='fundamental_analysis', | |
| ticker=ticker, | |
| limit=1 | |
| ) | |
| fundamental_entry = entries[0] if entries else None | |
| if not fundamental_entry: | |
| logger.info(f"[CALENDAR] [SKIP] No fundamental analysis found for {ticker}. Requesting analysis...") | |
| self.fundamental_service.request_analysis( | |
| ticker, | |
| lambda t, r: self._generate_signal(t, r, None, event) | |
| ) | |
| return event | |
| self._generate_signal(ticker, fundamental_entry.data, fundamental_entry.generate_key(), event) | |
| return event | |
| def _generate_signal(self, ticker, fundamental_data, fundamental_key, event): | |
| try: | |
| event_type = event.get('data_type') or event.get('event_type') | |
| recommendation = fundamental_data.get('recommendation', 'UNKNOWN') | |
| logger.info(f"[CALENDAR] Fundamental Rec for {ticker}: {recommendation}") | |
| # If key is missing (e.g. from callback), try to generate or find it | |
| if fundamental_key is None: | |
| entries = self.db_logger.db.query(data_type='fundamental_analysis', ticker=ticker, limit=1) | |
| if entries: | |
| fundamental_key = entries[0].generate_key() | |
| signal_position = None | |
| # Logic: "see it is worth buying or selling based ( so non neutral or hold)" | |
| if recommendation == "BUY": | |
| signal_position = "BUY" | |
| elif recommendation == "SELL": | |
| signal_position = "SELL" | |
| else: | |
| logger.info(f"[CALENDAR] [SKIP] Recommendation is {recommendation} (Neutral/Hold)") | |
| return | |
| # Generate Signal | |
| if signal_position: | |
| calendar_keys = [] | |
| if hasattr(event, 'generate_key'): | |
| calendar_keys.append(event.generate_key()) | |
| elif isinstance(event, dict) and 'entry_key' in event: | |
| calendar_keys.append(event['entry_key']) | |
| # Construct sentiment dictionary | |
| sentiment = { | |
| 'fundamental_sentiment': recommendation, | |
| 'calendar_sentiment': 'POSITIVE' | |
| } | |
| success = self.db_logger.db.save_signal( | |
| ticker=ticker, | |
| calendar_event_keys=calendar_keys, | |
| news_keys=[], | |
| fundamental_key=fundamental_key, | |
| signal_position=signal_position, | |
| sentiment=sentiment | |
| ) | |
| if success: | |
| logger.info(f"✅ [CALENDAR] Saved {signal_position} signal for {ticker} based on {event_type}") | |
| else: | |
| logger.error(f"❌ [CALENDAR] Failed to save signal for {ticker}") | |
| except Exception as e: | |
| logger.error(f"❌ [CALENDAR] Error in _generate_signal for {ticker}: {str(e)}") | |
| class CalendarProcessor: | |
| def __init__(self): | |
| self.db_logger = NewsDBLogger() | |
| from news_scraper.services.fundamental_analysis_service import FundamentalAnalysisService | |
| self.fundamental_service = FundamentalAnalysisService() | |
| self.fundamental_service.start() | |
| # Initialize Chain | |
| self.handler_chain = EconomicEventsHandler() | |
| ticker_handler = TickerEventHandler() | |
| availability_handler = AvailabilityHandler(db_logger=self.db_logger) | |
| signal_handler = FundamentalSignalHandler(db_logger=self.db_logger, fundamental_service=self.fundamental_service) | |
| self.handler_chain.set_next(ticker_handler) | |
| ticker_handler.set_next(availability_handler) | |
| availability_handler.set_next(signal_handler) | |
| def run_daily_scan(self): | |
| """ | |
| Collects all calendar events for today and processes them. | |
| """ | |
| today_str = date.today().isoformat() | |
| logger.info(f"📅 [CALENDAR PROCESSOR] Starting daily scan for {today_str}") | |
| # Query all calendar events for today | |
| event_types = ['earnings', 'ipo', 'stock_split', 'dividends', 'economic_events'] | |
| all_events = [] | |
| for et in event_types: | |
| events = self.db_logger.db.query( | |
| date_from=today_str, | |
| date_to=today_str, | |
| data_type=et | |
| ) | |
| all_events.extend(events) | |
| logger.info(f"📅 [CALENDAR PROCESSOR] Found {len(all_events)} events for today") | |
| for event_obj in all_events: | |
| event_dict = event_obj.to_dict() | |
| # Ensure entry_key is available for linking | |
| event_dict['entry_key'] = event_obj.generate_key() | |
| try: | |
| self.handler_chain.process(event_dict) | |
| except Exception as e: | |
| logger.error(f"❌ [CALENDAR] Error processing event: {e}") | |
| if __name__ == "__main__": | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| processor = CalendarProcessor() | |
| processor.run_daily_scan() | |