""" Database Logger for News Items Replaces ExcelLogger - logs news with sentiment directly to MySQL news table """ import sys import logging import threading from pathlib import Path from datetime import datetime from typing import Optional, List, Tuple # Add src to path for imports sys.path.append(str(Path(__file__).parent.parent.parent)) from db.local_database import LocalDatabase, DatabaseEntry, DataType logger = logging.getLogger(__name__) class NewsDBLogger: """ Logs news items with sentiment analysis directly to the MySQL news table. Replaces the old ExcelLogger functionality. """ _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super(NewsDBLogger, cls).__new__(cls) cls._instance.initialized = False return cls._instance def __init__(self): """Initialize the database logger.""" if self.initialized: return self.initialized = True self.db = LocalDatabase() logger.info("✅ NewsDBLogger initialized with MySQL backend") def log_news_with_sentiment(self, news_item, pre_sentiment=None, sentiment=None, rating=None, processing_time=None): """ Log a news item and its sentiment analysis to the database. Args: news_item: The news item object containing news details pre_sentiment (str, optional): Pre-processed sentiment analysis text sentiment (str, optional): Processed sentiment analysis text rating (str or float, optional): Sentiment score/rating processing_time (float, optional): Time taken to process this news item in seconds """ try: # Extract symbols/ticker ticker = "GENERAL" # Default ticker symbols_str = "" if hasattr(news_item, 'symbols') and news_item.symbols: symbols_list = news_item.symbols if isinstance(news_item.symbols, list) else [news_item.symbols] symbols_str = ', '.join(symbols_list) ticker = symbols_list[0] if symbols_list else "GENERAL" elif isinstance(news_item, dict) and 'symbols' in news_item: symbols_list = news_item['symbols'] if isinstance(news_item['symbols'], list) else [news_item['symbols']] symbols_str = ', '.join(symbols_list) ticker = symbols_list[0] if symbols_list else "GENERAL" # Get date news_date = datetime.now().strftime("%Y-%m-%d") if hasattr(news_item, 'created_at'): try: news_date = str(news_item.created_at).split('T')[0] except: pass elif isinstance(news_item, dict) and 'created_at' in news_item: try: news_date = str(news_item['created_at']).split('T')[0] except: pass # Build the data payload matching Excel format data = { 'Timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'NewsID': getattr(news_item, 'id', None) if hasattr(news_item, 'id') else (news_item.get('id') if isinstance(news_item, dict) else None), 'Headline': getattr(news_item, 'headline', None) if hasattr(news_item, 'headline') else (news_item.get('headline') if isinstance(news_item, dict) else None), 'URL': getattr(news_item, 'url', None) if hasattr(news_item, 'url') else (news_item.get('url') if isinstance(news_item, dict) else None), 'Source': getattr(news_item, 'source', None) if hasattr(news_item, 'source') else (news_item.get('source') if isinstance(news_item, dict) else None), 'Symbols': symbols_str, 'PreSentimentScore': pre_sentiment, 'SentimentScore': rating, 'SentimentAnalysis': sentiment, 'TimeToProcess': processing_time } # Create database entry entry = DatabaseEntry( date=news_date, data_type=DataType.NEWS.value, # "news" ticker=ticker, data=data, metadata={ 'logged_at': datetime.now().isoformat(), 'has_sentiment': sentiment is not None, 'processing_time': processing_time } ) # Save to database success = self.db.save(entry, expiry_days=90) # Keep news for 90 days if success: headline = data.get('Headline', 'Unknown headline') logger.info(f"✅ Logged to DB | {headline[:60]}...") else: logger.error(f"❌ Failed to log news to database") return success except Exception as e: logger.error(f"❌ Error logging news: {str(e)}") import traceback traceback.print_exc() return False def log_batch(self, news_items_with_sentiment_and_times: List[Tuple]): """ Log multiple news items with sentiment in batch. Args: news_items_with_sentiment_and_times: List of tuples (news_item, sentiment_data, processing_time) processing_time can be None if unavailable """ try: entries = [] for item_data in news_items_with_sentiment_and_times: # Unpack the tuple - handle both 2-element and 3-element tuples if len(item_data) == 2: news_item, sentiment_data = item_data processing_time = None elif len(item_data) == 3: news_item, sentiment_data, processing_time = item_data else: print(f"⚠️ Invalid item data format: {item_data}") continue # Extract sentiment details pre_sentiment = sentiment_data.get('pre_sentiment') if isinstance(sentiment_data, dict) else None sentiment = sentiment_data.get('sentiment') if isinstance(sentiment_data, dict) else None rating = sentiment_data.get('rating') if isinstance(sentiment_data, dict) else None # Extract symbols/ticker ticker = "GENERAL" symbols_str = "" if hasattr(news_item, 'symbols') and news_item.symbols: symbols_list = news_item.symbols if isinstance(news_item.symbols, list) else [news_item.symbols] symbols_str = ', '.join(symbols_list) ticker = symbols_list[0] if symbols_list else "GENERAL" elif isinstance(news_item, dict) and 'symbols' in news_item: symbols_list = news_item['symbols'] if isinstance(news_item['symbols'], list) else [news_item['symbols']] symbols_str = ', '.join(symbols_list) ticker = symbols_list[0] if symbols_list else "GENERAL" # Get date news_date = datetime.now().strftime("%Y-%m-%d") if hasattr(news_item, 'created_at'): try: news_date = str(news_item.created_at).split('T')[0] except: pass elif isinstance(news_item, dict) and 'created_at' in news_item: try: news_date = str(news_item['created_at']).split('T')[0] except: pass # Build data payload data = { 'Timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'NewsID': getattr(news_item, 'id', None) if hasattr(news_item, 'id') else (news_item.get('id') if isinstance(news_item, dict) else None), 'Headline': getattr(news_item, 'headline', None) if hasattr(news_item, 'headline') else (news_item.get('headline') if isinstance(news_item, dict) else None), 'URL': getattr(news_item, 'url', None) if hasattr(news_item, 'url') else (news_item.get('url') if isinstance(news_item, dict) else None), 'Source': getattr(news_item, 'source', None) if hasattr(news_item, 'source') else (news_item.get('source') if isinstance(news_item, dict) else None), 'Symbols': symbols_str, 'PreSentimentScore': pre_sentiment, 'SentimentScore': rating, 'SentimentAnalysis': sentiment, 'TimeToProcess': processing_time } # Create database entry entry = DatabaseEntry( date=news_date, data_type=DataType.NEWS.value, ticker=ticker, data=data, metadata={ 'logged_at': datetime.now().isoformat(), 'has_sentiment': sentiment is not None, 'processing_time': processing_time } ) entries.append(entry) # Batch save to database if entries: saved_count = self.db.save_batch(entries, expiry_days=90) logger.info(f"✅ Batch logged {saved_count}/{len(entries)} news items") return saved_count else: logger.warning("⚠️ No valid entries to log") return 0 except Exception as e: logger.error(f"❌ Error batch logging: {str(e)}") import traceback traceback.print_exc() return 0 # Example usage if __name__ == "__main__": logger = NewsDBLogger() # Test with a mock news item class MockNews: def __init__(self): self.id = "test123" self.headline = "Test Headline" self.url = "https://example.com" self.source = "TestSource" self.symbols = ["AAPL", "MSFT"] self.created_at = "2025-01-15T10:30:00Z" mock_news = MockNews() logger.log_news_with_sentiment( mock_news, pre_sentiment="POSITIVE", sentiment="The news is very positive", rating=0.85, processing_time=1.5 ) print("\n✅ Test completed - check database for entry")