# chronos2-excel-forecasting-api / app / main_working_version.py
# (Hugging Face Space header — commit c40c447, verified:
#  "Deploy Chronos2 Forecasting API v3.0.0 with new SOLID architecture")
import os
from typing import List, Dict, Optional
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from chronos import Chronos2Pipeline
# =========================
# Configuración del modelo
# =========================
# Model id and device are overridable via environment variables.
MODEL_ID = os.getenv("CHRONOS_MODEL_ID", "amazon/chronos-2")
DEVICE_MAP = os.getenv("DEVICE_MAP", "cpu")  # "cpu" or "cuda"
app = FastAPI(
    title="Chronos-2 Universal Forecasting API",
    description=(
        "Servidor local (Docker) para pronósticos con Chronos-2: univariante, "
        "multivariante, covariables, escenarios, anomalías y backtesting."
    ),
    version="1.0.0",
)
# CORS so the Excel add-in (served from localhost) can call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://localhost:3001", "https://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Load the model once at process startup; shared by every request handler.
pipeline = Chronos2Pipeline.from_pretrained(MODEL_ID, device_map=DEVICE_MAP)
# =========================
# Modelos Pydantic comunes
# =========================
class BaseForecastConfig(BaseModel):
    """Shared request options for every forecasting endpoint.

    Bundles the forecast horizon, the quantiles to return, and the optional
    calendar information used to build timestamps for the history.
    """

    prediction_length: int = Field(
        7, description="Horizonte de predicción (número de pasos futuros)"
    )
    quantile_levels: List[float] = Field(
        default_factory=lambda: [0.1, 0.5, 0.9],
        description="Cuantiles para el pronóstico probabilístico",
    )
    # When omitted, endpoints fall back to integer positional indices.
    start_timestamp: Optional[str] = Field(
        default=None,
        description=(
            "Fecha/hora inicial del histórico (formato ISO). "
            "Si no se especifica, se usan índices enteros."
        ),
    )
    freq: str = Field(
        "D",
        description="Frecuencia temporal (p.ej. 'D' diario, 'H' horario, 'W' semanal...).",
    )
class UnivariateSeries(BaseModel):
    """A single univariate series: target values in chronological order."""
    values: List[float]  # observed target values, oldest first
class MultiSeriesItem(BaseModel):
    """One named series in a multi-series request (e.g. one SKU or store)."""
    series_id: str  # identifier used as the "id" column of the context
    values: List[float]  # observed target values, oldest first
class CovariatePoint(BaseModel):
    """One time step, used both for the historical context and for known
    future covariates.

    Endpoints substitute integer indices when ``timestamp`` is omitted and
    default a missing ``id`` to ``"series_0"``.
    """

    timestamp: Optional[str] = None  # optional when integer indices are used
    id: Optional[str] = None  # series id; endpoints default it to 'series_0'
    target: Optional[float] = None  # observed target (history only; None on future points)
    covariates: Dict[str, float] = Field(
        default_factory=dict,
        description="Nombre -> valor de cada covariable dinámica.",
    )
# =========================
# 1) Healthcheck
# =========================
@app.get("/health")
def health():
    """Liveness probe: report server status plus the configured model/device."""
    payload = {"status": "ok"}
    payload["model_id"] = MODEL_ID
    payload["device_map"] = DEVICE_MAP
    return payload
# =========================
# 2) Pronóstico univariante
# =========================
class ForecastUnivariateRequest(BaseForecastConfig):
    """Request body for /forecast_univariate: common config plus one series."""
    series: UnivariateSeries
class ForecastUnivariateResponse(BaseModel):
    """Forecast for one series: timestamps, median and quantile tracks."""
    timestamps: List[str]  # forecast timestamps serialized as strings
    median: List[float]  # point forecast values
    quantiles: Dict[str, List[float]]  # "0.1" -> [..], "0.9" -> [..]
