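"""News-driven signal generation pipeline.

Incoming news items flow through a Chain of Responsibility:
FinBERT pre-filter -> ticker availability/volume check -> headline LLM rating
-> context gathering (article content or MarketAux) -> signal generation.
"""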
import asyncio
import logging
import os
import queue
import threading
import time
import traceback
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor

import yfinance
from bs4 import BeautifulSoup

from news_scraper.nlp_models.finbert_prosusAI import FinBertSentimentAnalyzer_ProsusAI as Plutus
from news_scraper.services.sentiment_analysis_gemini import SentimentAnalyzer
from news_scraper.helpers.news_db_logger import NewsDBLogger
from news_scraper.services.marketaux_service import MarketAuxService
from news_scraper.helpers.performance_logger import PerformanceLogger
# Configure logger for main thread visibility
logger = logging.getLogger(__name__)
class NewsHandler(ABC):
"""Base handler class for the Chain of Responsibility pattern."""
def __init__(self, next_handler=None):
self.next_handler = next_handler
    @abstractmethod
    def process(self, news_item, *args, **kwargs):
        # Subclasses add stage-specific positional arguments as the chain deepens
        pass
def set_next(self, handler):
self.next_handler = handler
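        # Return the handler just attached so calls can be chained: a.set_next(b).set_next(c)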
return handler
class FirstStepHandler(NewsHandler):
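    """FinBERT pre-filter: forwards only items whose headline scores POSITIVE or NEGATIVE."""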
def __init__(self, pre_sentiment, next_handler=None):
super().__init__(next_handler)
self.pre_sentiment = pre_sentiment
def process(self, news_item, metrics=None):
start_time = time.time()
if metrics is None:
metrics = {}
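        # 'metrics' accumulates per-stage data across the chain (headline, ticker,
        # handler1..5_duration, marketaux_processing_times) and is flushed to CSV
        # by FifthStepHandler via PerformanceLogger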
# Extract text to analyze
if isinstance(news_item, dict):
text_to_analyze = news_item.get('title', news_item.get('headline', ''))
metrics['headline'] = text_to_analyze
metrics['ticker'] = news_item.get('symbols', ['N/A'])[0] if news_item.get('symbols') else 'N/A'
else:
text_to_analyze = getattr(news_item, 'title', getattr(news_item, 'headline', ''))
metrics['headline'] = text_to_analyze
metrics['ticker'] = getattr(news_item, 'symbols', ['N/A'])[0] if getattr(news_item, 'symbols', None) else 'N/A'
# Analyze sentiment
sentiment = self.pre_sentiment.predict_sentiment(text_to_analyze)
logger.info(f"[PRE SENTIMENT] {sentiment} | {text_to_analyze}")
# Record duration
metrics['handler1_duration'] = time.time() - start_time
# Only proceed if sentiment is POSITIVE or NEGATIVE
if sentiment in ["POSITIVE", "NEGATIVE"]:
if self.next_handler:
return self.next_handler.process(news_item, sentiment, metrics)
return news_item
else:
logger.info(f"[NEUTRAL] {text_to_analyze}")
return news_item
class SecondStepHandler(NewsHandler):
"""
Stock hotness analysis
Basically check for reddit/twitter/google trends if they mention the stock
Check pre-market volume for the stock compared to the average volume
"""
def __init__(self, db_logger, next_handler=None):
super().__init__(next_handler)
self.db_logger = db_logger
def process(self, news_item, sentiment, metrics):
start_time = time.time()
title = news_item['title'] if isinstance(news_item, dict) and 'title' in news_item else getattr(news_item, 'title', '')
logger.info(f"[HOTNESS CHECK] {title}")
# Get all symbols
symbols = news_item['symbols'] if isinstance(news_item, dict) and 'symbols' in news_item else getattr(news_item, 'symbols', [])
if not symbols:
symbols = []
valid_tickers = []
for symbol in symbols:
_ticker = yfinance.Ticker(symbol)
# Check availability
if not self.db_logger.db.is_ticker_available(_ticker.ticker):
logger.info(f"[SKIP] Ticker {_ticker.ticker} not in available_tickers")
continue
# Check volume
avg_volume = _ticker.info.get('averageVolume', 0)
            if avg_volume <= 1_000_000:
                logger.info(f"[SKIP] Ticker {_ticker.ticker} volume <= 1M (avg: {avg_volume:,})")
continue
logger.info(f"[PASS] {_ticker.ticker} volume check (avg: {avg_volume:,})")
valid_tickers.append(symbol)
# Record duration
metrics['handler2_duration'] = time.time() - start_time
# If no valid tickers found, skip
if not valid_tickers:
logger.info(f"[SKIP] No valid tickers found for news item")
return news_item
# Proceed if at least one ticker is valid
if self.next_handler:
return self.next_handler.process(news_item, sentiment, metrics)
return news_item
class ThirdStepHandler(NewsHandler):
"""
Headline Sentiment Filter (Persona B: Headline Fast-Track)
"""
def __init__(self, sentiment_analyzer, next_handler=None):
super().__init__(next_handler)
self.sentiment_analyzer = sentiment_analyzer
def process(self, news_item, sentiment, metrics):
start_time = time.time()
title = news_item['title'] if isinstance(news_item, dict) and 'title' in news_item else getattr(news_item, 'title', '')
logger.info(f"[HEADLINE ANALYSIS] {title}")
# Use Ollama Headline Persona (by passing short text)
# We pass 'content' as the title. analyze_sentiment will detect length < 250 and use Headline prompt.
rating, analysis, impact_type = self.sentiment_analyzer.analyze_sentiment(content=title)
logger.info(f"[HEADLINE] Rating: {rating} | Impact: {impact_type} | Analysis: {analysis}")
# Check if Positive (>=4) or Negative (<=2)
is_positive = isinstance(rating, (int, float)) and rating >= 4
is_negative = isinstance(rating, (int, float)) and rating <= 2
# Record duration
metrics['handler3_duration'] = time.time() - start_time
        if is_positive or is_negative:
            # Promote the pre-sentiment label to match the headline model's output
            updated_sentiment = "POSITIVE" if is_positive else "NEGATIVE"
            logger.info(f"[HEADLINE] Sentiment significant ({updated_sentiment}). Proceeding.")
            if self.next_handler:
                return self.next_handler.process(news_item, updated_sentiment, rating, analysis, impact_type, metrics)
        else:
            logger.info(f"[HEADLINE] Sentiment Neutral/Weak (Rating: {rating}). Stopping.")
        return news_item
class FourthStepHandler(NewsHandler):
"""
Context Gathering (Persona A: Short-Term Sentiment)
Fetches last 3 news items from MarketAux and analyzes them.
"""
def __init__(self, sentiment_analyzer, market_aux_service, next_handler=None):
super().__init__(next_handler)
self.sentiment_analyzer = sentiment_analyzer
self.market_aux_service = market_aux_service
def process(self, news_item, sentiment, headline_rating, headline_analysis, headline_impact, metrics):
start_time = time.time()
logger.info("[CONTEXT CHECK] Fetching recent news context from MarketAux...")
# Get tickers
tickers = []
if hasattr(news_item, 'symbols') and news_item.symbols:
tickers = news_item.symbols
elif isinstance(news_item, dict) and 'symbols' in news_item and news_item['symbols']:
tickers = news_item['symbols']
if not tickers:
logger.warning("[CONTEXT] No tickers found, skipping context analysis")
metrics['handler4_duration'] = time.time() - start_time
if self.next_handler:
return self.next_handler.process(news_item, sentiment, headline_rating, headline_analysis, headline_impact, [], metrics)
return news_item
ticker_symbol = tickers[0] # Focus on primary ticker
context_results = []
metrics['marketaux_processing_times'] = []
# Check for rich content in news_item (Alpaca often provides this)
content = news_item.get('content', '') if isinstance(news_item, dict) else getattr(news_item, 'content', '')
summary = news_item.get('summary', '') if isinstance(news_item, dict) else getattr(news_item, 'summary', '')
# Prefer content, fallback to summary
text_to_analyze = content if content and len(content) > 50 else summary
# If we have substantial text, use it directly instead of MarketAux
if text_to_analyze and len(text_to_analyze) > 100:
try:
logger.info(f"[CONTEXT] Found rich content/summary ({len(text_to_analyze)} chars). Analyzing directly...")
# Clean HTML if present
if "<" in text_to_analyze and ">" in text_to_analyze:
soup = BeautifulSoup(text_to_analyze, 'html.parser')
text_to_analyze = soup.get_text(separator=' ', strip=True)
# Analyze sentiment directly
# We pass content=text_to_analyze. analyze_sentiment will use Short-Term Persona if length > 250
result = self.sentiment_analyzer.analyze_sentiment(content=text_to_analyze)
context_results = [result]
metrics['marketaux_processing_times'] = ["0.0"] # Placeholder as we didn't use MarketAux
logger.info(f"[CONTEXT] Direct Analysis: Rating {result[0]} | {result[2]}")
except Exception as e:
logger.error(f"❌ [CONTEXT] Error in direct analysis: {e}")
                # Direct analysis failed; fall through to the MarketAux path below,
                # which runs whenever context_results is still empty
# If no direct content analysis was successful, proceed with MarketAux
if not context_results:
try:
# Fetch from MarketAux with advanced filtering (Time only, no sentiment bias)
# Limit 3, 12h window
market_aux_news = self.market_aux_service.get_company_news(
symbol=ticker_symbol,
limit=3
)
if market_aux_news:
logger.info(f"[CONTEXT] Analyzing last {len(market_aux_news)} news items for {ticker_symbol}")
def analyze_item(item):
item_start = time.time()
url = item.get('url')
result = (3, "No URL", "Unknown")
if url:
# This will use Short-Term Persona because we pass a URL (fetches full content)
result = self.sentiment_analyzer.analyze_sentiment(url=url)
duration = time.time() - item_start
return result, duration
                    # Wrap each analysis so per-item durations can be recorded
                    # while still fanning out across the thread pool
with ThreadPoolExecutor(max_workers=3) as executor:
futures_results = list(executor.map(analyze_item, market_aux_news))
# Unpack results
context_results = [r[0] for r in futures_results]
durations = [r[1] for r in futures_results]
metrics['marketaux_processing_times'] = [f"{d:.4f}" for d in durations]
for i, res in enumerate(context_results):
# Get title from original list if possible
item_title = market_aux_news[i].get('title', 'Unknown Title')
logger.info(f"[CONTEXT] Item {i+1}: {item_title} | Rating {res[0]} | {res[2]}")
else:
logger.info("[CONTEXT] No recent news found on MarketAux matching criteria.")
except Exception as e:
logger.error(f"❌ [CONTEXT] Error fetching/analyzing context: {e}")
# Record duration
metrics['handler4_duration'] = time.time() - start_time
if self.next_handler:
return self.next_handler.process(news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results, metrics)
return news_item
class FifthStepHandler(NewsHandler):
"""
Signal Generation (formerly FourthStepHandler)
"""
def __init__(self, db_logger, fundamental_service, performance_logger, next_handler=None):
super().__init__(next_handler)
self.db_logger = db_logger
self.fundamental_service = fundamental_service
self.performance_logger = performance_logger
def process(self, news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results, metrics):
start_time = time.time()
title = news_item['title'] if isinstance(news_item, dict) and 'title' in news_item else getattr(news_item, 'title', '')
logger.info(f"[SIGNAL GENERATION] {title}")
# Log news with proper sentiment data
# We use the Headline Rating as the primary "News Rating" for the DB log
        # Note: the DB logger's 'processing_time' historically came from FirstStepHandler's
        # start time; here we track per-handler durations in 'metrics' and flush the
        # accumulated dict to the CSV performance log at the end of this handler
# Signal Generation Logic
try:
# 1. Get Tickers
tickers = []
if hasattr(news_item, 'symbols') and news_item.symbols:
tickers = news_item.symbols
elif isinstance(news_item, dict) and 'symbols' in news_item and news_item['symbols']:
tickers = news_item['symbols']
if not tickers:
logger.warning("[SIGNAL] No tickers found in news item, skipping signal generation")
metrics['handler5_duration'] = time.time() - start_time
self.performance_logger.log_metrics(metrics)
return news_item
# Process each ticker
for ticker in tickers:
# 2. Get Fundamental Analysis from DB
entries = self.db_logger.db.query(
data_type='fundamental_analysis',
ticker=ticker,
limit=1
)
fundamental_entry = entries[0] if entries else None
if fundamental_entry:
self._generate_signal(ticker, fundamental_entry.data, fundamental_entry.generate_key(), news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results)
else:
logger.info(f"[SIGNAL] No fundamental analysis found for {ticker}. Requesting analysis...")
# Request analysis and retry signal generation
self.fundamental_service.request_analysis(
ticker,
lambda t, r: self._generate_signal(
t, r, None, news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results
)
)
except Exception as e:
logger.error(f"❌ [SIGNAL] Error generating signal: {str(e)}")
            traceback.print_exc()
# Record duration
metrics['handler5_duration'] = time.time() - start_time
# Log Performance Metrics to CSV
self.performance_logger.log_metrics(metrics)
if self.next_handler:
return self.next_handler.process(news_item, sentiment, metrics)
return news_item
def _generate_signal(self, ticker, fundamental_data, fundamental_key, news_item, sentiment, headline_rating, headline_analysis, headline_impact, context_results):
"""
Helper method to generate signal based on fundamental data and news sentiment.
"""
try:
fundamental_rec = fundamental_data.get('recommendation', 'UNKNOWN')
# Extract confidence score, default to 0.5 (50%) if missing
# Confidence is 0.0-1.0
confidence_score = fundamental_data.get('confidence', 0.5)
logger.info(f"[SIGNAL] Fundamental analysis for {ticker}: {fundamental_rec} (Confidence: {confidence_score:.2f})")
if fundamental_key is None:
entries = self.db_logger.db.query(data_type='fundamental_analysis', ticker=ticker, limit=1)
if entries:
fundamental_key = entries[0].generate_key()
# 3. Determine Signal
signal_position = None
# Use Headline Rating for primary signal logic
is_news_negative = sentiment == "NEGATIVE"
is_news_positive = sentiment == "POSITIVE"
            # Context average is logged for observability; it does not yet gate the signal
            if context_results:
                avg_context_rating = sum(r[0] for r in context_results if isinstance(r[0], (int, float))) / len(context_results)
                logger.info(f"[SIGNAL] Context Average Rating: {avg_context_rating:.1f}")
if is_news_negative:
if fundamental_rec == "BUY":
if confidence_score < 0.75:
signal_position = "SELL"
logger.info(f"[SIGNAL] GENERATED: SELL signal for {ticker} (Negative News + Low Confidence Fundamental BUY)")
else:
logger.info(f"[SIGNAL] SKIP: Negative news but High Confidence Fundamental BUY for {ticker}")
elif fundamental_rec == "SELL":
signal_position = "SELL"
logger.info(f"[SIGNAL] GENERATED: SELL signal for {ticker} (Negative News + Fundamental SELL)")
else: # HOLD or UNKNOWN
signal_position = "SELL" # Default to SELL if news is negative and fundamental is not BUY
logger.info(f"[SIGNAL] GENERATED: SELL signal for {ticker} (Negative News + Fundamental {fundamental_rec})")
elif is_news_positive:
if fundamental_rec == "BUY":
signal_position = "BUY"
logger.info(f"[SIGNAL] GENERATED: BUY signal for {ticker} (Positive News + Fundamental BUY)")
elif fundamental_rec == "SELL":
if confidence_score < 0.75:
signal_position = "BUY"
logger.info(f"[SIGNAL] GENERATED: BUY signal for {ticker} (Positive News + Low Confidence Fundamental SELL)")
else:
logger.info(f"[SIGNAL] SKIP: Positive news but High Confidence Fundamental SELL for {ticker}")
else: # HOLD or UNKNOWN
signal_position = "BUY" # Default to BUY if news is positive and fundamental is not SELL
logger.info(f"[SIGNAL] GENERATED: BUY signal for {ticker} (Positive News + Fundamental {fundamental_rec})")
# 4. Write Signal to DB
if signal_position:
                news_id = news_item.get('id') if isinstance(news_item, dict) else getattr(news_item, 'id', None)
news_keys = [str(news_id)] if news_id else []
# Construct sentiment dictionary
news_sentiment_val = "POSITIVE" if is_news_positive else "NEGATIVE"
sentiment_data = {
'news_sentiment': news_sentiment_val,
'fundamental_sentiment': fundamental_rec,
'gemini_sentiment': headline_rating,
'gemini_analysis': headline_analysis,
'gemini_impact_type': headline_impact,
'context_analysis': str(context_results) # Store context results
}
success = self.db_logger.db.save_signal(
ticker=ticker,
calendar_event_keys=[],
news_keys=news_keys,
fundamental_key=fundamental_key,
sentiment=sentiment_data,
signal_position=signal_position
)
if success:
logger.info(f"✅ [SIGNAL] Saved {signal_position} signal for {ticker}")
else:
logger.error(f"❌ [SIGNAL] Failed to save signal for {ticker}")
except Exception as e:
logger.error(f"❌ [SIGNAL] Error in _generate_signal for {ticker}: {str(e)}")
class NewsProcessor:
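    """Singleton that wires the handler chain and drains the news queue on a background (or caller-provided) asyncio event loop."""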
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super(NewsProcessor, cls).__new__(cls)
return cls.instance
def __init__(self):
if not hasattr(self, 'initialized'):
self.initialized = True
self.news_data = []
self.processed_news = []
self.callbacks = []
            self.news_queue = queue.Queue()  # Thread-safe handoff from producer threads to the async consumer
# Initialize services
self.db_logger = NewsDBLogger()
self.pre_sentiment = Plutus()
self.sentiment_analyzer = SentimentAnalyzer()
            # Token read from the environment (MARKETAUX_API_TOKEN is an assumed
            # variable name); avoid committing credentials to source control
            self.market_aux_service = MarketAuxService(api_token=os.environ.get("MARKETAUX_API_TOKEN", ""))
self.performance_logger = PerformanceLogger()
from news_scraper.services.fundamental_analysis_service import FundamentalAnalysisService
self.fundamental_service = FundamentalAnalysisService()
self.fundamental_service.start()
# Set up the Chain of Responsibility
self.handler_chain = FirstStepHandler(pre_sentiment=self.pre_sentiment)
second_step = SecondStepHandler(db_logger=self.db_logger)
third_step = ThirdStepHandler(sentiment_analyzer=self.sentiment_analyzer)
fourth_step = FourthStepHandler(sentiment_analyzer=self.sentiment_analyzer, market_aux_service=self.market_aux_service)
fifth_step = FifthStepHandler(db_logger=self.db_logger, fundamental_service=self.fundamental_service, performance_logger=self.performance_logger)
self.handler_chain.set_next(second_step)
second_step.set_next(third_step)
third_step.set_next(fourth_step)
fourth_step.set_next(fifth_step)
# Initialize processing variables
self.processing_task = None
self.loop = None
self.loop_thread = None
self.is_running = False
def add_news(self, news_item):
"""
Add a news item to the news queue.
"""
self.news_queue.put(news_item)
    async def fifo_process_news(self):
        """
        Process queued news items in FIFO order without blocking the event loop.
        """
        while True:
            try:
                news_item = self.news_queue.get_nowait()
            except queue.Empty:
                # Nothing queued: yield to the loop and poll again
                await asyncio.sleep(1)
                continue
            try:
                # Process the news item through the handler chain.
                # Note: the chain is synchronous, so a slow analysis blocks this loop.
                processed_item = self.handler_chain.process(news_item)
                # Store processed item
                self.processed_news.append(processed_item)
                # Notify callbacks if any
                for callback in self.callbacks:
                    await callback(processed_item)
                headline = news_item['headline'] if isinstance(news_item, dict) and 'headline' in news_item else getattr(news_item, 'headline', '')
                logger.info(f"✅ [COMPLETED] {headline}")
            except Exception as e:
                logger.error(f"❌ [ERROR] Processing news: {str(e)}")
            finally:
                # Mark the task as done in the queue
                self.news_queue.task_done()
def start_processing(self):
"""
Start the async processing task.
"""
logger.info("🚀 [NEWS PROCESSOR] Attempting to start news processing")
if self.is_running:
logger.info("⚠️ [NEWS PROCESSOR] Already running - ignoring start request")
return
        try:
            # Schedule onto the current loop if we're already in an asyncio context
            self.loop = asyncio.get_running_loop()
            logger.info("📌 [NEWS PROCESSOR] Using existing running event loop")
            self.processing_task = self.loop.create_task(self.fifo_process_news())
            self.is_running = True
            logger.info("✅ [NEWS PROCESSOR] Task created in existing event loop")
            return
        except RuntimeError:
            # Not in an asyncio context: create a dedicated loop for a worker thread
            logger.info("🔄 [NEWS PROCESSOR] No running event loop found, creating new one")
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
# Create and start the processing task in a new thread
def run_event_loop():
thread_id = threading.get_ident()
logger.info(f"🧵 [NEWS PROCESSOR] Thread started with ID: {thread_id}")
asyncio.set_event_loop(self.loop)
self.processing_task = self.loop.create_task(self.fifo_process_news())
logger.info("▶️ [NEWS PROCESSOR] Event loop running")
self.loop.run_forever()
self.loop_thread = threading.Thread(target=run_event_loop, daemon=True)
self.loop_thread.start()
self.is_running = True
logger.info(f"✅ [NEWS PROCESSOR] Thread started: {self.loop_thread.name}")
def stop_processing(self):
"""
Stop the async processing task.
"""
if not self.is_running:
logger.info("⚠️ [NEWS PROCESSOR] Not running")
return
if self.processing_task:
self.processing_task.cancel()
if self.loop and self.loop.is_running() and self.loop_thread:
# Stop the loop and wait for the thread to finish
self.loop.call_soon_threadsafe(self.loop.stop)
if threading.current_thread() != self.loop_thread:
self.loop_thread.join(timeout=5.0)
self.is_running = False
self.fundamental_service.stop()
logger.info("🛑 [NEWS PROCESSOR] Stopped")
def register_callback(self, callback):
"""
Register a callback function to be called after processing news items.
"""
self.callbacks.append(callback)
if __name__ == "__main__":
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Example usage
news_processor = NewsProcessor()
# Register a callback function
async def example_callback(news_item):
print(f"[PROCESSOR] [FUNC] Processing | {news_item.headline if hasattr(news_item, 'headline') else ''}")
news_processor.register_callback(example_callback)
# Start processing news items
news_processor.start_processing()
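    # Illustrative only: a minimal item shaped like the dicts the handlers read
    # (keys mirror those accessed above; the ticker and text are made up)
    # news_processor.add_news({
    #     'id': 1,
    #     'title': 'Example Corp beats quarterly earnings estimates',
    #     'headline': 'Example Corp beats quarterly earnings estimates',
    #     'symbols': ['EXMP'],
    #     'content': '',
    #     'summary': '',
    # })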
# Keep the main thread alive for testing
    # Keep the main thread alive for testing
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        news_processor.stop_processing()
        print("Exiting...")