""" Генератор больших тестовых лог-файлов и скрипт для тестирования системы. """ import random import os from datetime import datetime, timedelta from agents import LogParserAgent, AnomalyDetectionAgent, RootCauseAgent import time def generate_log_entry(timestamp, level, message_template, **kwargs): """Генерирует одну запись лога.""" message = message_template.format(**kwargs) return f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n" def generate_log_file_1(): """Лог-файл 1: Обычные логи с редкими ошибками (3000 строк)""" lines = [] base_time = datetime(2024, 1, 15, 10, 0, 0) messages = [ "User {user_id} logged in from IP {ip}", "Request GET /api/users/{user_id} processed successfully", "Database query executed in {time}ms", "Cache hit for key: {key}", "Request POST /api/data processed in {time}ms", "Session {session_id} created", "File {filename} uploaded successfully", "Processing job {job_id} started", "Background task {task_id} completed", ] error_messages = [ "Connection timeout to external API: {api_url}", "Database connection lost, retrying...", "Invalid token received from user {user_id}", ] for i in range(3000): timestamp = base_time + timedelta(seconds=i * 2) if i % 100 == 0: # Каждая 100-я строка - ошибка level = random.choice(["ERROR", "WARNING"]) template = random.choice(error_messages) message = template.format( api_url=f"api-{random.randint(1,5)}.example.com", user_id=random.randint(1000, 9999), ) else: level = "INFO" template = random.choice(messages) message = template.format( user_id=random.randint(1000, 9999), ip=f"192.168.{random.randint(1,255)}.{random.randint(1,255)}", time=random.randint(10, 500), key=f"cache_key_{random.randint(1,100)}", session_id=f"session_{random.randint(10000,99999)}", filename=f"file_{random.randint(1,1000)}.txt", job_id=random.randint(1000, 9999), task_id=random.randint(10000, 99999), ) lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_2(): """Лог-файл 2: Burst errors (5000 строк с всплеском ошибок)""" lines = [] base_time = datetime(2024, 1, 15, 14, 0, 0) for i in range(5000): timestamp = base_time + timedelta(seconds=i) # Всплеск ошибок между 2000-2050 строками if 2000 <= i < 2050: level = random.choice(["ERROR", "ERROR", "ERROR", "CRITICAL"]) message = f"Database connection failed: unable to connect to host db-{random.randint(1,3)}.internal" elif 2050 <= i < 2060: level = "WARNING" message = f"High latency detected: {random.randint(5000, 15000)}ms response time" else: level = "INFO" message = f"Request processed: {random.choice(['GET', 'POST', 'PUT'])} /api/v1/{random.choice(['users', 'data', 'files'])}" lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_3(): """Лог-файл 3: Повторяющиеся ошибки (4000 строк)""" lines = [] base_time = datetime(2024, 1, 15, 16, 0, 0) repeated_error = "Authentication failed for user admin@example.com: invalid credentials" for i in range(4000): timestamp = base_time + timedelta(seconds=i * 3) # Одна и та же ошибка повторяется каждые 50 строк if i % 50 == 0: level = "ERROR" message = repeated_error elif i % 75 == 0: level = "WARNING" message = f"Rate limit approaching: {random.randint(80, 95)}% of limit used" else: level = "INFO" message = f"HTTP {random.choice([200, 200, 200, 201, 304])} {random.choice(['GET', 'POST'])} /api/{random.choice(['users', 'orders', 'products'])}" lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_4(): """Лог-файл 4: Stack traces (3500 строк)""" lines = [] base_time = datetime(2024, 1, 15, 18, 0, 0) stack_trace = """Traceback (most recent call last): File "/app/services/api.py", line {line}, in process_request result = external_api.call(data) File "/app/lib/external_api.py", line {line2}, in call raise ConnectionError("Service unavailable") ConnectionError: Service unavailable""" for i in range(3500): timestamp = base_time + timedelta(seconds=i * 2) if i % 200 == 0: level = "ERROR" message = stack_trace.format( line=random.randint(100, 500), line2=random.randint(50, 300) ) else: level = random.choice(["INFO", "DEBUG"]) message = f"Processing request {random.randint(10000, 99999)}" lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_5(): """Лог-файл 5: Временные всплески (4500 строк)""" lines = [] base_time = datetime(2024, 1, 16, 8, 0, 0) for i in range(4500): # Группируем по минутам для создания всплесков timestamp = base_time + timedelta(minutes=i // 60, seconds=i % 60) # Всплески в определённые минуты minute = (i // 60) % 60 if minute in [5, 15, 25, 35, 45]: # Много событий в эти минуты level = random.choice(["INFO", "INFO", "INFO", "WARNING", "ERROR"]) message = f"High traffic: {random.randint(100, 1000)} requests/min" else: level = "INFO" message = f"Normal traffic: {random.randint(10, 50)} requests/min" lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_6(): """Лог-файл 6: Ошибка перед крашем (3000 строк)""" lines = [] base_time = datetime(2024, 1, 16, 12, 0, 0) for i in range(3000): timestamp = base_time + timedelta(seconds=i) # Последние 50 строк - критические ошибки if i >= 2950: level = random.choice(["CRITICAL", "ERROR"]) messages = [ "Out of memory: cannot allocate additional resources", "Fatal error: database connection pool exhausted", "Critical: unable to process requests, system overloaded", "ERROR: Service unavailable, shutting down", ] message = random.choice(messages) elif i >= 2900: level = "ERROR" message = f"System resource exhaustion detected: memory usage {random.randint(95, 99)}%" else: level = random.choice(["INFO", "DEBUG"]) message = f"System operation: {random.choice(['cache_update', 'db_query', 'api_call'])}" lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_7(): """Лог-файл 7: Разнообразные форматы логов (4000 строк)""" lines = [] base_time = datetime(2024, 1, 16, 14, 30, 0) formats = [ "{timestamp} [{level}] {message}", "[{timestamp}] {level}: {message}", "{timestamp} {level} - {message}", ] for i in range(4000): timestamp = base_time + timedelta(seconds=i * 2) level = random.choice(["INFO", "WARNING", "ERROR", "DEBUG"]) if level == "ERROR" and i % 100 == 0: message = f"Error processing transaction {random.randint(100000, 999999)}" else: message = f"Event {i}: {random.choice(['user_action', 'system_check', 'data_sync'])}" fmt = random.choice(formats) if fmt.startswith("["): lines.append(fmt.format( timestamp=timestamp.strftime('%Y-%m-%d %H:%M:%S'), level=level, message=message ) + "\n") else: lines.append(fmt.format( timestamp=timestamp.strftime('%Y-%m-%d %H:%M:%S'), level=level, message=message ) + "\n") return ''.join(lines) def generate_log_file_8(): """Лог-файл 8: Смешанные паттерны (5000 строк)""" lines = [] base_time = datetime(2024, 1, 17, 9, 0, 0) for i in range(5000): timestamp = base_time + timedelta(seconds=i) # Разные паттерны в разных секциях if 1000 <= i < 1100: # Burst errors level = "ERROR" message = f"API endpoint /api/data failed: {random.choice(['timeout', '500', 'connection refused'])}" elif 2000 <= i < 2100 and i % 10 == 0: # Repeated errors level = "ERROR" message = "Validation error: email format is invalid" elif 3000 <= i < 3050: # Stack traces level = "ERROR" message = f"Exception in handler: ValueError at line {random.randint(1, 500)}" elif i >= 4900: # Error before crash level = random.choice(["CRITICAL", "ERROR"]) message = "System failure: critical service unavailable" else: level = "INFO" message = f"Normal operation: {random.choice(['request', 'response', 'cache', 'db'])} processed" lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_9(): """Лог-файл 9: Web server logs format (4500 строк)""" lines = [] base_time = datetime(2024, 1, 17, 15, 0, 0) ips = [f"192.168.{x}.{y}" for x in range(1, 10) for y in range(1, 50)] for i in range(4500): timestamp = base_time + timedelta(seconds=i) ip = random.choice(ips) method = random.choice(["GET", "POST", "PUT", "DELETE"]) endpoint = random.choice(["/api/users", "/api/orders", "/api/products", "/static/css", "/static/js"]) status = random.choice([200, 200, 200, 201, 404, 500, 503]) if status >= 500: level = "ERROR" elif status >= 400: level = "WARNING" else: level = "INFO" message = f'{ip} - - [{timestamp.strftime("%d/%b/%Y:%H:%M:%S")}] "{method} {endpoint} HTTP/1.1" {status} {random.randint(100, 5000)}' lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def generate_log_file_10(): """Лог-файл 10: Application logs с метаданными (4000 строк)""" lines = [] base_time = datetime(2024, 1, 18, 10, 0, 0) for i in range(4000): timestamp = base_time + timedelta(seconds=i * 2) # Периодические проблемы if i % 300 == 0: level = "ERROR" message = f"Service health check failed: service-{random.randint(1, 5)}.internal is down" elif i % 150 == 0: level = "WARNING" message = f"Performance degradation: p95 latency increased to {random.randint(1000, 5000)}ms" elif 3500 <= i < 3600: # Проблемы перед концом level = random.choice(["ERROR", "WARNING"]) message = f"Resource constraint: {random.choice(['CPU', 'Memory', 'Disk'])} usage critical" else: level = "INFO" message = f"[thread-{random.randint(1, 20)}] Processing job {random.randint(10000, 99999)}: status={random.choice(['completed', 'in_progress'])}" lines.append(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')} {level} {message}\n") return ''.join(lines) def test_log_file(content, file_num): """Тестирует обработку одного лог-файла.""" print(f"\n{'='*60}") print(f"Testing log file {file_num}") print(f"{'='*60}") # Подсчёт строк line_count = len(content.split('\n')) print(f"Lines in file: {line_count}") # Замер времени start_time = time.time() # Agent 1: Парсинг parser = LogParserAgent() parsed_start = time.time() structured_data = parser.parse(content) parsed_time = time.time() - parsed_start events_count = len(structured_data.get('events', [])) errors_count = len(structured_data.get('errors', [])) warnings_count = len(structured_data.get('warnings', [])) print(f"\n[OK] Agent 1 (Parser): {parsed_time:.2f} sec") print(f" - Events: {events_count}") print(f" - Errors: {errors_count}") print(f" - Warnings: {warnings_count}") # Agent 2: Обнаружение аномалий anomaly_start = time.time() anomaly_agent = AnomalyDetectionAgent() anomaly_report = anomaly_agent.detect(structured_data) anomaly_time = time.time() - anomaly_start anomalies_count = len(anomaly_report.get('anomalies', [])) print(f"\n[OK] Agent 2 (Anomaly Detection): {anomaly_time:.2f} sec") print(f" - Anomalies detected: {anomalies_count}") if anomalies_count > 0: by_type = anomaly_report.get('statistics', {}).get('by_type', {}) for anomaly_type, count in by_type.items(): print(f" - {anomaly_type}: {count}") # Agent 3: Анализ первопричин rca_start = time.time() rca_agent = RootCauseAgent() recommendations = rca_agent.analyze(anomaly_report) rca_time = time.time() - rca_start print(f"\n[OK] Agent 3 (Root Cause Analysis): {rca_time:.2f} sec") print(f" - Report size: {len(recommendations)} characters") total_time = time.time() - start_time print(f"\n[TIME] Total processing time: {total_time:.2f} sec") print(f" Speed: {line_count / total_time:.0f} lines/sec") return { 'file_num': file_num, 'lines': line_count, 'events': events_count, 'errors': errors_count, 'warnings': warnings_count, 'anomalies': anomalies_count, 'parsed_time': parsed_time, 'anomaly_time': anomaly_time, 'rca_time': rca_time, 'total_time': total_time } def main(): """Главная функция для генерации и тестирования.""" print("=" * 60) print("ГЕНЕРАЦИЯ И ТЕСТИРОВАНИЕ БОЛЬШИХ ЛОГ-ФАЙЛОВ") print("=" * 60) # Создаём папку для тестовых файлов test_dir = "test_logs" os.makedirs(test_dir, exist_ok=True) # Генераторы лог-файлов generators = [ ("normal_logs.log", generate_log_file_1), ("burst_errors.log", generate_log_file_2), ("repeated_errors.log", generate_log_file_3), ("stack_traces.log", generate_log_file_4), ("temporal_spikes.log", generate_log_file_5), ("error_before_crash.log", generate_log_file_6), ("mixed_formats.log", generate_log_file_7), ("mixed_patterns.log", generate_log_file_8), ("web_server.log", generate_log_file_9), ("application_metadata.log", generate_log_file_10), ] # Генерируем файлы print(f"\n[GENERATING] Generating {len(generators)} test files...") files_data = [] for filename, generator in generators: filepath = os.path.join(test_dir, filename) print(f" Generating: {filename}...", end=" ") content = generator() with open(filepath, 'w', encoding='utf-8') as f: f.write(content) line_count = len(content.split('\n')) file_size = len(content.encode('utf-8')) / 1024 # KB print(f"OK ({line_count} lines, {file_size:.1f} KB)") files_data.append((filepath, content)) print(f"\n[SUCCESS] All files created in '{test_dir}' folder") # Тестируем каждый файл print(f"\n[TESTING] Starting tests...") results = [] for i, (filepath, content) in enumerate(files_data, 1): result = test_log_file(content, i) results.append(result) # Итоговая статистика print(f"\n\n{'='*60}") print("SUMMARY STATISTICS") print(f"{'='*60}") print(f"\n{'#':<3} {'Lines':<8} {'Time (sec)':<12} {'Lines/sec':<12} {'Anomalies':<10}") print("-" * 60) total_lines = 0 total_time = 0 for result in results: speed = result['lines'] / result['total_time'] if result['total_time'] > 0 else 0 print(f"{result['file_num']:<3} {result['lines']:<8} {result['total_time']:<12.2f} {speed:<12.0f} {result['anomalies']:<10}") total_lines += result['lines'] total_time += result['total_time'] print("-" * 60) avg_speed = total_lines / total_time if total_time > 0 else 0 print(f"{'TOTAL':<3} {total_lines:<8} {total_time:<12.2f} {avg_speed:<12.0f}") print(f"\n[SUCCESS] Testing completed!") print(f" Total processed: {total_lines} lines in {total_time:.2f} seconds") print(f" Average speed: {avg_speed:.0f} lines/sec") # Проверка производительности if total_time > 100: # Если больше 100 секунд для всех файлов print(f"\n[WARNING] Total processing time exceeds 100 seconds") else: print(f"\n[OK] Performance is within normal range (<100 sec for all files)") if __name__ == "__main__": main()