@app.post("/forecast_univariate", response_model=ForecastUnivariateResponse)
def forecast_univariate(req: ForecastUnivariateRequest):
    """Forecast a single series (univariate, no covariates).

    Intended for direct use from Excel or other simple clients.

    Raises:
        HTTPException 400: if the series is empty.
    """
    values = req.series.values
    n = len(values)
    if n == 0:
        raise HTTPException(status_code=400, detail="La serie no puede estar vacía.")
    # Build the context as a long-format DataFrame (id, timestamp, target).
    if req.start_timestamp:
        timestamps = pd.date_range(
            start=pd.to_datetime(req.start_timestamp),
            periods=n,
            freq=req.freq,
        )
    else:
        # No calendar info: fall back to integer positional indices.
        timestamps = pd.RangeIndex(start=0, stop=n, step=1)
    context_df = pd.DataFrame(
        {
            "id": ["series_0"] * n,
            "timestamp": timestamps,
            "target": values,
        }
    )
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    )
    pred_df = pred_df.sort_values("timestamp")
    timestamps_out = pred_df["timestamp"].astype(str).tolist()
    # NOTE(review): assumes predict_df exposes the point forecast in a
    # "predictions" column and one column per requested quantile level.
    median = pred_df["predictions"].astype(float).tolist()
    quantiles_dict: Dict[str, List[float]] = {}
    for q in req.quantile_levels:
        key = f"{q:.3g}"  # quantile column name, e.g. 0.1 -> "0.1"
        if key in pred_df.columns:
            quantiles_dict[key] = pred_df[key].astype(float).tolist()
    return ForecastUnivariateResponse(
        timestamps=timestamps_out,
        median=median,
        quantiles=quantiles_dict,
    )
# =========================
# 3) Multi-serie (multi-id)
# =========================
class ForecastMultiSeriesRequest(BaseForecastConfig):
    """Request body for /forecast_multi_id: common config plus several series."""
    series_list: List[MultiSeriesItem]
class SeriesForecast(BaseModel):
    """Forecast block for one series within a multi-series response."""
    series_id: str
    timestamps: List[str]
    median: List[float]
    quantiles: Dict[str, List[float]]  # quantile level (as string) -> values
class ForecastMultiSeriesResponse(BaseModel):
    """Response for /forecast_multi_id: one forecast per input series."""
    forecasts: List[SeriesForecast]
@app.post("/forecast_multi_id", response_model=ForecastMultiSeriesResponse)
def forecast_multi_id(req: ForecastMultiSeriesRequest):
    """Forecast several series at once (e.g. multiple SKUs or stores).

    Empty series are skipped silently; the request only fails when no usable
    series remains.

    Raises:
        HTTPException 400: if no series is provided or all are empty.
    """
    if not req.series_list:
        raise HTTPException(status_code=400, detail="Debes enviar al menos una serie.")
    frames = []
    for item in req.series_list:
        n = len(item.values)
        if n == 0:
            continue  # skip empty series instead of failing the whole request
        if req.start_timestamp:
            timestamps = pd.date_range(
                start=pd.to_datetime(req.start_timestamp),
                periods=n,
                freq=req.freq,
            )
        else:
            # No calendar info: fall back to integer positional indices.
            timestamps = pd.RangeIndex(start=0, stop=n, step=1)
        frames.append(
            pd.DataFrame(
                {
                    "id": [item.series_id] * n,
                    "timestamp": timestamps,
                    "target": item.values,
                }
            )
        )
    if not frames:
        raise HTTPException(status_code=400, detail="Todas las series están vacías.")
    context_df = pd.concat(frames, ignore_index=True)
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    )
    forecasts: List[SeriesForecast] = []
    for series_id, group in pred_df.groupby("id"):
        group = group.sort_values("timestamp")
        timestamps_out = group["timestamp"].astype(str).tolist()
        median = group["predictions"].astype(float).tolist()
        quantiles_dict: Dict[str, List[float]] = {}
        for q in req.quantile_levels:
            key = f"{q:.3g}"  # quantile column name, e.g. "0.1"
            if key in group.columns:
                quantiles_dict[key] = group[key].astype(float).tolist()
        forecasts.append(
            SeriesForecast(
                series_id=series_id,
                timestamps=timestamps_out,
                median=median,
                quantiles=quantiles_dict,
            )
        )
    return ForecastMultiSeriesResponse(forecasts=forecasts)
# =========================
# 4) Pronóstico con covariables
# =========================
class ForecastWithCovariatesRequest(BaseForecastConfig):
    """Request for /forecast_with_covariates: history plus optional future covariates."""
    context: List[CovariatePoint]  # historical points (target must be set to be used)
    future: Optional[List[CovariatePoint]] = None  # known future covariate values
class ForecastWithCovariatesResponse(BaseModel):
    """Raw prediction table, row by row."""
    # rows with every pred_df column serialized as strings
    pred_df: List[Dict[str, str]]
