import gradio as gr
import torch
import os
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel

# ----------------------------------------------------------------------
# 1. MODEL SETUP (Load only once)
# ----------------------------------------------------------------------
BASE_MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2"
LORA_ADAPTER_ID = "RootSystem2101/ZeroCyber-SLM-LoRA-Adapter"

def load_zerocyber_model():
    print("Loading Tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(LORA_ADAPTER_ID)

    print("Loading Base Model in 4-bit...")
    # Warnings about load_in_4bit are expected and can be ignored.
    model = AutoModelForCausalLM.from_pretrained(
        BASE_MODEL_ID,
        load_in_4bit=True,
        torch_dtype=torch.float16,
        device_map="auto"
    )

    print("Merging LoRA Adapter...")
    model = PeftModel.from_pretrained(model, LORA_ADAPTER_ID)
    # Note: the warning about merging an adapter into a 4-bit model is expected.
    model = model.merge_and_unload()
    model.eval()
    return tokenizer, model

try:
    ZEROCYBER_TOKENIZER, ZEROCYBER_MODEL = load_zerocyber_model()
except Exception as e:
    print(f"FATAL ERROR during model loading: {e}")
    ZEROCYBER_TOKENIZER = None
    ZEROCYBER_MODEL = None

# ----------------------------------------------------------------------
# 2. CORE INFERENCE FUNCTIONS (FASTEST GENERATION MODE)
# ----------------------------------------------------------------------
def generate_response(prompt_text: str):
    """Generate a response as fast as possible (greedy search, no sampling)."""
    if ZEROCYBER_MODEL is None:
        return "❌ Model loading failed. Please check the command line for errors."

    formatted_prompt = f"[INST] {prompt_text} [/INST]"
    inputs = ZEROCYBER_TOKENIZER(formatted_prompt, return_tensors="pt").to(ZEROCYBER_MODEL.device)

    try:
        with torch.no_grad():
            outputs = ZEROCYBER_MODEL.generate(
                **inputs,
                max_new_tokens=1024,  # Limit output length to keep generation fast
                do_sample=False,      # Disable random sampling (fastest decoding mode)
                pad_token_id=ZEROCYBER_TOKENIZER.eos_token_id
            )
        response = ZEROCYBER_TOKENIZER.decode(outputs[0], skip_special_tokens=True)
        return response.split("[/INST]")[-1].strip()
    except Exception as e:
        return f"❌ Internal Error during Inference: {e}"

def analyze_log_file(file_path: str):
    """Analyze a Log/CSV file, with fallbacks against encoding problems."""
    # 1. Safely read file content using common encodings
    try:
        with open(file_path, 'r', encoding='utf-8', errors='strict') as f:
            log_content = f.read()
    except UnicodeDecodeError:
        try:
            with open(file_path, 'r', encoding='latin-1', errors='strict') as f:
                log_content = f.read()
        except Exception as e:
            return f"❌ File Reading Error: {e}\nCould not read the file using common text encodings."

    if not log_content.strip():
        return "⚠️ Uploaded file is empty or does not contain readable text content."

    # 2. Prompt engineering for the cybersecurity report (Arabic output enforced)
    truncated_content = log_content[:5000]
    prompt = f"""
You are a specialized cybersecurity analyst. Analyze the following log file content. Your task is to:
1. Identify the most critical security events or errors.
2. Pinpoint suspicious patterns or explicit attack attempts.
3. **Generate a structured report in ARABIC (اللغة العربية)** including a clear summary and recommendations.
4. Provide immediate, actionable steps for defenders in a bulleted list.

Log Content (Truncated):
---
{truncated_content}
---
"""
    print(f"Analyzing log content from file: {os.path.basename(file_path)}")
    return generate_response(prompt)
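# ----------------------------------------------------------------------
# Optional smoke test -- a minimal sketch, not part of the original app.
# RUN_SMOKE_TEST is an illustrative environment-variable name: set it to
# "1" to exercise generate_response() once before the UI starts.
# ----------------------------------------------------------------------
if os.environ.get("RUN_SMOKE_TEST") == "1" and ZEROCYBER_MODEL is not None:
    print(generate_response("List three common indicators of an SSH brute-force attack."))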
# ----------------------------------------------------------------------
# 3. UNIFIED GRADIO INTERFACE LOGIC
# ----------------------------------------------------------------------
def unified_interface(question: str, log_file):
    """Handle either text input or file upload."""
    if log_file is not None:
        return analyze_log_file(log_file.name)
    elif question.strip():
        print(f"Received question: {question}")
        # Language steering: answer in Arabic if the question contains Arabic letters
        if any(c in question for c in 'ءآأبتثجحخدذرزسشصضطظعغفقكلمنهويى'):
            # The Arabic prefix means "Answer in Arabic. The question is:"
            prompt_with_lang = f"أجب باللغة العربية. السؤال هو: {question}"
        else:
            prompt_with_lang = f"Answer in English. The question is: {question}"
        return generate_response(prompt_with_lang)
    else:
        return "Please submit a question or upload a file for analysis."

# ----------------------------------------------------------------------
# 4. GRADIO INTERFACE BUILD (Professional English Titles)
# ----------------------------------------------------------------------
if __name__ == "__main__":
    input_components = [
        gr.Textbox(
            label="1. Ask your Cybersecurity Inquiry:",
            placeholder="Example: What are the steps to secure a web server?"
        ),
        gr.File(label="2. Or Upload any Log/Text File for Analysis:", file_types=None)
    ]
    output_component = gr.Markdown(label="ZeroCyber-SLM Report / Response")

    interface = gr.Interface(
        fn=unified_interface,
        inputs=input_components,
        outputs=output_component,
        # Interface titles are intentionally in English
        title="ZeroCyber-SLM: Security Analysis and Response Platform",
        description=(
            "A specialized application for responding to security inquiries and analyzing "
            "Log/CSV files to identify incidents and provide actionable recommendations for defenders."
        ),
        allow_flagging="never"
    )

    if ZEROCYBER_MODEL is not None:
        interface.launch(share=True)
    else:
        print("\n❌ Interface failed to start due to model loading failure.")
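# ----------------------------------------------------------------------
# Assumed dependencies (the original script does not pin any versions):
#   pip install gradio torch transformers peft bitsandbytes accelerate
# bitsandbytes is required for load_in_4bit, and accelerate for
# device_map="auto"; both are pulled in implicitly by transformers.
# ----------------------------------------------------------------------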