import os

import gradio as gr
import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import PeftModel
# ----------------------------------------------------------------------
# 1. MODEL SETUP (Load only once)
# ----------------------------------------------------------------------
BASE_MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2"
LORA_ADAPTER_ID = "RootSystem2101/ZeroCyber-SLM-LoRA-Adapter"
def load_zerocyber_model():
    print("Loading Tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(LORA_ADAPTER_ID)

    print("Loading Base Model in 4-bit...")
    # 4-bit quantization via bitsandbytes; loader warnings here are expected.
    quant_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
    )
    model = AutoModelForCausalLM.from_pretrained(
        BASE_MODEL_ID,
        quantization_config=quant_config,
        device_map="auto",
    )

    print("Merging LoRA Adapter...")
    model = PeftModel.from_pretrained(model, LORA_ADAPTER_ID)
    # Note: the warning about merging into a 4-bit base model is expected.
    model = model.merge_and_unload()
    model.eval()
    return tokenizer, model
try:
    ZEROCYBER_TOKENIZER, ZEROCYBER_MODEL = load_zerocyber_model()
except Exception as e:
    print(f"FATAL ERROR during model loading: {e}")
    ZEROCYBER_TOKENIZER = None
    ZEROCYBER_MODEL = None
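# Assumption: 4-bit loading requires a CUDA GPU with bitsandbytes installed.
# On CPU-only hardware, a (much slower) fallback sketch would skip quantization:
#   model = AutoModelForCausalLM.from_pretrained(BASE_MODEL_ID, torch_dtype=torch.float32)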
# ----------------------------------------------------------------------
# 2. CORE INFERENCE FUNCTIONS (FASTEST GENERATION MODE)
# ----------------------------------------------------------------------
def generate_response(prompt_text: str):
    """Generate a response as fast as possible (greedy search)."""
    if ZEROCYBER_MODEL is None:
        return "❌ Model loading failed. Please check the console logs for errors."

    # The tokenizer prepends <s> itself, so the template only needs the [INST] tags.
    formatted_prompt = f"[INST] {prompt_text} [/INST]"
    inputs = ZEROCYBER_TOKENIZER(formatted_prompt, return_tensors="pt").to(ZEROCYBER_MODEL.device)

    try:
        with torch.no_grad():
            outputs = ZEROCYBER_MODEL.generate(
                **inputs,
                max_new_tokens=1024,  # cap output length to keep latency bounded
                do_sample=False,      # greedy decoding: deterministic and fastest
                pad_token_id=ZEROCYBER_TOKENIZER.eos_token_id,
            )
        response = ZEROCYBER_TOKENIZER.decode(outputs[0], skip_special_tokens=True)
        # The answer is everything after the [/INST] tag of the prompt.
        return response.split("[/INST]", 1)[-1].strip()
    except Exception as e:
        return f"❌ Internal Error during Inference: {e}"
def analyze_log_file(file_path: str):
    """Analyze a log/CSV file, reading it defensively against encoding issues."""
    # 1. Safely read file content using common encodings
    try:
        with open(file_path, "r", encoding="utf-8", errors="strict") as f:
            log_content = f.read()
    except UnicodeDecodeError:
        try:
            with open(file_path, "r", encoding="latin-1", errors="strict") as f:
                log_content = f.read()
        except Exception as e:
            return f"❌ File Reading Error: {e}\nCould not read the file using common text encodings."

    if not log_content.strip():
        return "⚠️ Uploaded file is empty or does not contain readable text content."

    # 2. Prompt engineering for the cybersecurity report (Arabic output enforced)
    truncated_content = log_content[:5000]

    prompt = f"""
You are a specialized cybersecurity analyst. Analyze the following log file content.
Your task is to:
1. Identify the most critical security events or errors.
2. Pinpoint suspicious patterns or explicit attack attempts.
3. **Generate a structured report in ARABIC (اللغة العربية)** including a clear summary and recommendations.
4. Provide immediate, actionable steps for defenders in a bulleted list.
Log Content (Truncated):
---
{truncated_content}
---
"""

    print(f"Analyzing log content from file: {os.path.basename(file_path)}")
    return generate_response(prompt)
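# Optional sketch, not wired into the UI: for CSV uploads, a structured preview
# built with pandas could replace the raw-text truncation above. The helper name
# and its output format are assumptions for illustration.
def summarize_csv_preview(file_path: str, max_rows: int = 20) -> str:
    """Build a compact, prompt-friendly preview of a CSV file."""
    try:
        # Read only the first rows to keep the prompt small.
        df = pd.read_csv(file_path, nrows=max_rows)
    except Exception as e:
        return f"(Could not parse as CSV: {e})"
    return f"Columns: {list(df.columns)}\nFirst {len(df)} rows:\n{df.to_string(index=False)}"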
# ----------------------------------------------------------------------
# 3. UNIFIED GRADIO INTERFACE LOGIC
# ----------------------------------------------------------------------
def unified_interface(question: str, log_file):
    """Handle either text input or a file upload (the file takes precedence)."""
    if log_file is not None:
        # gr.File may return a tempfile-like object or a plain path string,
        # depending on the Gradio version.
        file_path = log_file if isinstance(log_file, str) else log_file.name
        return analyze_log_file(file_path)
    elif question.strip():
        print(f"Received question: {question}")
        # Language steering: answer in Arabic if the question contains Arabic letters.
        if any(c in question for c in "ءآأبتثجحخدذرزسشصضطظعغفقكلمنهويى"):
            # Prefix translates to: "Answer in Arabic. The question is: ..."
            prompt_with_lang = f"أجب باللغة العربية. السؤال هو: {question}"
        else:
            prompt_with_lang = f"Answer in English. The question is: {question}"
        return generate_response(prompt_with_lang)
    else:
        return "Please submit a question or upload a file for analysis."
# ----------------------------------------------------------------------
# 4. GRADIO INTERFACE BUILD (Professional English Titles)
# ----------------------------------------------------------------------
if __name__ == "__main__":
    input_components = [
        gr.Textbox(
            label="1. Ask your Cybersecurity Inquiry:",
            placeholder="Example: What are the steps to secure a web server?",
        ),
        gr.File(label="2. Or Upload any Log/Text File for Analysis:", file_types=None),
    ]
    output_component = gr.Markdown(label="ZeroCyber-SLM Report / Response")

    interface = gr.Interface(
        fn=unified_interface,
        inputs=input_components,
        outputs=output_component,
        # UI titles are intentionally kept in English.
        title="ZeroCyber-SLM: Security Analysis and Response Platform",
        description=(
            "A specialized application for responding to security inquiries and analyzing "
            "Log/CSV files to identify incidents and provide actionable recommendations for defenders."
        ),
        allow_flagging="never",
    )

    if ZEROCYBER_MODEL is not None:
        interface.launch(share=True)
    else:
        print("\n❌ Interface failed to start due to model loading failure.")