@app.post("/forecast_with_covariates", response_model=ForecastWithCovariatesResponse)
def forecast_with_covariates(req: ForecastWithCovariatesRequest):
    """Forecast with covariate information (promos, price, weather, ...) both
    in the history (context) and, optionally, for the future (future).

    Context points without a target are dropped; covariates become extra
    DataFrame columns. The whole prediction table is returned with every cell
    serialized as a string.

    Raises:
        HTTPException 400: if the context is empty or contains no usable points.
    """
    if not req.context:
        raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
    ctx_rows = []
    for p in req.context:
        if p.target is None:
            continue  # only points with an observed target belong in the history
        row = {
            "id": p.id or "series_0",
            "timestamp": p.timestamp,
            "target": p.target,
        }
        row.update(p.covariates)  # one extra column per covariate
        ctx_rows.append(row)
    # Guard: if every point lacked a target we would build an empty frame and
    # fail deep inside the pipeline with an opaque 500.
    if not ctx_rows:
        raise HTTPException(
            status_code=400,
            detail="El contexto no contiene puntos con target.",
        )
    context_df = pd.DataFrame(ctx_rows)
    if "timestamp" not in context_df or context_df["timestamp"].isna().any():
        # Missing/partial timestamps: fall back to integer positional indices.
        context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1)
    future_df = None
    if req.future:
        fut_rows = []
        for p in req.future:
            row = {
                "id": p.id or "series_0",
                "timestamp": p.timestamp,
            }
            row.update(p.covariates)
            fut_rows.append(row)
        future_df = pd.DataFrame(fut_rows)
        if "timestamp" not in future_df or future_df["timestamp"].isna().any():
            # Continue the integer index right after the context.
            # NOTE(review): assumes context timestamps are integers too; ISO
            # string timestamps mixed with missing future ones would raise here.
            future_df["timestamp"] = pd.RangeIndex(
                start=context_df["timestamp"].max() + 1,
                stop=context_df["timestamp"].max() + 1 + len(future_df),
                step=1,
            )
    pred_df = pipeline.predict_df(
        context_df,
        future_df=future_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    )
    pred_df = pred_df.sort_values(["id", "timestamp"])
    out_records: List[Dict[str, str]] = [
        {k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()
    ]
    return ForecastWithCovariatesResponse(pred_df=out_records)
# =========================
# 5) Multivariante (varios targets)
# =========================
class MultivariateContextPoint(BaseModel):
    """One time step with several target columns (and optional covariates)."""
    timestamp: Optional[str] = None  # ISO timestamp; integer indices used when absent
    id: Optional[str] = None  # series id, defaults to "series_0"
    targets: Dict[str, float]  # e.g. {"demand": 100, "returns": 5}
    covariates: Dict[str, float] = Field(default_factory=dict)
class ForecastMultivariateRequest(BaseForecastConfig):
    """Request for /forecast_multivariate: context rows plus target column names."""
    context: List[MultivariateContextPoint]
    target_columns: List[str]  # names of the target columns to forecast
class ForecastMultivariateResponse(BaseModel):
    """Raw prediction table with every column serialized as strings."""
    pred_df: List[Dict[str, str]]
@app.post("/forecast_multivariate", response_model=ForecastMultivariateResponse)
def forecast_multivariate(req: ForecastMultivariateRequest):
    """Multivariate forecast: several target columns at once (e.g. demand and
    returns).

    Raises:
        HTTPException 400: if the context or the target column list is empty.
    """
    if not req.context:
        raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
    if not req.target_columns:
        raise HTTPException(status_code=400, detail="Debes indicar columnas objetivo.")
    rows = []
    for p in req.context:
        base = {
            "id": p.id or "series_0",
            "timestamp": p.timestamp,
        }
        base.update(p.targets)  # one column per target variable
        base.update(p.covariates)  # plus any dynamic covariates
        rows.append(base)
    context_df = pd.DataFrame(rows)
    if "timestamp" not in context_df or context_df["timestamp"].isna().any():
        # Missing/partial timestamps: fall back to integer positional indices.
        context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1)
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=req.quantile_levels,
        id_column="id",
        timestamp_column="timestamp",
        target=req.target_columns,  # list of target columns -> multivariate mode
    )
    pred_df = pred_df.sort_values(["id", "timestamp"])
    out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()]
    return ForecastMultivariateResponse(pred_df=out_records)
