File size: 10,911 Bytes
3fe0726
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""
Database Logger for News Items
Replaces ExcelLogger - logs news with sentiment directly to MySQL news table
"""

import sys
import logging
import threading
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Tuple

# Add src to path for imports
sys.path.append(str(Path(__file__).parent.parent.parent))

from db.local_database import LocalDatabase, DatabaseEntry, DataType

logger = logging.getLogger(__name__)


class NewsDBLogger:
    """
    Logs news items with sentiment analysis directly to the MySQL news table.
    Replaces the old ExcelLogger functionality.

    The class is a singleton: every call to ``NewsDBLogger()`` returns the same
    instance, and the database handle is created only once.

    News items may be either objects exposing ``id``, ``headline``, ``url``,
    ``source``, ``symbols`` and ``created_at`` attributes, or dicts with the
    same keys. Missing fields are stored as ``None``.
    """
    _instance = None
    _lock = threading.Lock()

    # Ticker stored when a news item carries no usable symbol.
    DEFAULT_TICKER = "GENERAL"
    # Value passed to the database as ``expiry_days`` for every news entry.
    NEWS_EXPIRY_DAYS = 90

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag before publishing the instance so no other
                    # thread can observe an instance without ``initialized``.
                    instance.initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        """Initialize the database logger (only the first call does any work)."""
        if self.initialized:
            return
        with self._lock:
            if self.initialized:
                return
            self.db = LocalDatabase()
            # Mark as initialized only once the database handle exists, so a
            # failed construction is retried on the next NewsDBLogger() call
            # instead of leaving an instance without ``db``.
            self.initialized = True
        logger.info("✅ NewsDBLogger initialized with MySQL backend")

    @staticmethod
    def _get_field(news_item, name):
        """Read ``name`` from an attribute or, for dict items, from a key; else None."""
        if hasattr(news_item, name):
            return getattr(news_item, name, None)
        if isinstance(news_item, dict):
            return news_item.get(name)
        return None

    @classmethod
    def _extract_symbols(cls, news_item) -> Tuple[str, str]:
        """Return ``(ticker, symbols_str)`` for a news item.

        ``ticker`` is the first usable symbol (or ``DEFAULT_TICKER``) and
        ``symbols_str`` is all usable symbols joined with ``", "``. ``None``
        and blank symbols are skipped; other values are converted with
        ``str`` so non-string symbols cannot break the join.
        """
        raw = getattr(news_item, 'symbols', None)
        if not raw and isinstance(news_item, dict):
            raw = news_item.get('symbols')
        if not raw:
            return cls.DEFAULT_TICKER, ""

        candidates = list(raw) if isinstance(raw, (list, tuple)) else [raw]
        symbols = []
        for candidate in candidates:
            if candidate is None:
                continue
            text = str(candidate).strip()
            if text:
                symbols.append(text)

        if not symbols:
            return cls.DEFAULT_TICKER, ""
        return symbols[0], ', '.join(symbols)

    @classmethod
    def _extract_date(cls, news_item) -> str:
        """Return the item's ``created_at`` as ``YYYY-MM-DD``, or today's date.

        Handles objects with ``strftime`` (e.g. ``datetime``) as well as
        strings whose date and time are separated by ``T`` or a space.
        Falls back to the current local date when ``created_at`` is missing,
        ``None``, blank, or cannot be converted.
        """
        created_at = cls._get_field(news_item, 'created_at')
        if created_at is not None:
            try:
                if hasattr(created_at, 'strftime'):
                    return created_at.strftime("%Y-%m-%d")
                text = str(created_at).strip()
                date_part = text.split('T')[0].split(' ')[0] if text else ""
                if date_part:
                    return date_part
            except Exception:
                # Best effort: a malformed timestamp must not prevent the
                # news item from being logged; use today's date instead.
                pass
        return datetime.now().strftime("%Y-%m-%d")

    @classmethod
    def _build_entry(cls, news_item, pre_sentiment, sentiment, rating, processing_time):
        """Build the ``DatabaseEntry`` for one news item and its sentiment data."""
        ticker, symbols_str = cls._extract_symbols(news_item)

        # Payload keys match the column names of the former Excel format.
        data = {
            'Timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'NewsID': cls._get_field(news_item, 'id'),
            'Headline': cls._get_field(news_item, 'headline'),
            'URL': cls._get_field(news_item, 'url'),
            'Source': cls._get_field(news_item, 'source'),
            'Symbols': symbols_str,
            'PreSentimentScore': pre_sentiment,
            'SentimentScore': rating,
            'SentimentAnalysis': sentiment,
            'TimeToProcess': processing_time,
        }

        return DatabaseEntry(
            date=cls._extract_date(news_item),
            data_type=DataType.NEWS.value,  # "news"
            ticker=ticker,
            data=data,
            metadata={
                'logged_at': datetime.now().isoformat(),
                'has_sentiment': sentiment is not None,
                'processing_time': processing_time,
            },
        )

    def log_news_with_sentiment(self, news_item, pre_sentiment=None, sentiment=None, rating=None, processing_time=None):
        """
        Log a news item and its sentiment analysis to the database.

        Args:
            news_item: The news item (object or dict) containing news details
            pre_sentiment (str, optional): Pre-processed sentiment analysis text
            sentiment (str, optional): Processed sentiment analysis text
            rating (str or float, optional): Sentiment score/rating
            processing_time (float, optional): Time taken to process this news item in seconds

        Returns:
            The result of ``self.db.save`` for the entry, or ``False`` if an
            exception occurred (the exception is logged, not raised).
        """
        try:
            entry = self._build_entry(news_item, pre_sentiment, sentiment, rating, processing_time)
            success = self.db.save(entry, expiry_days=self.NEWS_EXPIRY_DAYS)

            if success:
                # ``or`` (not a .get default) so a present-but-None headline
                # cannot turn a successful save into a reported failure.
                headline = self._get_field(news_item, 'headline') or 'Unknown headline'
                logger.info("✅ Logged to DB | %s...", str(headline)[:60])
            else:
                logger.error("❌ Failed to log news to database")

            return success

        except Exception as e:
            # Boundary handler: logging a news item must never crash the caller.
            logger.exception("❌ Error logging news: %s", e)
            return False

    def log_batch(self, news_items_with_sentiment_and_times: List[Tuple]):
        """
        Log multiple news items with sentiment in batch.

        Args:
            news_items_with_sentiment_and_times: List of tuples
                (news_item, sentiment_data, processing_time) or
                (news_item, sentiment_data). ``sentiment_data`` is a dict with
                optional 'pre_sentiment', 'sentiment' and 'rating' keys; any
                non-dict value is treated as "no sentiment".
                processing_time can be None if unavailable.

        Returns:
            The count returned by ``self.db.save_batch``, or ``0`` when there
            is nothing valid to save or an exception occurred. Items whose
            shape is invalid are skipped with a warning.
        """
        try:
            entries = []

            for item_data in news_items_with_sentiment_and_times:
                try:
                    size = len(item_data)
                except TypeError:
                    size = -1  # not a sized container -> invalid format

                # Handle both 2-element and 3-element items
                if size == 2:
                    news_item, sentiment_data = item_data
                    processing_time = None
                elif size == 3:
                    news_item, sentiment_data, processing_time = item_data
                else:
                    logger.warning("⚠️  Invalid item data format: %s", item_data)
                    continue

                details = sentiment_data if isinstance(sentiment_data, dict) else {}
                entries.append(
                    self._build_entry(
                        news_item,
                        details.get('pre_sentiment'),
                        details.get('sentiment'),
                        details.get('rating'),
                        processing_time,
                    )
                )

            if not entries:
                logger.warning("⚠️  No valid entries to log")
                return 0

            saved_count = self.db.save_batch(entries, expiry_days=self.NEWS_EXPIRY_DAYS)
            logger.info("✅ Batch logged %s/%s news items", saved_count, len(entries))
            return saved_count

        except Exception as e:
            # Boundary handler: batch logging must never crash the caller.
            logger.exception("❌ Error batch logging: %s", e)
            return 0


# Example usage
if __name__ == "__main__":
    # Manual smoke test: logs one mock news item through NewsDBLogger.
    # Configure logging so the INFO/ERROR messages emitted by NewsDBLogger
    # are actually visible when this file is run as a script.
    logging.basicConfig(level=logging.INFO)

    # Deliberately NOT named ``logger``: rebinding the module-level logging
    # logger to a NewsDBLogger would make every ``logger.info(...)`` call
    # inside the class raise AttributeError.
    news_logger = NewsDBLogger()

    # Test with a mock news item
    class MockNews:
        """Minimal stand-in exposing the attributes NewsDBLogger reads."""

        def __init__(self):
            self.id = "test123"
            self.headline = "Test Headline"
            self.url = "https://example.com"
            self.source = "TestSource"
            self.symbols = ["AAPL", "MSFT"]
            self.created_at = "2025-01-15T10:30:00Z"

    mock_news = MockNews()
    logged = news_logger.log_news_with_sentiment(
        mock_news,
        pre_sentiment="POSITIVE",
        sentiment="The news is very positive",
        rating=0.85,
        processing_time=1.5
    )

    if logged:
        print("\n✅ Test completed - check database for entry")
    else:
        print("\n❌ Test failed - news item was not logged (see log output above)")