"""
Database Logger for News Items
Replaces ExcelLogger - logs news with sentiment directly to MySQL news table
"""
import sys
import logging
import threading
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Tuple
# Add src to path for imports
sys.path.append(str(Path(__file__).parent.parent.parent))
from db.local_database import LocalDatabase, DatabaseEntry, DataType
logger = logging.getLogger(__name__)
class NewsDBLogger:
"""
Logs news items with sentiment analysis directly to the MySQL news table.
Replaces the old ExcelLogger functionality.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(NewsDBLogger, cls).__new__(cls)
cls._instance.initialized = False
return cls._instance
def __init__(self):
"""Initialize the database logger."""
if self.initialized:
return
self.initialized = True
self.db = LocalDatabase()
logger.info("✅ NewsDBLogger initialized with MySQL backend")
    def log_news_with_sentiment(self, news_item, pre_sentiment=None, sentiment=None, rating=None, processing_time=None):
        """
        Log a news item and its sentiment analysis to the database.

        Args:
            news_item: The news item object containing news details
            pre_sentiment (str, optional): Pre-processed sentiment analysis text
            sentiment (str, optional): Processed sentiment analysis text
            rating (str or float, optional): Sentiment score/rating
            processing_time (float, optional): Time taken to process this news item in seconds
        """
        try:
            # Extract symbols/ticker
            ticker = "GENERAL"  # Default ticker
            symbols_str = ""
            if hasattr(news_item, 'symbols') and news_item.symbols:
                symbols_list = news_item.symbols if isinstance(news_item.symbols, list) else [news_item.symbols]
                symbols_str = ', '.join(symbols_list)
                ticker = symbols_list[0] if symbols_list else "GENERAL"
            elif isinstance(news_item, dict) and 'symbols' in news_item:
                symbols_list = news_item['symbols'] if isinstance(news_item['symbols'], list) else [news_item['symbols']]
                symbols_str = ', '.join(symbols_list)
                ticker = symbols_list[0] if symbols_list else "GENERAL"

            # Get date
            news_date = datetime.now().strftime("%Y-%m-%d")
            if hasattr(news_item, 'created_at'):
                try:
                    news_date = str(news_item.created_at).split('T')[0]
                except Exception:
                    pass
            elif isinstance(news_item, dict) and 'created_at' in news_item:
                try:
                    news_date = str(news_item['created_at']).split('T')[0]
                except Exception:
                    pass

            # Build the data payload matching Excel format
            data = {
                'Timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'NewsID': getattr(news_item, 'id', None) if hasattr(news_item, 'id') else (news_item.get('id') if isinstance(news_item, dict) else None),
                'Headline': getattr(news_item, 'headline', None) if hasattr(news_item, 'headline') else (news_item.get('headline') if isinstance(news_item, dict) else None),
                'URL': getattr(news_item, 'url', None) if hasattr(news_item, 'url') else (news_item.get('url') if isinstance(news_item, dict) else None),
                'Source': getattr(news_item, 'source', None) if hasattr(news_item, 'source') else (news_item.get('source') if isinstance(news_item, dict) else None),
                'Symbols': symbols_str,
                'PreSentimentScore': pre_sentiment,
                'SentimentScore': rating,
                'SentimentAnalysis': sentiment,
                'TimeToProcess': processing_time
            }

            # Create database entry
            entry = DatabaseEntry(
                date=news_date,
                data_type=DataType.NEWS.value,  # "news"
                ticker=ticker,
                data=data,
                metadata={
                    'logged_at': datetime.now().isoformat(),
                    'has_sentiment': sentiment is not None,
                    'processing_time': processing_time
                }
            )

            # Save to database
            success = self.db.save(entry, expiry_days=90)  # Keep news for 90 days

            if success:
                # Guard against a missing headline so slicing never fails
                headline = data.get('Headline') or 'Unknown headline'
                logger.info(f"✅ Logged to DB | {headline[:60]}...")
            else:
                logger.error("❌ Failed to log news to database")

            return success

        except Exception as e:
            logger.error(f"❌ Error logging news: {str(e)}")
            import traceback
            traceback.print_exc()
            return False
    def log_batch(self, news_items_with_sentiment_and_times: List[Tuple]):
        """
        Log multiple news items with sentiment in batch.

        Args:
            news_items_with_sentiment_and_times: List of tuples (news_item, sentiment_data, processing_time);
                processing_time can be None if unavailable
        """
        try:
            entries = []

            for item_data in news_items_with_sentiment_and_times:
                # Unpack the tuple - handle both 2-element and 3-element tuples
                if len(item_data) == 2:
                    news_item, sentiment_data = item_data
                    processing_time = None
                elif len(item_data) == 3:
                    news_item, sentiment_data, processing_time = item_data
                else:
                    logger.warning(f"⚠️ Invalid item data format: {item_data}")
                    continue

                # Extract sentiment details
                pre_sentiment = sentiment_data.get('pre_sentiment') if isinstance(sentiment_data, dict) else None
                sentiment = sentiment_data.get('sentiment') if isinstance(sentiment_data, dict) else None
                rating = sentiment_data.get('rating') if isinstance(sentiment_data, dict) else None

                # Extract symbols/ticker
                ticker = "GENERAL"
                symbols_str = ""
                if hasattr(news_item, 'symbols') and news_item.symbols:
                    symbols_list = news_item.symbols if isinstance(news_item.symbols, list) else [news_item.symbols]
                    symbols_str = ', '.join(symbols_list)
                    ticker = symbols_list[0] if symbols_list else "GENERAL"
                elif isinstance(news_item, dict) and 'symbols' in news_item:
                    symbols_list = news_item['symbols'] if isinstance(news_item['symbols'], list) else [news_item['symbols']]
                    symbols_str = ', '.join(symbols_list)
                    ticker = symbols_list[0] if symbols_list else "GENERAL"

                # Get date
                news_date = datetime.now().strftime("%Y-%m-%d")
                if hasattr(news_item, 'created_at'):
                    try:
                        news_date = str(news_item.created_at).split('T')[0]
                    except Exception:
                        pass
                elif isinstance(news_item, dict) and 'created_at' in news_item:
                    try:
                        news_date = str(news_item['created_at']).split('T')[0]
                    except Exception:
                        pass

                # Build data payload
                data = {
                    'Timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'NewsID': getattr(news_item, 'id', None) if hasattr(news_item, 'id') else (news_item.get('id') if isinstance(news_item, dict) else None),
                    'Headline': getattr(news_item, 'headline', None) if hasattr(news_item, 'headline') else (news_item.get('headline') if isinstance(news_item, dict) else None),
                    'URL': getattr(news_item, 'url', None) if hasattr(news_item, 'url') else (news_item.get('url') if isinstance(news_item, dict) else None),
                    'Source': getattr(news_item, 'source', None) if hasattr(news_item, 'source') else (news_item.get('source') if isinstance(news_item, dict) else None),
                    'Symbols': symbols_str,
                    'PreSentimentScore': pre_sentiment,
                    'SentimentScore': rating,
                    'SentimentAnalysis': sentiment,
                    'TimeToProcess': processing_time
                }

                # Create database entry
                entry = DatabaseEntry(
                    date=news_date,
                    data_type=DataType.NEWS.value,
                    ticker=ticker,
                    data=data,
                    metadata={
                        'logged_at': datetime.now().isoformat(),
                        'has_sentiment': sentiment is not None,
                        'processing_time': processing_time
                    }
                )
                entries.append(entry)

            # Batch save to database
            if entries:
                saved_count = self.db.save_batch(entries, expiry_days=90)
                logger.info(f"✅ Batch logged {saved_count}/{len(entries)} news items")
                return saved_count
            else:
                logger.warning("⚠️ No valid entries to log")
                return 0

        except Exception as e:
            logger.error(f"❌ Error batch logging: {str(e)}")
            import traceback
            traceback.print_exc()
            return 0

# Example usage
if __name__ == "__main__":
    # Configure logging so the INFO messages above are visible when run directly
    logging.basicConfig(level=logging.INFO)

    # Use a distinct name so the module-level `logger` used by the class is not shadowed
    db_logger = NewsDBLogger()

    # Test with a mock news item
    class MockNews:
        def __init__(self):
            self.id = "test123"
            self.headline = "Test Headline"
            self.url = "https://example.com"
            self.source = "TestSource"
            self.symbols = ["AAPL", "MSFT"]
            self.created_at = "2025-01-15T10:30:00Z"

    mock_news = MockNews()
    db_logger.log_news_with_sentiment(
        mock_news,
        pre_sentiment="POSITIVE",
        sentiment="The news is very positive",
        rating=0.85,
        processing_time=1.5
    )
    print("\n✅ Test completed - check database for entry")