Spaces:
Running
Running
| """ | |
| Database Logger for News Items | |
| Replaces ExcelLogger - logs news with sentiment directly to MySQL news table | |
| """ | |
| import sys | |
| import logging | |
| import threading | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional, List, Tuple | |
| # Add src to path for imports | |
| sys.path.append(str(Path(__file__).parent.parent.parent)) | |
| from db.local_database import LocalDatabase, DatabaseEntry, DataType | |
| logger = logging.getLogger(__name__) | |
class NewsDBLogger:
    """
    Logs news items with sentiment analysis directly to the MySQL news table.
    Replaces the old ExcelLogger functionality.

    Implemented as a process-wide singleton: every instantiation returns the
    same object backed by a single LocalDatabase connection.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking so only the first caller pays for the lock
        # and exactly one instance is ever created across threads.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(NewsDBLogger, cls).__new__(cls)
                    cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        """Initialize the database logger (no-op on repeat instantiation)."""
        if self.initialized:
            return
        self.initialized = True
        self.db = LocalDatabase()
        logger.info("✅ NewsDBLogger initialized with MySQL backend")

    @staticmethod
    def _get_field(news_item, name):
        """Read *name* from an attribute-style or dict-style news item.

        Returns None when the field is absent or the item is neither shape.
        """
        if isinstance(news_item, dict):
            return news_item.get(name)
        return getattr(news_item, name, None)

    @staticmethod
    def _extract_symbols(news_item):
        """Return (ticker, symbols_str) for a news item.

        ticker is the first symbol, or "GENERAL" when no symbols are present;
        symbols_str is the comma-joined list (empty string when none).
        """
        raw = None
        if isinstance(news_item, dict):
            if 'symbols' in news_item:
                raw = news_item['symbols']
        elif getattr(news_item, 'symbols', None):
            raw = news_item.symbols
        if raw is None:
            return "GENERAL", ""
        symbols_list = raw if isinstance(raw, list) else [raw]
        symbols_str = ', '.join(symbols_list)
        ticker = symbols_list[0] if symbols_list else "GENERAL"
        return ticker, symbols_str

    @staticmethod
    def _extract_date(news_item):
        """Return the news date as "YYYY-MM-DD".

        Uses the item's created_at (ISO-8601, date taken before the 'T');
        falls back to today when created_at is missing or None.
        Fixes the old behavior where a None created_at produced the
        literal date string "None".
        """
        created = NewsDBLogger._get_field(news_item, 'created_at')
        if created is not None:
            try:
                return str(created).split('T')[0]
            except Exception:  # defensive: best-effort date extraction
                pass
        return datetime.now().strftime("%Y-%m-%d")

    def _build_entry(self, news_item, pre_sentiment, sentiment, rating, processing_time):
        """Build a DatabaseEntry for one news item + sentiment results.

        The data payload keys mirror the columns of the old Excel log.
        """
        ticker, symbols_str = self._extract_symbols(news_item)
        data = {
            'Timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'NewsID': self._get_field(news_item, 'id'),
            'Headline': self._get_field(news_item, 'headline'),
            'URL': self._get_field(news_item, 'url'),
            'Source': self._get_field(news_item, 'source'),
            'Symbols': symbols_str,
            'PreSentimentScore': pre_sentiment,
            'SentimentScore': rating,
            'SentimentAnalysis': sentiment,
            'TimeToProcess': processing_time,
        }
        return DatabaseEntry(
            date=self._extract_date(news_item),
            data_type=DataType.NEWS.value,  # "news"
            ticker=ticker,
            data=data,
            metadata={
                'logged_at': datetime.now().isoformat(),
                'has_sentiment': sentiment is not None,
                'processing_time': processing_time,
            },
        )

    def log_news_with_sentiment(self, news_item, pre_sentiment=None, sentiment=None, rating=None, processing_time=None):
        """
        Log a news item and its sentiment analysis to the database.

        Args:
            news_item: The news item object containing news details
            pre_sentiment (str, optional): Pre-processed sentiment analysis text
            sentiment (str, optional): Processed sentiment analysis text
            rating (str or float, optional): Sentiment score/rating
            processing_time (float, optional): Time taken to process this news item in seconds

        Returns:
            bool: True when the entry was saved, False on failure or error.
        """
        try:
            entry = self._build_entry(news_item, pre_sentiment, sentiment, rating, processing_time)
            success = self.db.save(entry, expiry_days=90)  # Keep news for 90 days
            if success:
                # `or` (not dict.get default) so a present-but-None headline
                # still falls back instead of raising on the slice below.
                headline = entry.data.get('Headline') or 'Unknown headline'
                logger.info(f"✅ Logged to DB | {headline[:60]}...")
            else:
                logger.error("❌ Failed to log news to database")
            return success
        except Exception:
            # logger.exception records the traceback via the logging system
            # instead of writing directly to stderr.
            logger.exception("❌ Error logging news")
            return False

    def log_batch(self, news_items_with_sentiment_and_times: List[Tuple]):
        """
        Log multiple news items with sentiment in batch.

        Args:
            news_items_with_sentiment_and_times: List of tuples
                (news_item, sentiment_data) or
                (news_item, sentiment_data, processing_time);
                processing_time can be None if unavailable.

        Returns:
            int: Number of entries successfully saved (0 on error).
        """
        try:
            entries = []
            for item_data in news_items_with_sentiment_and_times:
                # Accept both 2-element and 3-element tuples.
                if len(item_data) == 2:
                    news_item, sentiment_data = item_data
                    processing_time = None
                elif len(item_data) == 3:
                    news_item, sentiment_data, processing_time = item_data
                else:
                    # Use the module logger (not print) for consistency.
                    logger.warning(f"⚠️ Invalid item data format: {item_data}")
                    continue
                if isinstance(sentiment_data, dict):
                    pre_sentiment = sentiment_data.get('pre_sentiment')
                    sentiment = sentiment_data.get('sentiment')
                    rating = sentiment_data.get('rating')
                else:
                    pre_sentiment = sentiment = rating = None
                entries.append(
                    self._build_entry(news_item, pre_sentiment, sentiment, rating, processing_time)
                )
            if entries:
                saved_count = self.db.save_batch(entries, expiry_days=90)
                logger.info(f"✅ Batch logged {saved_count}/{len(entries)} news items")
                return saved_count
            logger.warning("⚠️ No valid entries to log")
            return 0
        except Exception:
            logger.exception("❌ Error batch logging")
            return 0
# Example usage
if __name__ == "__main__":
    logger = NewsDBLogger()

    class MockNews:
        """Stand-in news object exposing the attributes the logger reads."""
        id = "test123"
        headline = "Test Headline"
        url = "https://example.com"
        source = "TestSource"
        symbols = ["AAPL", "MSFT"]
        created_at = "2025-01-15T10:30:00Z"

    # Log a single mock item with a full set of sentiment fields.
    logger.log_news_with_sentiment(
        MockNews(),
        pre_sentiment="POSITIVE",
        sentiment="The news is very positive",
        rating=0.85,
        processing_time=1.5,
    )
    print("\n✅ Test completed - check database for entry")