import logging
import os
import statistics
import time


class Timer:
    """
    Helper class to track and log the time it takes to process news items.

    Measures time from when an item is added to the queue until it's processed.
    """

    def __init__(self, log_file_path=None):
        self.start_times = {}  # Map of news_id -> start timestamp
        self.processing_times = []  # Store recent processing times for stats
        self.max_history = 100  # Maximum number of processing times to store
        self.log_file_path = log_file_path or "news_processing_times.log"
        self.log_to_file = False  # Flag to control logging to file
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Set up a dedicated logger for timing statistics."""
        logger = logging.getLogger('news_processing_timer')

        if not logger.handlers:
            logger.setLevel(logging.INFO)

            # Create the log directory if it doesn't exist
            log_dir = os.path.dirname(self.log_file_path)
            if log_dir and not os.path.exists(log_dir):
                os.makedirs(log_dir)

            if self.log_to_file:
                # Set up a file handler for logging to a file
                file_handler = logging.FileHandler(self.log_file_path)
                file_handler.setFormatter(logging.Formatter(
                    '%(asctime)s - %(levelname)s - %(message)s'
                ))
                logger.addHandler(file_handler)

            # Console handler for immediate visibility
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            logger.addHandler(console_handler)

        return logger

    def start_timing(self, news_item):
        """Start timing for a news item when it enters the queue."""
        # Get a unique ID for the news item - prefer an id attribute if available
        news_id = getattr(news_item, 'id', str(id(news_item)))

        # Store the start time
        self.start_times[news_id] = time.time()

        # Log the event at debug level
        item_title = getattr(news_item, 'headline', getattr(news_item, 'title', f"ID: {news_id}"))
        self.logger.debug(f"Started timing for news item: {item_title}")

        return news_id

    def stop_timing(self, news_item):
        """Stop timing for a news item and log the processing time."""
        end_time = time.time()

        # Get a unique ID for the news item
        news_id = getattr(news_item, 'id', str(id(news_item)))

        # Get a title for better logging
        item_title = getattr(news_item, 'headline', getattr(news_item, 'title', f"ID: {news_id}"))

        if news_id not in self.start_times:
            self.logger.warning(f"No start time found for news item: {item_title}")
            return None

        start_time = self.start_times[news_id]
        process_time = end_time - start_time

        # Get symbols/ticker if available
        symbols = getattr(news_item, 'symbols', getattr(news_item, 'ticker', ''))
        symbols_str = ', '.join(symbols) if isinstance(symbols, list) else str(symbols)

        # Log the processing time
        self.logger.info(
            f"Processing time: {process_time:.4f}s - {item_title} - Symbols: {symbols_str}"
        )

        # Store for statistics, keeping only the most recent max_history entries
        self.processing_times.append(process_time)
        if len(self.processing_times) > self.max_history:
            self.processing_times.pop(0)  # Remove the oldest entry

        # Remove the entry from the start times dictionary
        del self.start_times[news_id]

        return process_time

    def get_queue_stats(self):
        """Get statistics about items currently waiting in the queue."""
        current_time = time.time()
        waiting_times = [current_time - start_time for start_time in self.start_times.values()]

        stats = {
            'queue_size': len(waiting_times),
            'avg_wait_time': sum(waiting_times) / len(waiting_times) if waiting_times else 0,
            'max_wait_time': max(waiting_times) if waiting_times else 0,
            'min_wait_time': min(waiting_times) if waiting_times else 0,
        }

        self.logger.info(
            f"Queue stats: {stats['queue_size']} items, "
            f"avg wait: {stats['avg_wait_time']:.2f}s, "
            f"max wait: {stats['max_wait_time']:.2f}s"
        )

        return stats

    def get_processing_stats(self):
        """Get statistics about recent processing times."""
        if not self.processing_times:
            return {'count': 0, 'avg': 0, 'max': 0, 'min': 0, 'median': 0}

        stats = {
            'count': len(self.processing_times),
            'avg': sum(self.processing_times) / len(self.processing_times),
            'max': max(self.processing_times),
            'min': min(self.processing_times),
            'median': statistics.median(self.processing_times),
        }

        self.logger.info(
            f"Processing stats: {stats['count']} items, avg: {stats['avg']:.4f}s, "
            f"median: {stats['median']:.4f}s, min: {stats['min']:.4f}s, max: {stats['max']:.4f}s"
        )

        return stats
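

# Illustrative usage sketch, not part of the Timer API itself. The SimpleNamespace
# news item below is a hypothetical stand-in, assuming real news items expose
# id/headline/symbols attributes as the getattr calls above expect.
if __name__ == "__main__":
    from types import SimpleNamespace

    timer = Timer()

    # Simulate a news item entering the processing queue
    item = SimpleNamespace(id="abc-123", headline="Example headline", symbols=["AAPL", "MSFT"])
    timer.start_timing(item)

    # Simulate some processing work, then stop the timer
    time.sleep(0.1)
    timer.stop_timing(item)

    # Report aggregate statistics
    timer.get_queue_stats()
    timer.get_processing_stats()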