"""
Servicio de dominio para detecci贸n de anomal铆as.
Este servicio encapsula la l贸gica de detecci贸n de anomal铆as,
cumpliendo con SRP y DIP.
"""
from typing import List
from app.domain.interfaces.forecast_model import IForecastModel
from app.domain.interfaces.data_transformer import IDataTransformer
from app.domain.models.time_series import TimeSeries
from app.domain.models.forecast_config import ForecastConfig
from app.domain.models.anomaly import AnomalyPoint
from app.utils.logger import setup_logger
logger = setup_logger(__name__)
class AnomalyService:
"""
Servicio de dominio para detecci贸n de anomal铆as.
Detecta puntos an贸malos comparando valores observados con
pron贸sticos del modelo, usando intervalos de predicci贸n.
Attributes:
model: Modelo de forecasting
transformer: Transformador de datos
Example:
>>> service = AnomalyService(model, transformer)
>>> context = TimeSeries(values=[100, 102, 105, 103, 108])
>>> recent = [107, 200, 106] # 200 es anomal铆a
>>> anomalies = service.detect_anomalies(context, recent, config)
>>> sum(1 for a in anomalies if a.is_anomaly)
1
"""
    def __init__(
        self,
        model: IForecastModel,
        transformer: IDataTransformer
    ):
        """
        Initialize the service.

        Args:
            model: Implementation of IForecastModel
            transformer: Implementation of IDataTransformer
        """
        self.model = model
        self.transformer = transformer
        logger.info("AnomalyService initialized")
    def detect_anomalies(
        self,
        context: TimeSeries,
        recent_observed: List[float],
        config: ForecastConfig,
        quantile_low: float = 0.05,
        quantile_high: float = 0.95
    ) -> List[AnomalyPoint]:
        """
        Detect anomalies by comparing observations against the forecast.

        A point is considered anomalous if it falls outside the forecast's
        [quantile_low, quantile_high] interval.

        Args:
            context: Historical time series (context)
            recent_observed: Recent values to evaluate
            config: Forecast configuration
            quantile_low: Lower quantile of the interval (default: 0.05)
            quantile_high: Upper quantile of the interval (default: 0.95)

        Returns:
            List[AnomalyPoint]: List of points with an anomaly flag

        Raises:
            ValueError: If the lengths do not match

        Example:
            >>> context = TimeSeries(values=[100, 102, 105])
            >>> recent = [106, 250, 104]  # 250 is an anomaly
            >>> config = ForecastConfig(prediction_length=3)
            >>> anomalies = service.detect_anomalies(context, recent, config)
            >>> anomalies[1].is_anomaly
            True
        """
        logger.info(
            f"Detecting anomalies in {len(recent_observed)} points "
            f"(interval: [{quantile_low}, {quantile_high}])"
        )

        # Validate lengths: one forecast step is needed per observed point
        if len(recent_observed) != config.prediction_length:
            raise ValueError(
                f"recent_observed length ({len(recent_observed)}) must equal "
                f"prediction_length ({config.prediction_length})"
            )

        # Prepare a config with the required quantiles (median plus interval bounds)
        quantiles = sorted(set([quantile_low, 0.5, quantile_high]))
        config_anomaly = ForecastConfig(
            prediction_length=config.prediction_length,
            quantile_levels=quantiles,
            freq=config.freq
        )

        # Build the context DataFrame
        context_df = self.transformer.build_context_df(
            values=context.values,
            timestamps=context.timestamps,
            series_id=context.series_id,
            freq=config.freq
        )

        # Predict
        pred_df = self.model.predict(
            context_df=context_df,
            prediction_length=config_anomaly.prediction_length,
            quantile_levels=config_anomaly.quantile_levels
        )

        # Parse the result
        result = self.transformer.parse_prediction_result(
            pred_df=pred_df,
            quantile_levels=quantiles
        )

        # Detect anomalies. The quantile keys use the same string format
        # produced for parse_prediction_result (e.g. 0.05 -> "0.05").
        anomalies = []
        q_low_key = f"{quantile_low:.3g}"
        q_high_key = f"{quantile_high:.3g}"
        for i, obs in enumerate(recent_observed):
            expected = result["median"][i]
            lower = result["quantiles"][q_low_key][i]
            upper = result["quantiles"][q_high_key][i]

            # Check whether the observation falls outside the interval
            is_anom = (obs < lower) or (obs > upper)

            # Approximate z-score: the interval half-width serves as a rough
            # scale estimate, with a small epsilon to avoid division by zero
            spread = (upper - lower) / 2
            z_score = abs(obs - expected) / (spread + 1e-8) if spread > 0 else 0

            anomalies.append(AnomalyPoint(
                index=i,
                value=obs,
                expected=expected,
                lower_bound=lower,
                upper_bound=upper,
                is_anomaly=is_anom,
                z_score=z_score
            ))

        num_anomalies = sum(1 for a in anomalies if a.is_anomaly)
        logger.info(
            f"Anomaly detection completed: {num_anomalies}/{len(anomalies)} "
            "anomalies detected"
        )
        return anomalies
    def get_anomaly_summary(self, anomalies: List[AnomalyPoint]) -> dict:
        """
        Generate a summary of the detected anomalies.

        Args:
            anomalies: List of anomalies

        Returns:
            Dict with statistics about the anomalies
        """
        total = len(anomalies)
        detected = sum(1 for a in anomalies if a.is_anomaly)

        severities = {"low": 0, "medium": 0, "high": 0}
        for a in anomalies:
            if a.is_anomaly and a.severity:
                severities[a.severity] += 1

        return {
            "total_points": total,
            "anomalies_detected": detected,
            "anomaly_rate": (detected / total * 100) if total > 0 else 0,
            "severities": severities,
            "max_deviation": max((a.deviation for a in anomalies), default=0),
            "max_z_score": max((abs(a.z_score) for a in anomalies), default=0)
        }
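

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The fake model and transformer below are
# hypothetical stand-ins for real IForecastModel / IDataTransformer adapters;
# their return shapes are assumed from how AnomalyService consumes them above,
# and the TimeSeries / ForecastConfig arguments mirror the docstring examples.
# A real deployment would inject concrete implementations instead.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class _FakeTransformer:
        """Minimal stand-in for IDataTransformer (assumed interface)."""

        def build_context_df(self, values, timestamps, series_id, freq):
            # A real adapter would build a long-format DataFrame; the fake
            # simply passes the raw values through.
            return list(values)

        def parse_prediction_result(self, pred_df, quantile_levels):
            # The fake model already returns the parsed structure.
            return pred_df

    class _FakeModel:
        """Minimal stand-in for IForecastModel (assumed interface)."""

        def predict(self, context_df, prediction_length, quantile_levels):
            # Pretend the forecast is flat at the last observed value, with a
            # +/-10% prediction interval around it (median for q == 0.5).
            last = context_df[-1]

            def level(q):
                if q < 0.5:
                    return last * 0.9
                if q > 0.5:
                    return last * 1.1
                return last

            return {
                "median": [last] * prediction_length,
                "quantiles": {f"{q:.3g}": [level(q)] * prediction_length
                              for q in quantile_levels},
            }

    service = AnomalyService(model=_FakeModel(), transformer=_FakeTransformer())
    context = TimeSeries(values=[100, 102, 105, 103, 108])
    config = ForecastConfig(
        prediction_length=3,
        quantile_levels=[0.05, 0.5, 0.95],
        freq="D",  # assumed to be an accepted frequency value for ForecastConfig
    )

    # The second observation (200) falls outside the +/-10% interval and
    # should be flagged as an anomaly.
    anomalies = service.detect_anomalies(context, [107, 200, 106], config)
    print(service.get_anomaly_summary(anomalies))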