# =========================
# 6) Escenarios (what-if)
# =========================
class ScenarioDefinition(BaseModel):
    """A named what-if scenario: the future covariate values to evaluate."""
    name: str
    future_covariates: List[CovariatePoint]
class ScenarioForecast(BaseModel):
    """Forecast result for one scenario (raw prediction rows as strings)."""
    name: str
    pred_df: List[Dict[str, str]]
class ForecastScenariosRequest(BaseForecastConfig):
    """Request for /forecast_scenarios: shared history plus scenarios to compare."""
    context: List[CovariatePoint]
    scenarios: List[ScenarioDefinition]
class ForecastScenariosResponse(BaseModel):
    """One forecast per requested scenario."""
    scenarios: List[ScenarioForecast]
@app.post("/forecast_scenarios", response_model=ForecastScenariosResponse)
def forecast_scenarios(req: ForecastScenariosRequest):
    """Evaluate several what-if scenarios by swapping the future covariates
    (e.g. promo ON/OFF, different prices) over the same historical context.

    Raises:
        HTTPException 400: if the context is empty/unusable or no scenario is
            defined.
    """
    if not req.context:
        raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.")
    if not req.scenarios:
        raise HTTPException(status_code=400, detail="Debes definir al menos un escenario.")
    ctx_rows = []
    for p in req.context:
        if p.target is None:
            continue  # history needs observed targets
        row = {
            "id": p.id or "series_0",
            "timestamp": p.timestamp,
            "target": p.target,
        }
        row.update(p.covariates)  # one extra column per covariate
        ctx_rows.append(row)
    # Guard: an all-targets-missing context would otherwise crash inside the
    # pipeline with an opaque 500.
    if not ctx_rows:
        raise HTTPException(
            status_code=400,
            detail="El contexto no contiene puntos con target.",
        )
    context_df = pd.DataFrame(ctx_rows)
    if "timestamp" not in context_df or context_df["timestamp"].isna().any():
        # Missing/partial timestamps: fall back to integer positional indices.
        context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1)
    results: List[ScenarioForecast] = []
    for scen in req.scenarios:
        fut_rows = []
        for p in scen.future_covariates:
            row = {
                "id": p.id or "series_0",
                "timestamp": p.timestamp,
            }
            row.update(p.covariates)
            fut_rows.append(row)
        future_df = pd.DataFrame(fut_rows)
        if "timestamp" not in future_df or future_df["timestamp"].isna().any():
            # Continue the integer index right after the context.
            future_df["timestamp"] = pd.RangeIndex(
                start=context_df["timestamp"].max() + 1,
                stop=context_df["timestamp"].max() + 1 + len(future_df),
                step=1,
            )
        pred_df = pipeline.predict_df(
            context_df,
            future_df=future_df,
            prediction_length=req.prediction_length,
            quantile_levels=req.quantile_levels,
            id_column="id",
            timestamp_column="timestamp",
            target="target",
        )
        pred_df = pred_df.sort_values(["id", "timestamp"])
        out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()]
        results.append(ScenarioForecast(name=scen.name, pred_df=out_records))
    return ForecastScenariosResponse(scenarios=results)
# =========================
# 7) Detección de anomalías
# =========================
class AnomalyDetectionRequest(BaseModel):
    """Request for /detect_anomalies: history, recent actuals and the band."""
    context: UnivariateSeries  # historical values used to build the forecast
    recent_observed: List[float]  # actuals to test; length must equal prediction_length
    prediction_length: int = 7
    quantile_low: float = 0.05  # lower bound of the "normal" interval
    quantile_high: float = 0.95  # upper bound of the "normal" interval
class AnomalyPoint(BaseModel):
    """Verdict for one observed point against the forecast interval."""
    index: int  # position within recent_observed
    value: float  # observed value
    predicted_median: float
    lower: float  # forecast value at quantile_low
    upper: float  # forecast value at quantile_high
    is_anomaly: bool  # True when value falls outside [lower, upper]
class AnomalyDetectionResponse(BaseModel):
    """Per-point anomaly verdicts, in the order of recent_observed."""
    anomalies: List[AnomalyPoint]
