# Deploy Signal Generator app (commit 3fe0726)
import time
import logging
from datetime import datetime
import statistics
import os
class Timer:
    """
    Track and log the time it takes to process news items.

    Measures the interval from when an item is added to the queue
    (``start_timing``) until it is processed (``stop_timing``), and keeps a
    bounded rolling window of recent processing times for statistics.
    """

    def __init__(self, log_file_path=None):
        """
        Args:
            log_file_path: Optional path for the timing log file. Defaults to
                "news_processing_times.log" in the current directory.
        """
        self.start_times = {}        # news_id -> epoch timestamp when queued
        self.processing_times = []   # rolling window of recent durations (seconds)
        self.max_history = 100       # cap on stored processing times
        self.log_file_path = log_file_path or "news_processing_times.log"
        # Must be set BEFORE _setup_logger(), which reads it. (The original
        # assigned it afterwards, so the file-handler path could never work.)
        self.log_to_file = False     # set True before first use to also log to file
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Set up a dedicated logger for timing statistics (console by default)."""
        logger = logging.getLogger('news_processing_timer')
        # Guard against adding duplicate handlers when multiple Timer
        # instances share this named logger.
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            # Create log directory if it doesn't exist
            log_dir = os.path.dirname(self.log_file_path)
            if log_dir and not os.path.exists(log_dir):
                os.makedirs(log_dir)
            if self.log_to_file:
                # Optional file handler, off by default.
                file_handler = logging.FileHandler(self.log_file_path)
                file_handler.setFormatter(logging.Formatter(
                    '%(asctime)s - %(levelname)s - %(message)s'
                ))
                logger.addHandler(file_handler)
            # Console handler for immediate visibility
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            logger.addHandler(console_handler)
        return logger

    @staticmethod
    def _item_title(news_item, news_id):
        """Best-effort human-readable label for a news item (headline > title > id)."""
        return getattr(news_item, 'headline',
                       getattr(news_item, 'title', f"ID: {news_id}"))

    def start_timing(self, news_item):
        """
        Start timing for a news item when it enters the queue.

        Returns:
            The tracking id used for this item (its ``id`` attribute if
            present, otherwise the object's identity as a string).
        """
        # Get a unique ID for the news item - prefer id attribute if available
        news_id = getattr(news_item, 'id', str(id(news_item)))
        self.start_times[news_id] = time.time()
        # Log the event at debug level
        self.logger.debug(f"Started timing for news item: {self._item_title(news_item, news_id)}")
        return news_id

    def stop_timing(self, news_item):
        """
        Stop timing for a news item and log the processing time.

        Returns:
            Elapsed processing time in seconds, or ``None`` when no start
            time was recorded for this item.
        """
        end_time = time.time()
        # Get a unique ID for the news item
        news_id = getattr(news_item, 'id', str(id(news_item)))
        # Resolve the title up front: the original only assigned it inside
        # the success branch, so the warning branch raised NameError.
        item_title = self._item_title(news_item, news_id)

        if news_id not in self.start_times:
            self.logger.warning(f"No start time found for news item: {item_title}")
            return None

        # pop() both reads and removes the start entry in one step.
        process_time = end_time - self.start_times.pop(news_id)

        # Get symbols/ticker if available
        symbols = getattr(news_item, 'symbols',
                          getattr(news_item, 'ticker', ''))
        symbols_str = ', '.join(symbols) if isinstance(symbols, list) else str(symbols)
        self.logger.info(f"Processing time: {process_time:.4f}s - {item_title} - Symbols: {symbols_str}")

        # Store for statistics, dropping the oldest beyond max_history.
        self.processing_times.append(process_time)
        if len(self.processing_times) > self.max_history:
            self.processing_times.pop(0)

        return process_time

    def get_queue_stats(self):
        """Return wait-time statistics for items currently awaiting processing."""
        current_time = time.time()
        waiting_times = [current_time - started for started in self.start_times.values()]
        stats = {
            'queue_size': len(waiting_times),
            'avg_wait_time': sum(waiting_times) / len(waiting_times) if waiting_times else 0,
            'max_wait_time': max(waiting_times) if waiting_times else 0,
            'min_wait_time': min(waiting_times) if waiting_times else 0
        }
        self.logger.info(f"Queue stats: {stats['queue_size']} items, avg wait: {stats['avg_wait_time']:.2f}s, max wait: {stats['max_wait_time']:.2f}s")
        return stats

    def get_processing_stats(self):
        """Return summary statistics over the recent processing-time window."""
        if not self.processing_times:
            # Zero-filled shape so callers can index keys unconditionally.
            return {
                'count': 0,
                'avg': 0,
                'max': 0,
                'min': 0,
                'median': 0
            }
        stats = {
            'count': len(self.processing_times),
            'avg': sum(self.processing_times) / len(self.processing_times),
            'max': max(self.processing_times),
            'min': min(self.processing_times),
            # Non-empty here thanks to the early return above.
            'median': statistics.median(self.processing_times)
        }
        self.logger.info(f"Processing stats: {stats['count']} items, avg: {stats['avg']:.4f}s, median: {stats['median']:.4f}s, min: {stats['min']:.4f}s, max: {stats['max']:.4f}s")
        return stats