import os
import queue
import asyncio
import threading
import logging
import time
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor

from bs4 import BeautifulSoup
import yfinance

from news_scraper.nlp_models.finbert_prosusAI import FinBertSentimentAnalyzer_ProsusAI as Plutus
from news_scraper.services.sentiment_analysis_gemini import SentimentAnalyzer
from news_scraper.helpers.news_db_logger import NewsDBLogger
from news_scraper.services.marketaux_service import MarketAuxService
from news_scraper.helpers.performance_logger import PerformanceLogger

# Configure logger for main-thread visibility
logger = logging.getLogger(__name__)

class NewsHandler(ABC):
    """Base handler class for the Chain of Responsibility pattern."""

    def __init__(self, next_handler=None):
        self.next_handler = next_handler

    @abstractmethod
    def process(self, news_item, metrics=None):
        """Handle the news item, optionally delegating to the next handler."""

    def set_next(self, handler):
        self.next_handler = handler
        return handler
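
# A minimal sketch of how the chain is wired and traversed (hypothetical
# handler; the real chain is assembled in NewsProcessor.__init__ below):
#
#   class LogHandler(NewsHandler):
#       def process(self, news_item, metrics=None):
#           print(news_item)
#           if self.next_handler:
#               return self.next_handler.process(news_item, metrics)
#           return news_item
#
#   head = LogHandler()
#   head.set_next(LogHandler())  # set_next returns the new handler, so calls can be chained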

class FirstStepHandler(NewsHandler):
    """FinBERT pre-screen: only POSITIVE/NEGATIVE headlines proceed down the chain."""

    def __init__(self, pre_sentiment, next_handler=None):
        super().__init__(next_handler)
        self.pre_sentiment = pre_sentiment

    def process(self, news_item, metrics=None):
        start_time = time.time()
        if metrics is None:
            metrics = {}

        # Extract the text to analyze (dict-shaped and attribute-style items are both supported)
        if isinstance(news_item, dict):
            text_to_analyze = news_item.get('title', news_item.get('headline', ''))
            metrics['headline'] = text_to_analyze
            metrics['ticker'] = news_item.get('symbols', ['N/A'])[0] if news_item.get('symbols') else 'N/A'
        else:
            text_to_analyze = getattr(news_item, 'title', getattr(news_item, 'headline', ''))
            metrics['headline'] = text_to_analyze
            metrics['ticker'] = getattr(news_item, 'symbols', ['N/A'])[0] if getattr(news_item, 'symbols', None) else 'N/A'

        # Analyze sentiment
        sentiment = self.pre_sentiment.predict_sentiment(text_to_analyze)
        logger.info(f"[PRE SENTIMENT] {sentiment} | {text_to_analyze}")

        # Record duration
        metrics['handler1_duration'] = time.time() - start_time

        # Only proceed if sentiment is POSITIVE or NEGATIVE
        if sentiment in ("POSITIVE", "NEGATIVE"):
            if self.next_handler:
                return self.next_handler.process(news_item, sentiment, metrics)
            return news_item
        else:
            logger.info(f"[NEUTRAL] {text_to_analyze}")
            return news_item
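
# Gating behavior sketch (hypothetical dict-shaped item, e.g. from Alpaca):
#
#   handler = FirstStepHandler(pre_sentiment=Plutus())
#   item = {"title": "ACME misses earnings estimates", "symbols": ["ACME"]}
#   handler.process(item)  # NEUTRAL headlines stop here; POSITIVE/NEGATIVE ones
#                          # are forwarded with the label and a metrics dict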

class SecondStepHandler(NewsHandler):
    """
    Stock "hotness" analysis.

    Intended to check Reddit/Twitter/Google Trends mentions and compare
    pre-market volume against average volume; currently it verifies that each
    ticker is available in the DB and that its average volume exceeds 1M shares.
    """

    def __init__(self, db_logger, next_handler=None):
        super().__init__(next_handler)
        self.db_logger = db_logger

    def process(self, news_item, sentiment, metrics):
        start_time = time.time()
        title = news_item['title'] if isinstance(news_item, dict) and 'title' in news_item else getattr(news_item, 'title', '')
        logger.info(f"[HOTNESS CHECK] {title}")

        # Get all symbols
        symbols = news_item['symbols'] if isinstance(news_item, dict) and 'symbols' in news_item else getattr(news_item, 'symbols', [])
        if not symbols:
            symbols = []

        valid_tickers = []
        for symbol in symbols:
            _ticker = yfinance.Ticker(symbol)

            # Check availability
            if not self.db_logger.db.is_ticker_available(_ticker.ticker):
                logger.info(f"[SKIP] Ticker {_ticker.ticker} not in available_tickers")
                continue

            # Check volume: require an average volume above 1M shares
            avg_volume = _ticker.info.get('averageVolume', 0)
            if avg_volume <= 1_000_000:
                logger.info(f"[SKIP] Ticker {_ticker.ticker} volume < 1M (avg: {avg_volume:,})")
                continue

            logger.info(f"[PASS] {_ticker.ticker} volume check (avg: {avg_volume:,})")
            valid_tickers.append(symbol)

        # Record duration
        metrics['handler2_duration'] = time.time() - start_time

        # If no valid tickers were found, stop the chain here
        if not valid_tickers:
            logger.info("[SKIP] No valid tickers found for news item")
            return news_item

        # Proceed if at least one ticker is valid
        if self.next_handler:
            return self.next_handler.process(news_item, sentiment, metrics)
        return news_item
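
# The volume gate reads yfinance's Ticker.info mapping; a sketch with
# illustrative values (the number is made up, the field name is real):
#
#   yfinance.Ticker("AAPL").info.get('averageVolume', 0)  # e.g. 58_000_000 -> passes the 1M gate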

class ThirdStepHandler(NewsHandler):
    """
    Headline Sentiment Filter (Persona B: Headline Fast-Track).
    """

    def __init__(self, sentiment_analyzer, next_handler=None):
        super().__init__(next_handler)
        self.sentiment_analyzer = sentiment_analyzer

    def process(self, news_item, sentiment, metrics):
        start_time = time.time()
        title = news_item['title'] if isinstance(news_item, dict) and 'title' in news_item else getattr(news_item, 'title', '')
        logger.info(f"[HEADLINE ANALYSIS] {title}")

        # Use the Ollama Headline persona by passing short text: we pass the
        # title as 'content'; analyze_sentiment detects length < 250 and
        # selects the Headline prompt.
        rating, analysis, impact_type = self.sentiment_analyzer.analyze_sentiment(content=title)
        logger.info(f"[HEADLINE] Rating: {rating} | Impact: {impact_type} | Analysis: {analysis}")

        # Positive if rating >= 4, negative if rating <= 2; anything between is treated as neutral
        is_positive = isinstance(rating, (int, float)) and rating >= 4
        is_negative = isinstance(rating, (int, float)) and rating <= 2

        # Record duration
        metrics['handler3_duration'] = time.time() - start_time

        if is_positive or is_negative:
            # Update the pre-sentiment label to match the Ollama output
            updated_sentiment = "POSITIVE" if is_positive else "NEGATIVE"
            logger.info(f"[HEADLINE] Sentiment significant ({updated_sentiment}). Proceeding.")
            if self.next_handler:
                return self.next_handler.process(news_item, updated_sentiment, rating, analysis, impact_type, metrics)
            return news_item  # no next handler: stop here instead of returning None
        else:
            logger.info(f"[HEADLINE] Sentiment Neutral/Weak (Rating: {rating}). Stopping.")
            return news_item
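
# Rating bands used above:
#
#   rating >= 4      -> "POSITIVE", forwarded down the chain
#   rating <= 2      -> "NEGATIVE", forwarded down the chain
#   2 < rating < 4   -> neutral/weak, chain stops here
#
# Note: the 1-5 scale is inferred from these thresholds and from the
# (3, "No URL", "Unknown") fallback used in FourthStepHandler below.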

class FourthStepHandler(NewsHandler):
    """
    Context Gathering (Persona A: Short-Term Sentiment).
    Fetches the last 3 news items from MarketAux and analyzes them.
    """

    def __init__(self, sentiment_analyzer, market_aux_service, next_handler=None):
        super().__init__(next_handler)
        self.sentiment_analyzer = sentiment_analyzer
        self.market_aux_service = market_aux_service

    def process(self, news_item, sentiment, headline_rating, headline_analysis, headline_impact, metrics):
        start_time = time.time()
        logger.info("[CONTEXT CHECK] Fetching recent news context from MarketAux...")

        # Get tickers
        tickers = []
        if hasattr(news_item, 'symbols') and news_item.symbols:
            tickers = news_item.symbols
        elif isinstance(news_item, dict) and news_item.get('symbols'):
            tickers = news_item['symbols']

        if not tickers:
            logger.warning("[CONTEXT] No tickers found, skipping context analysis")
            metrics['handler4_duration'] = time.time() - start_time
            if self.next_handler:
                return self.next_handler.process(news_item, sentiment, headline_rating, headline_analysis, headline_impact, [], metrics)
            return news_item

        ticker_symbol = tickers[0]  # Focus on the primary ticker
        context_results = []
        metrics['marketaux_processing_times'] = []

        # Check for rich content in the news item (Alpaca often provides this)
        content = news_item.get('content', '') if isinstance(news_item, dict) else getattr(news_item, 'content', '')
        summary = news_item.get('summary', '') if isinstance(news_item, dict) else getattr(news_item, 'summary', '')

        # Prefer content, fall back to summary
        text_to_analyze = content if content and len(content) > 50 else summary

        # If we have substantial text, use it directly instead of MarketAux
        if text_to_analyze and len(text_to_analyze) > 100:
            try:
                logger.info(f"[CONTEXT] Found rich content/summary ({len(text_to_analyze)} chars). Analyzing directly...")

                # Clean HTML if present
                if "<" in text_to_analyze and ">" in text_to_analyze:
                    soup = BeautifulSoup(text_to_analyze, 'html.parser')
                    text_to_analyze = soup.get_text(separator=' ', strip=True)

                # Analyze sentiment directly; analyze_sentiment selects the
                # Short-Term persona when the content is longer than 250 chars
                result = self.sentiment_analyzer.analyze_sentiment(content=text_to_analyze)
                context_results = [result]
                metrics['marketaux_processing_times'] = ["0.0"]  # Placeholder: MarketAux was not used
                logger.info(f"[CONTEXT] Direct Analysis: Rating {result[0]} | {result[2]}")
            except Exception as e:
                # If direct analysis fails, context_results stays empty and we
                # fall through to the MarketAux path below
                logger.error(f"❌ [CONTEXT] Error in direct analysis: {e}")

        # If direct content analysis produced nothing, fall back to MarketAux
        if not context_results:
            try:
                # Fetch from MarketAux with advanced filtering (time only, no
                # sentiment bias): limit 3, 12h window
                market_aux_news = self.market_aux_service.get_company_news(
                    symbol=ticker_symbol,
                    limit=3
                )

                if market_aux_news:
                    logger.info(f"[CONTEXT] Analyzing last {len(market_aux_news)} news items for {ticker_symbol}")

                    def analyze_item(item):
                        # Wrapped so each item's duration can be tracked even
                        # when run through ThreadPoolExecutor.map
                        item_start = time.time()
                        url = item.get('url')
                        result = (3, "No URL", "Unknown")
                        if url:
                            # Passing a URL fetches the full article, which
                            # triggers the Short-Term persona
                            result = self.sentiment_analyzer.analyze_sentiment(url=url)
                        return result, time.time() - item_start

                    with ThreadPoolExecutor(max_workers=3) as executor:
                        futures_results = list(executor.map(analyze_item, market_aux_news))

                    # Unpack results and per-item durations
                    context_results = [r[0] for r in futures_results]
                    durations = [r[1] for r in futures_results]
                    metrics['marketaux_processing_times'] = [f"{d:.4f}" for d in durations]

                    for i, res in enumerate(context_results):
                        item_title = market_aux_news[i].get('title', 'Unknown Title')
                        logger.info(f"[CONTEXT] Item {i + 1}: {item_title} | Rating {res[0]} | {res[2]}")
                else:
                    logger.info("[CONTEXT] No recent news found on MarketAux matching criteria.")
            except Exception as e:
                logger.error(f"❌ [CONTEXT] Error fetching/analyzing context: {e}")

        # Record duration
        metrics['handler4_duration'] = time.time() - start_time

        if self.next_handler:
            return self.next_handler.process(news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results, metrics)
        return news_item
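
# Context-routing summary for FourthStepHandler (derived from the code above):
#
#   item carries content/summary > 100 chars -> clean HTML, analyze it directly
#   otherwise                                -> fetch up to 3 recent MarketAux
#                                                articles and analyze each by URL
#
# Either path yields context_results as a list of (rating, analysis, impact) tuples.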

class FifthStepHandler(NewsHandler):
    """
    Signal Generation (formerly FourthStepHandler).
    """

    def __init__(self, db_logger, fundamental_service, performance_logger, next_handler=None):
        super().__init__(next_handler)
        self.db_logger = db_logger
        self.fundamental_service = fundamental_service
        self.performance_logger = performance_logger

    def process(self, news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results, metrics):
        start_time = time.time()
        title = news_item['title'] if isinstance(news_item, dict) and 'title' in news_item else getattr(news_item, 'title', '')
        logger.info(f"[SIGNAL GENERATION] {title}")

        # The headline rating serves as the primary "News Rating" for DB logging.
        # start_time here is local to this handler, so handler5_duration covers
        # only this step; the per-handler durations accumulated in `metrics` are
        # flushed to the CSV performance log at the end of this method.

        # Signal Generation Logic
        try:
            # 1. Get tickers
            tickers = []
            if hasattr(news_item, 'symbols') and news_item.symbols:
                tickers = news_item.symbols
            elif isinstance(news_item, dict) and news_item.get('symbols'):
                tickers = news_item['symbols']

            if not tickers:
                logger.warning("[SIGNAL] No tickers found in news item, skipping signal generation")
                metrics['handler5_duration'] = time.time() - start_time
                self.performance_logger.log_metrics(metrics)
                return news_item

            # Process each ticker
            for ticker in tickers:
                # 2. Get the latest fundamental analysis from the DB
                entries = self.db_logger.db.query(
                    data_type='fundamental_analysis',
                    ticker=ticker,
                    limit=1
                )
                fundamental_entry = entries[0] if entries else None

                if fundamental_entry:
                    self._generate_signal(ticker, fundamental_entry.data, fundamental_entry.generate_key(), news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results)
                else:
                    logger.info(f"[SIGNAL] No fundamental analysis found for {ticker}. Requesting analysis...")
                    # Request analysis and retry signal generation via callback
                    self.fundamental_service.request_analysis(
                        ticker,
                        lambda t, r: self._generate_signal(
                            t, r, None, news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results
                        )
                    )
        except Exception as e:
            logger.error(f"❌ [SIGNAL] Error generating signal: {e}")
            import traceback
            traceback.print_exc()

        # Record duration
        metrics['handler5_duration'] = time.time() - start_time

        # Log performance metrics to CSV
        self.performance_logger.log_metrics(metrics)

        if self.next_handler:
            return self.next_handler.process(news_item, sentiment, metrics)
        return news_item

    def _generate_signal(self, ticker, fundamental_data, fundamental_key, news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results):
        """
        Helper method to generate a signal from fundamental data and news sentiment.
        """
        try:
            fundamental_rec = fundamental_data.get('recommendation', 'UNKNOWN')
            # Extract the confidence score (0.0-1.0), defaulting to 0.5 if missing
            confidence_score = fundamental_data.get('confidence', 0.5)
            logger.info(f"[SIGNAL] Fundamental analysis for {ticker}: {fundamental_rec} (Confidence: {confidence_score:.2f})")

            if fundamental_key is None:
                entries = self.db_logger.db.query(data_type='fundamental_analysis', ticker=ticker, limit=1)
                if entries:
                    fundamental_key = entries[0].generate_key()

            # 3. Determine the signal. The headline-derived sentiment drives the primary logic.
            signal_position = None
            is_news_negative = sentiment == "NEGATIVE"
            is_news_positive = sentiment == "POSITIVE"

            # Context results are currently logged only; they do not yet feed
            # into the signal decision
            if context_results:
                numeric_ratings = [r[0] for r in context_results if isinstance(r[0], (int, float))]
                if numeric_ratings:
                    avg_context_rating = sum(numeric_ratings) / len(numeric_ratings)
                    logger.info(f"[SIGNAL] Context Average Rating: {avg_context_rating:.1f}")

            if is_news_negative:
                if fundamental_rec == "BUY":
                    if confidence_score < 0.75:
                        signal_position = "SELL"
                        logger.info(f"[SIGNAL] GENERATED: SELL signal for {ticker} (Negative News + Low Confidence Fundamental BUY)")
                    else:
                        logger.info(f"[SIGNAL] SKIP: Negative news but High Confidence Fundamental BUY for {ticker}")
                elif fundamental_rec == "SELL":
                    signal_position = "SELL"
                    logger.info(f"[SIGNAL] GENERATED: SELL signal for {ticker} (Negative News + Fundamental SELL)")
                else:  # HOLD or UNKNOWN
                    # Default to SELL when news is negative and the fundamental view is not BUY
                    signal_position = "SELL"
                    logger.info(f"[SIGNAL] GENERATED: SELL signal for {ticker} (Negative News + Fundamental {fundamental_rec})")
            elif is_news_positive:
                if fundamental_rec == "BUY":
                    signal_position = "BUY"
                    logger.info(f"[SIGNAL] GENERATED: BUY signal for {ticker} (Positive News + Fundamental BUY)")
                elif fundamental_rec == "SELL":
                    if confidence_score < 0.75:
                        signal_position = "BUY"
                        logger.info(f"[SIGNAL] GENERATED: BUY signal for {ticker} (Positive News + Low Confidence Fundamental SELL)")
                    else:
                        logger.info(f"[SIGNAL] SKIP: Positive news but High Confidence Fundamental SELL for {ticker}")
                else:  # HOLD or UNKNOWN
                    # Default to BUY when news is positive and the fundamental view is not SELL
                    signal_position = "BUY"
                    logger.info(f"[SIGNAL] GENERATED: BUY signal for {ticker} (Positive News + Fundamental {fundamental_rec})")

            # 4. Write the signal to the DB
            if signal_position:
                if isinstance(news_item, dict):
                    news_id = news_item.get('id')
                else:
                    news_id = getattr(news_item, 'id', None)
                news_keys = [str(news_id)] if news_id else []

                # Construct the sentiment dictionary
                news_sentiment_val = "POSITIVE" if is_news_positive else "NEGATIVE"
                sentiment_data = {
                    'news_sentiment': news_sentiment_val,
                    'fundamental_sentiment': fundamental_rec,
                    'gemini_sentiment': headline_rating,
                    'gemini_analysis': headline_analysis,
                    'gemini_impact_type': headline_impact,
                    'context_analysis': str(context_results)  # Store raw context results
                }

                success = self.db_logger.db.save_signal(
                    ticker=ticker,
                    calendar_event_keys=[],
                    news_keys=news_keys,
                    fundamental_key=fundamental_key,
                    sentiment=sentiment_data,
                    signal_position=signal_position
                )
                if success:
                    logger.info(f"✅ [SIGNAL] Saved {signal_position} signal for {ticker}")
                else:
                    logger.error(f"❌ [SIGNAL] Failed to save signal for {ticker}")
        except Exception as e:
            logger.error(f"❌ [SIGNAL] Error in _generate_signal for {ticker}: {e}")
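
# Signal decision table implemented by _generate_signal (confidence threshold 0.75):
#
#   News      | Fundamental | Confidence | Signal
#   ----------+-------------+------------+--------
#   NEGATIVE  | BUY         | < 0.75     | SELL
#   NEGATIVE  | BUY         | >= 0.75    | (skip)
#   NEGATIVE  | SELL        | any        | SELL
#   NEGATIVE  | HOLD/other  | any        | SELL
#   POSITIVE  | BUY         | any        | BUY
#   POSITIVE  | SELL        | < 0.75     | BUY
#   POSITIVE  | SELL        | >= 0.75    | (skip)
#   POSITIVE  | HOLD/other  | any        | BUY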

class NewsProcessor:
    """Singleton that owns the handler chain and the async news queue."""

    def __new__(cls):
        if not hasattr(cls, 'instance'):
            cls.instance = super(NewsProcessor, cls).__new__(cls)
        return cls.instance

    def __init__(self):
        if not hasattr(self, 'initialized'):
            self.initialized = True
            self.news_data = []
            self.processed_news = []
            self.callbacks = []
            self.news_queue = queue.Queue()  # Queue for incoming news items

            # Initialize services
            self.db_logger = NewsDBLogger()
            self.pre_sentiment = Plutus()
            self.sentiment_analyzer = SentimentAnalyzer()
            # Prefer the environment variable; the hard-coded fallback token
            # should be rotated out of source control
            self.market_aux_service = MarketAuxService(api_token=os.getenv("MARKETAUX_API_TOKEN", "MNWZJoLDbQKLBCBwln7TePo9nw9hdOcJWMyczdWi"))
            self.performance_logger = PerformanceLogger()

            # Imported lazily here rather than at module top level
            from news_scraper.services.fundamental_analysis_service import FundamentalAnalysisService
            self.fundamental_service = FundamentalAnalysisService()
            self.fundamental_service.start()

            # Set up the Chain of Responsibility
            self.handler_chain = FirstStepHandler(pre_sentiment=self.pre_sentiment)
            second_step = SecondStepHandler(db_logger=self.db_logger)
            third_step = ThirdStepHandler(sentiment_analyzer=self.sentiment_analyzer)
            fourth_step = FourthStepHandler(sentiment_analyzer=self.sentiment_analyzer, market_aux_service=self.market_aux_service)
            fifth_step = FifthStepHandler(db_logger=self.db_logger, fundamental_service=self.fundamental_service, performance_logger=self.performance_logger)
            self.handler_chain.set_next(second_step)
            second_step.set_next(third_step)
            third_step.set_next(fourth_step)
            fourth_step.set_next(fifth_step)

            # Initialize processing state
            self.processing_task = None
            self.loop = None
            self.loop_thread = None
            self.is_running = False

    def add_news(self, news_item):
        """
        Add a news item to the news queue.
        """
        self.news_queue.put(news_item)

    async def fifo_process_news(self):
        """
        Process queued news items in FIFO order asynchronously.
        """
        while True:
            if not self.news_queue.empty():
                # Get the next news item from the queue
                news_item = self.news_queue.get()
                try:
                    # Run the news item through the handler chain
                    processed_item = self.handler_chain.process(news_item)

                    # Store the processed item
                    self.processed_news.append(processed_item)

                    # Notify registered callbacks, if any
                    for callback in self.callbacks:
                        await callback(processed_item)

                    headline = news_item['headline'] if isinstance(news_item, dict) and 'headline' in news_item else getattr(news_item, 'headline', '')
                    logger.info(f"✅ [COMPLETED] {headline}")
                except Exception as e:
                    logger.error(f"❌ [ERROR] Processing news: {e}")
                finally:
                    # Mark the task as done in the queue
                    self.news_queue.task_done()

            # Sleep to avoid busy-waiting
            await asyncio.sleep(1)

    def start_processing(self):
        """
        Start the async processing task.
        """
        logger.info("🚀 [NEWS PROCESSOR] Attempting to start news processing")
        if self.is_running:
            logger.info("⚠️ [NEWS PROCESSOR] Already running - ignoring start request")
            return

        try:
            # If we're already inside an asyncio context, reuse its running loop
            self.loop = asyncio.get_running_loop()
            logger.info("📌 [NEWS PROCESSOR] Using existing running event loop")
            self.processing_task = self.loop.create_task(self.fifo_process_news())
            self.is_running = True
            logger.info("✅ [NEWS PROCESSOR] Task created in existing event loop")
            return
        except RuntimeError:
            # No running event loop: create a new one on a dedicated thread
            logger.info("🔄 [NEWS PROCESSOR] No running event loop found, creating new one")
            self.loop = asyncio.new_event_loop()

        def run_event_loop():
            thread_id = threading.get_ident()
            logger.info(f"🧵 [NEWS PROCESSOR] Thread started with ID: {thread_id}")
            asyncio.set_event_loop(self.loop)
            self.processing_task = self.loop.create_task(self.fifo_process_news())
            logger.info("▶️ [NEWS PROCESSOR] Event loop running")
            self.loop.run_forever()

        self.loop_thread = threading.Thread(target=run_event_loop, daemon=True)
        self.loop_thread.start()
        self.is_running = True
        logger.info(f"✅ [NEWS PROCESSOR] Thread started: {self.loop_thread.name}")

    def stop_processing(self):
        """
        Stop the async processing task.
        """
        if not self.is_running:
            logger.info("⚠️ [NEWS PROCESSOR] Not running")
            return

        if self.processing_task:
            self.processing_task.cancel()

        if self.loop and self.loop.is_running() and self.loop_thread:
            # Stop the loop and wait for its thread to finish
            self.loop.call_soon_threadsafe(self.loop.stop)
            if threading.current_thread() != self.loop_thread:
                self.loop_thread.join(timeout=5.0)

        self.is_running = False
        self.fundamental_service.stop()
        logger.info("🛑 [NEWS PROCESSOR] Stopped")

    def register_callback(self, callback):
        """
        Register an async callback to be awaited after each news item is processed.
        """
        self.callbacks.append(callback)
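
# Singleton behavior sketch: repeated construction returns the same instance,
# and __init__ runs its body only once (guarded by the `initialized` flag):
#
#   a = NewsProcessor()
#   b = NewsProcessor()
#   assert a is b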

if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Example usage
    news_processor = NewsProcessor()

    # Register a callback function
    async def example_callback(news_item):
        print(f"[PROCESSOR] [FUNC] Processing | {news_item.headline if hasattr(news_item, 'headline') else ''}")

    news_processor.register_callback(example_callback)

    # Start processing news items
    news_processor.start_processing()

    # Keep the main thread alive for testing
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        news_processor.stop_processing()
        print("Exiting...")