@app.post("/detect_anomalies", response_model=AnomalyDetectionResponse)
def detect_anomalies(req: AnomalyDetectionRequest):
    """Flag recent observations falling outside the forecast interval
    [quantile_low, quantile_high] built from the historical context.

    Raises:
        HTTPException 400: if the history is empty or recent_observed does not
            match prediction_length.
    """
    n_hist = len(req.context.values)
    if n_hist == 0:
        raise HTTPException(status_code=400, detail="La serie histórica no puede estar vacía.")
    if len(req.recent_observed) != req.prediction_length:
        raise HTTPException(
            status_code=400,
            detail="recent_observed debe tener la misma longitud que prediction_length.",
        )
    context_df = pd.DataFrame(
        {
            "id": ["series_0"] * n_hist,
            "timestamp": pd.RangeIndex(start=0, stop=n_hist, step=1),
            "target": req.context.values,
        }
    )
    # Always request the median too; the set de-duplicates if 0.5 was asked for.
    quantiles = sorted({req.quantile_low, 0.5, req.quantile_high})
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.prediction_length,
        quantile_levels=quantiles,
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    ).sort_values("timestamp")
    # Quantile columns are named with the %.3g rendering of the level.
    q_low_col = f"{req.quantile_low:.3g}"
    q_high_col = f"{req.quantile_high:.3g}"
    anomalies: List[AnomalyPoint] = []
    for i, (obs, (_, row)) in enumerate(zip(req.recent_observed, pred_df.iterrows())):
        lower = float(row[q_low_col])
        upper = float(row[q_high_col])
        median = float(row["predictions"])
        is_anom = (obs < lower) or (obs > upper)
        anomalies.append(
            AnomalyPoint(
                index=i,
                value=obs,
                predicted_median=median,
                lower=lower,
                upper=upper,
                is_anomaly=is_anom,
            )
        )
    return AnomalyDetectionResponse(anomalies=anomalies)
# =========================
# 8) Backtest simple
# =========================
class BacktestRequest(BaseModel):
    """Request for /backtest_simple: series plus the holdout length."""
    series: UnivariateSeries
    prediction_length: int = 7
    test_length: int = 28  # number of trailing points held out as the test set
class BacktestMetrics(BaseModel):
    """Accuracy metrics computed on the holdout segment."""
    mae: float
    mape: float
    wql: float  # approximate Weighted Quantile Loss for the 0.5 quantile
class BacktestResponse(BaseModel):
    """Backtest output: metrics plus the forecast/actual pairs for inspection."""
    metrics: BacktestMetrics
    forecast_median: List[float]
    forecast_timestamps: List[str]
    actuals: List[float]
@app.post("/backtest_simple", response_model=BacktestResponse)
def backtest_simple(req: BacktestRequest):
    """Simple holdout backtest: hold out the last ``test_length`` points,
    forecast them from the preceding history, and report MAE / MAPE / WQL.

    NOTE(review): the request's ``prediction_length`` field is not used here —
    the forecast horizon is ``test_length``. Confirm that is intentional.

    Raises:
        HTTPException 400: if the series is not longer than test_length.
    """
    values = np.asarray(req.series.values, dtype=float)
    n = len(values)
    if n <= req.test_length:
        raise HTTPException(
            status_code=400,
            detail="La serie debe ser más larga que test_length.",
        )
    train = values[: n - req.test_length]
    test = values[n - req.test_length :]
    context_df = pd.DataFrame(
        {
            "id": ["series_0"] * len(train),
            "timestamp": pd.RangeIndex(start=0, stop=len(train), step=1),
            "target": train.tolist(),
        }
    )
    pred_df = pipeline.predict_df(
        context_df,
        prediction_length=req.test_length,
        quantile_levels=[0.5],
        id_column="id",
        timestamp_column="timestamp",
        target="target",
    ).sort_values("timestamp")
    forecast = pred_df["predictions"].to_numpy(dtype=float)
    timestamps = pred_df["timestamp"].astype(str).tolist()
    mae = float(np.mean(np.abs(test - forecast)))
    # Epsilon keeps MAPE finite when an actual value is exactly zero.
    eps = 1e-8
    mape = float(np.mean(np.abs((test - forecast) / (test + eps)))) * 100.0
    # Pinball (quantile) loss at tau = 0.5, averaged over the holdout.
    tau = 0.5
    diff = test - forecast
    wql = float(np.mean(np.maximum(tau * diff, (tau - 1) * diff)))
    metrics = BacktestMetrics(mae=mae, mape=mape, wql=wql)
    return BacktestResponse(
        metrics=metrics,
        forecast_median=forecast.tolist(),
        forecast_timestamps=timestamps,
        actuals=test.tolist(),
    )