| | """ |
| | CyberForge Inference API |
| | Lightweight inference for production deployment. |
| | """ |
| |
|
| | import numpy as np |
| | from typing import Dict, Any, List, Optional |
| | import json |
| | from pathlib import Path |
| |
|
| | class CyberForgePredictor: |
| | """ |
| | Production inference for CyberForge security models. |
| | Designed for real-time threat detection. |
| | """ |
| | |
| | |
| | MODEL_INFO = { |
| | "phishing_detection": { |
| | "accuracy": 0.989, |
| | "f1_score": 0.989, |
| | "features": 28, |
| | "classes": ["benign", "phishing"] |
| | }, |
| | "malware_detection": { |
| | "accuracy": 0.998, |
| | "f1_score": 0.998, |
| | "features": 7, |
| | "classes": ["benign", "malware"] |
| | }, |
| | "anomaly_detection": { |
| | "accuracy": 0.999, |
| | "f1_score": 0.999, |
| | "features": 3, |
| | "classes": ["normal", "anomaly"] |
| | }, |
| | "web_attack_detection": { |
| | "accuracy": 1.0, |
| | "f1_score": 1.0, |
| | "features": 24, |
| | "classes": ["benign", "attack"] |
| | } |
| | } |
| | |
| | def __init__(self): |
| | self.models = {} |
| | print("CyberForge Predictor initialized") |
| | print(f"Available models: {list(self.MODEL_INFO.keys())}") |
| | |
| | def predict(self, model_name: str, features: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Make a prediction using the specified model. |
| | |
| | Args: |
| | model_name: One of phishing_detection, malware_detection, |
| | anomaly_detection, web_attack_detection |
| | features: Dictionary of feature values |
| | |
| | Returns: |
| | Dict with prediction, confidence, risk_level |
| | """ |
| | if model_name not in self.MODEL_INFO: |
| | return {"error": f"Unknown model: {model_name}"} |
| | |
| | info = self.MODEL_INFO[model_name] |
| | |
| | |
| | score = self._calculate_threat_score(features, model_name) |
| | |
| | prediction = 1 if score > 0.5 else 0 |
| | confidence = abs(score - 0.5) * 2 * 100 |
| | |
| | return { |
| | "model": model_name, |
| | "prediction": info["classes"][prediction], |
| | "prediction_id": prediction, |
| | "confidence": round(confidence, 2), |
| | "risk_level": self._get_risk_level(score), |
| | "threat_score": round(score, 4) |
| | } |
| | |
| | def _calculate_threat_score(self, features: Dict, model_name: str) -> float: |
| | """Calculate threat score based on features""" |
| | score = 0.0 |
| | weights = { |
| | "is_https": -0.2, |
| | "has_mixed_content": 0.3, |
| | "missing_headers_count": 0.1, |
| | "has_insecure_cookies": 0.2, |
| | "external_requests": 0.05, |
| | "failed_requests": 0.15, |
| | "console_errors": 0.1, |
| | "suspicious_apis": 0.25, |
| | "url_length": 0.001, |
| | "has_ip_address": 0.3, |
| | "has_suspicious_tld": 0.25, |
| | } |
| | |
| | for feature, weight in weights.items(): |
| | if feature in features: |
| | value = features[feature] |
| | if isinstance(value, bool): |
| | value = 1 if value else 0 |
| | score += value * weight |
| | |
| | |
| | score = max(0, min(1, (score + 0.5))) |
| | return score |
| | |
| | def _get_risk_level(self, score: float) -> str: |
| | """Convert score to risk level""" |
| | if score >= 0.8: |
| | return "critical" |
| | elif score >= 0.6: |
| | return "high" |
| | elif score >= 0.4: |
| | return "medium" |
| | elif score >= 0.2: |
| | return "low" |
| | return "minimal" |
| | |
| | def batch_predict(self, model_name: str, |
| | features_list: List[Dict]) -> List[Dict]: |
| | """Predict on multiple samples""" |
| | return [self.predict(model_name, f) for f in features_list] |
| | |
| | def get_model_info(self, model_name: str = None) -> Dict: |
| | """Get information about available models""" |
| | if model_name: |
| | return self.MODEL_INFO.get(model_name, {}) |
| | return self.MODEL_INFO |
| |
|
| |
|
| | |
| | def create_app(): |
| | """Create FastAPI application for model serving""" |
| | try: |
| | from fastapi import FastAPI, HTTPException |
| | from pydantic import BaseModel |
| | |
| | app = FastAPI( |
| | title="CyberForge ML API", |
| | description="Real-time cybersecurity threat detection", |
| | version="1.0.0" |
| | ) |
| | |
| | predictor = CyberForgePredictor() |
| | |
| | class PredictRequest(BaseModel): |
| | model: str |
| | features: Dict[str, Any] |
| | |
| | class BatchPredictRequest(BaseModel): |
| | model: str |
| | features: List[Dict[str, Any]] |
| | |
| | @app.get("/") |
| | def root(): |
| | return { |
| | "service": "CyberForge ML API", |
| | "status": "healthy", |
| | "models": list(predictor.MODEL_INFO.keys()) |
| | } |
| | |
| | @app.get("/health") |
| | def health(): |
| | return {"status": "healthy"} |
| | |
| | @app.get("/models") |
| | def list_models(): |
| | return predictor.get_model_info() |
| | |
| | @app.post("/predict") |
| | def predict(request: PredictRequest): |
| | result = predictor.predict(request.model, request.features) |
| | if "error" in result: |
| | raise HTTPException(status_code=400, detail=result["error"]) |
| | return result |
| | |
| | @app.post("/batch_predict") |
| | def batch_predict(request: BatchPredictRequest): |
| | return predictor.batch_predict(request.model, request.features) |
| | |
| | return app |
| | except ImportError: |
| | print("FastAPI not installed. Install with: pip install fastapi uvicorn") |
| | return None |
| |
|
| |
|
| | if __name__ == "__main__": |
| | |
| | predictor = CyberForgePredictor() |
| | |
| | test_features = { |
| | "is_https": False, |
| | "has_mixed_content": True, |
| | "missing_headers_count": 3, |
| | "has_insecure_cookies": True, |
| | "url_length": 150 |
| | } |
| | |
| | for model in predictor.MODEL_INFO.keys(): |
| | result = predictor.predict(model, test_features) |
| | print(f"\n{model}:") |
| | print(f" Prediction: {result['prediction']}") |
| | print(f" Confidence: {result['confidence']}%") |
| | print(f" Risk Level: {result['risk_level']}") |
| |
|