File size: 6,568 Bytes
c40c447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
Servicio de dominio para forecasting.

Este servicio orquesta la l贸gica de negocio de forecasting,
cumpliendo con SRP y DIP.
"""

from typing import List
from app.domain.interfaces.forecast_model import IForecastModel
from app.domain.interfaces.data_transformer import IDataTransformer
from app.domain.models.time_series import TimeSeries
from app.domain.models.forecast_config import ForecastConfig
from app.domain.models.forecast_result import ForecastResult
from app.utils.logger import setup_logger

# Module-level logger, obtained from the project's setup_logger helper and
# keyed by this module's name; shared by every ForecastService instance.
logger = setup_logger(__name__)


class ForecastService:
    """
    Domain service for forecasting operations.

    Orchestrates the forecasting workflow: validate the inputs, turn the
    series into a context DataFrame via the transformer, run the model, and
    wrap the parsed prediction in a ForecastResult. The service depends on
    the IForecastModel and IDataTransformer abstractions rather than on
    concrete implementations (DIP).

    Attributes:
        model: Forecasting model (implements IForecastModel)
        transformer: Data transformer (implements IDataTransformer)

    Example:
        >>> from app.infrastructure.ml.chronos_model import ChronosModel
        >>> from app.utils.dataframe_builder import DataFrameBuilder
        >>>
        >>> model = ChronosModel("amazon/chronos-2")
        >>> transformer = DataFrameBuilder()
        >>> service = ForecastService(model, transformer)
        >>>
        >>> series = TimeSeries(values=[100, 102, 105])
        >>> config = ForecastConfig(prediction_length=3)
        >>> result = service.forecast_univariate(series, config)
    """

    def __init__(
        self,
        model: IForecastModel,
        transformer: IDataTransformer
    ):
        """
        Initialize the service with its dependencies.

        Stores both collaborators and logs the model type reported by
        ``model.get_model_info()`` (``'unknown'`` when the ``'type'`` key is
        absent).

        Args:
            model: IForecastModel implementation
            transformer: IDataTransformer implementation
        """
        self.model = model
        self.transformer = transformer

        model_info = self.model.get_model_info()
        logger.info(
            f"ForecastService initialized with model: {model_info.get('type', 'unknown')}"
        )

    def forecast_univariate(
        self,
        series: TimeSeries,
        config: ForecastConfig
    ) -> ForecastResult:
        """
        Generate a forecast for a single univariate series.

        Args:
            series: Time series to forecast
            config: Forecast configuration

        Returns:
            ForecastResult: Result built from the parsed prediction
                (timestamps, median, quantiles) plus metadata describing the
                configuration and the model.

        Raises:
            ValueError: Expected when the series or the configuration is
                invalid; raised by ``series.validate()`` /
                ``config.validate()``, whose implementations live outside
                this module.
            RuntimeError: If the model fails while predicting (the original
                exception is chained as the cause).

        Example:
            >>> series = TimeSeries(values=[100, 102, 105, 103, 108])
            >>> config = ForecastConfig(prediction_length=3)
            >>> result = service.forecast_univariate(series, config)
            >>> len(result.median)
            3
        """
        logger.info(
            f"Forecasting univariate series '{series.series_id}' "
            f"(length={series.length}, horizon={config.prediction_length})"
        )

        # Validate the inputs before doing any work with them.
        series.validate()
        config.validate()

        # Convert the series into the DataFrame layout the model consumes.
        context_df = self.transformer.build_context_df(
            values=series.values,
            timestamps=series.timestamps,
            series_id=series.series_id,
            freq=config.freq
        )

        logger.debug(f"Context DataFrame shape: {context_df.shape}")

        # Let the model vet the context DataFrame (checks are model-defined).
        self.model.validate_context(context_df)

        # Predict. Any model failure is logged with its traceback and
        # re-raised as RuntimeError so callers deal with one exception type.
        try:
            pred_df = self.model.predict(
                context_df=context_df,
                prediction_length=config.prediction_length,
                quantile_levels=config.quantile_levels
            )
        except Exception as e:
            logger.error(f"Model prediction failed: {e}", exc_info=True)
            raise RuntimeError(f"Error al predecir: {e}") from e

        logger.debug(f"Prediction DataFrame shape: {pred_df.shape}")

        # Parse the raw prediction; the code below reads the keys
        # "timestamps", "median" and "quantiles" from the returned mapping.
        result_dict = self.transformer.parse_prediction_result(
            pred_df=pred_df,
            quantile_levels=config.quantile_levels
        )

        # Assemble the domain result, recording how it was produced.
        result = ForecastResult(
            timestamps=result_dict["timestamps"],
            median=result_dict["median"],
            quantiles=result_dict["quantiles"],
            series_id=series.series_id,
            metadata={
                "prediction_length": config.prediction_length,
                "quantile_levels": config.quantile_levels,
                "freq": config.freq,
                "model": self.model.get_model_info()
            }
        )

        logger.info(
            f"Forecast completed: {result.length} periods generated "
            f"for series '{series.series_id}'"
        )

        return result

    def forecast_multi_series(
        self,
        series_list: List[TimeSeries],
        config: ForecastConfig
    ) -> List[ForecastResult]:
        """
        Generate forecasts for several series, one after another.

        Each series is forecast independently with ``forecast_univariate``
        using the same configuration. Processing stops at the first failure:
        the error is logged and re-raised, and no partial results are
        returned.

        Args:
            series_list: List of time series
            config: Forecast configuration (shared by all series)

        Returns:
            List[ForecastResult]: One result per series, in input order

        Raises:
            ValueError: If ``series_list`` is empty (or otherwise falsy,
                e.g. ``None``).
            Exception: Whatever ``forecast_univariate`` raises for a series
                is propagated unchanged after being logged.

        Example:
            >>> series1 = TimeSeries(values=[100, 102], series_id="A")
            >>> series2 = TimeSeries(values=[200, 205], series_id="B")
            >>> results = service.forecast_multi_series([series1, series2], config)
            >>> len(results)
            2
        """
        # Guard first: a missing/empty input must surface as the intended
        # ValueError rather than an incidental TypeError from len(None), and
        # must not emit a misleading "Forecasting 0 series" log line.
        if not series_list:
            raise ValueError("series_list no puede estar vacía")

        total = len(series_list)
        logger.info(f"Forecasting {total} series")

        results: List[ForecastResult] = []
        for position, series in enumerate(series_list, start=1):
            logger.debug(f"Processing series {position}/{total}: {series.series_id}")

            try:
                result = self.forecast_univariate(series, config)
                results.append(result)
            except Exception as e:
                # Record which series broke the batch, then abort it.
                logger.error(
                    f"Failed to forecast series '{series.series_id}': {e}",
                    exc_info=True
                )
                raise

        logger.info(f"Multi-series forecast completed: {len(results)} series processed")
        return results