Spaces:
Running
Running
File size: 5,749 Bytes
3fe0726 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
import time
import logging
from datetime import datetime
import statistics
import os
class Timer:
    """
    Track and log the time it takes to process news items.

    Measures latency from when an item is added to the queue
    (``start_timing``) until it is processed (``stop_timing``), and keeps a
    rolling window of recent processing times for aggregate statistics.
    """

    def __init__(self, log_file_path=None):
        """
        Args:
            log_file_path: Optional path for the timing log file. Defaults
                to "news_processing_times.log" in the working directory.
        """
        self.start_times = {}        # news_id -> start timestamp (time.time())
        self.processing_times = []   # rolling window of recent processing times
        self.max_history = 100       # cap on the rolling window size
        self.log_file_path = log_file_path or "news_processing_times.log"
        # Must be set before _setup_logger() runs: the logger setup consults
        # this flag to decide whether to attach a file handler. (In the
        # original, the flag was assigned *after* setup, so file logging
        # could never be enabled.)
        self.log_to_file = False
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Set up a dedicated logger for timing statistics.

        Handlers are attached only once: if another Timer instance already
        configured the shared 'news_processing_timer' logger, it is reused
        as-is to avoid duplicate log lines.
        """
        logger = logging.getLogger('news_processing_timer')
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            # Create the log directory if it doesn't exist; exist_ok avoids
            # a race between the check and the creation.
            log_dir = os.path.dirname(self.log_file_path)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)
            if self.log_to_file:
                # Optional file handler, controlled by the log_to_file flag.
                file_handler = logging.FileHandler(self.log_file_path)
                file_handler.setFormatter(logging.Formatter(
                    '%(asctime)s - %(levelname)s - %(message)s'
                ))
                logger.addHandler(file_handler)
            # Console handler for immediate visibility
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            logger.addHandler(console_handler)
        return logger

    @staticmethod
    def _item_id(news_item):
        """Unique ID for a news item — prefer its `id` attribute if present."""
        return getattr(news_item, 'id', str(id(news_item)))

    @staticmethod
    def _item_title(news_item, news_id):
        """Human-readable label: headline, then title, then the raw ID."""
        return getattr(news_item, 'headline',
                       getattr(news_item, 'title', f"ID: {news_id}"))

    def start_timing(self, news_item):
        """Start timing for a news item when it enters the queue.

        Returns:
            The news item's ID used as the timing key.
        """
        news_id = self._item_id(news_item)
        self.start_times[news_id] = time.time()
        # Lazy %-args: the title lookup string is only formatted if DEBUG
        # logging is actually enabled.
        self.logger.debug("Started timing for news item: %s",
                          self._item_title(news_item, news_id))
        return news_id

    def stop_timing(self, news_item):
        """Stop timing for a news item and log the processing time.

        Returns:
            Elapsed processing time in seconds, or None if no start time
            was recorded for this item.
        """
        end_time = time.time()
        news_id = self._item_id(news_item)
        # BUG FIX: compute the title on both paths. The original only
        # assigned item_title inside the success branch but referenced it
        # in the warning branch, raising NameError instead of warning.
        item_title = self._item_title(news_item, news_id)
        if news_id not in self.start_times:
            self.logger.warning("No start time found for news item: %s",
                                item_title)
            return None
        # pop() reads and clears the start-time entry in one step.
        process_time = end_time - self.start_times.pop(news_id)
        # Symbols/ticker if available, for more useful log lines.
        symbols = getattr(news_item, 'symbols',
                          getattr(news_item, 'ticker', ''))
        symbols_str = (', '.join(symbols) if isinstance(symbols, list)
                       else str(symbols))
        self.logger.info("Processing time: %.4fs - %s - Symbols: %s",
                         process_time, item_title, symbols_str)
        # Keep only the most recent max_history measurements.
        self.processing_times.append(process_time)
        if len(self.processing_times) > self.max_history:
            self.processing_times.pop(0)  # Remove oldest
        return process_time

    def get_queue_stats(self):
        """Return statistics about items currently waiting in the queue.

        Returns:
            dict with keys queue_size, avg_wait_time, max_wait_time and
            min_wait_time (times in seconds; zeros when the queue is empty).
        """
        now = time.time()
        waiting_times = [now - start for start in self.start_times.values()]
        stats = {
            'queue_size': len(waiting_times),
            'avg_wait_time': sum(waiting_times) / len(waiting_times) if waiting_times else 0,
            'max_wait_time': max(waiting_times) if waiting_times else 0,
            'min_wait_time': min(waiting_times) if waiting_times else 0,
        }
        self.logger.info(
            "Queue stats: %d items, avg wait: %.2fs, max wait: %.2fs",
            stats['queue_size'], stats['avg_wait_time'], stats['max_wait_time'])
        return stats

    def get_processing_stats(self):
        """Return statistics about recently recorded processing times.

        Returns:
            dict with keys count, avg, max, min and median (seconds); all
            zeros when no processing times have been recorded yet.
        """
        if not self.processing_times:
            return {'count': 0, 'avg': 0, 'max': 0, 'min': 0, 'median': 0}
        stats = {
            'count': len(self.processing_times),
            'avg': sum(self.processing_times) / len(self.processing_times),
            'max': max(self.processing_times),
            'min': min(self.processing_times),
            # Non-empty list is guaranteed by the early return above.
            'median': statistics.median(self.processing_times),
        }
        self.logger.info(
            "Processing stats: %d items, avg: %.4fs, median: %.4fs, "
            "min: %.4fs, max: %.4fs",
            stats['count'], stats['avg'], stats['median'],
            stats['min'], stats['max'])
        return stats
|