Spaces:
Running
Running
import logging
import os
import statistics
import time
from datetime import datetime
class Timer:
    """Track and log how long news items take to process.

    Timing starts when an item is handed to ``start_timing`` (i.e. when it
    enters the queue) and stops when ``stop_timing`` is called after
    processing.  A bounded history of recent processing durations is kept
    so summary statistics can be reported.
    """

    def __init__(self, log_file_path=None):
        """Create a timer.

        Args:
            log_file_path: Optional path for a timing log file; defaults to
                ``"news_processing_times.log"``.  File logging is currently
                disabled (console only), but the directory is prepared so it
                can be enabled without failing on a missing path.
        """
        self.start_times = {}        # news_id -> epoch timestamp when timing began
        self.processing_times = []   # recent processing durations (seconds)
        self.max_history = 100       # cap on stored durations so memory stays bounded
        self.log_file_path = log_file_path or "news_processing_times.log"
        self.logger = self._setup_logger()
        self.log_to_file = False     # flag reserved for future file logging; unused today

    def _setup_logger(self):
        """Set up and return a dedicated console logger for timing statistics."""
        logger = logging.getLogger('news_processing_timer')
        # Named loggers are process-wide singletons; configure only once so
        # repeated Timer construction doesn't stack duplicate handlers.
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            # Prepare the log directory up front so file logging can be
            # enabled later without failing on a missing path.
            log_dir = os.path.dirname(self.log_file_path)
            if log_dir and not os.path.exists(log_dir):
                os.makedirs(log_dir)
            # Console handler for immediate visibility.
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            logger.addHandler(console_handler)
        return logger

    @staticmethod
    def _item_id(news_item):
        """Return a stable identifier for *news_item*, preferring its ``id`` attribute."""
        return getattr(news_item, 'id', str(id(news_item)))

    @staticmethod
    def _item_title(news_item, news_id):
        """Return a human-readable label: ``headline``, then ``title``, then the ID."""
        return getattr(news_item, 'headline',
                       getattr(news_item, 'title', f"ID: {news_id}"))

    def start_timing(self, news_item):
        """Record the queue-entry time for *news_item* and return its ID."""
        news_id = self._item_id(news_item)
        self.start_times[news_id] = time.time()
        # Lazy %-formatting: the title is only rendered if DEBUG is enabled.
        self.logger.debug("Started timing for news item: %s",
                          self._item_title(news_item, news_id))
        return news_id

    def stop_timing(self, news_item):
        """Stop timing for *news_item* and log its processing time.

        Returns:
            The elapsed time in seconds, or ``None`` (after logging a
            warning) when ``start_timing`` was never called for this item.
        """
        end_time = time.time()
        news_id = self._item_id(news_item)
        # BUG FIX: item_title was previously assigned only in the success
        # branch, so the warning below raised NameError for unknown items.
        item_title = self._item_title(news_item, news_id)
        if news_id not in self.start_times:
            self.logger.warning("No start time found for news item: %s", item_title)
            return None
        process_time = end_time - self.start_times.pop(news_id)
        # Get symbols/ticker if available for richer log output.
        symbols = getattr(news_item, 'symbols',
                          getattr(news_item, 'ticker', ''))
        symbols_str = ', '.join(symbols) if isinstance(symbols, list) else str(symbols)
        self.logger.info(f"Processing time: {process_time:.4f}s - {item_title} - Symbols: {symbols_str}")
        # Store for statistics, keeping at most max_history samples.
        self.processing_times.append(process_time)
        if len(self.processing_times) > self.max_history:
            self.processing_times.pop(0)  # drop the oldest sample
        return process_time

    def get_queue_stats(self):
        """Return statistics about items currently awaiting processing.

        Returns:
            Dict with ``queue_size``, ``avg_wait_time``, ``max_wait_time``
            and ``min_wait_time`` (times in seconds; zeros when empty).
        """
        current_time = time.time()
        waiting_times = [current_time - start for start in self.start_times.values()]
        stats = {
            'queue_size': len(waiting_times),
            'avg_wait_time': sum(waiting_times) / len(waiting_times) if waiting_times else 0,
            'max_wait_time': max(waiting_times) if waiting_times else 0,
            'min_wait_time': min(waiting_times) if waiting_times else 0
        }
        self.logger.info(f"Queue stats: {stats['queue_size']} items, avg wait: {stats['avg_wait_time']:.2f}s, max wait: {stats['max_wait_time']:.2f}s")
        return stats

    def get_processing_stats(self):
        """Return summary statistics over the recent processing-time history.

        Returns:
            Dict with ``count``, ``avg``, ``max``, ``min`` and ``median``
            (times in seconds; all zeros when no history exists).
        """
        if not self.processing_times:
            return {
                'count': 0,
                'avg': 0,
                'max': 0,
                'min': 0,
                'median': 0
            }
        stats = {
            'count': len(self.processing_times),
            'avg': sum(self.processing_times) / len(self.processing_times),
            'max': max(self.processing_times),
            'min': min(self.processing_times),
            # History is guaranteed non-empty here, so median is defined.
            'median': statistics.median(self.processing_times)
        }
        self.logger.info(f"Processing stats: {stats['count']} items, avg: {stats['avg']:.4f}s, median: {stats['median']:.4f}s, min: {stats['min']:.4f}s, max: {stats['max']:.4f}s")
        return stats