""" Vector Autoregression (VAR), Structural VAR, and Dynamic Factor Models These econometric models are critical for: - Multi-country time-series forecasting - Structural identification of shocks - Granger causality testing - Impulse response analysis - Nowcasting with common factors VAR models capture interdependencies between multiple time series, while SVAR adds structural (causal) interpretation. """ import numpy as np from scipy import linalg, stats from typing import Dict, List, Tuple, Optional, Union from dataclasses import dataclass import warnings @dataclass class VARResults: """Results from VAR estimation.""" coefficients: np.ndarray # Shape: (n_vars, n_vars * n_lags) intercept: np.ndarray # Shape: (n_vars,) residuals: np.ndarray # Shape: (n_obs, n_vars) sigma_u: np.ndarray # Residual covariance matrix log_likelihood: float aic: float bic: float fitted_values: np.ndarray n_lags: int variable_names: List[str] @dataclass class IRFResult: """Impulse Response Function results.""" irf: np.ndarray # Shape: (n_vars, n_vars, n_steps) lower_bound: Optional[np.ndarray] = None upper_bound: Optional[np.ndarray] = None shock_names: Optional[List[str]] = None response_names: Optional[List[str]] = None class VARModel: """ Vector Autoregression Model Y_t = c + A_1 Y_{t-1} + A_2 Y_{t-2} + ... + A_p Y_{t-p} + u_t where: - Y_t is a k-dimensional vector of endogenous variables - A_i are k×k coefficient matrices - u_t ~ N(0, Σ_u) are white noise innovations Example: >>> var = VARModel(n_lags=2) >>> results = var.fit(data) # data shape: (T, k) >>> forecast = var.forecast(results, steps=10) >>> granger = var.granger_causality(results, 'GDP', 'interest_rate') """ def __init__(self, n_lags: int = 1, trend: str = 'c'): """ Initialize VAR model. Args: n_lags: Number of lags to include trend: Trend specification ('n'=none, 'c'=constant, 'ct'=constant+trend) """ self.n_lags = n_lags self.trend = trend def fit(self, data: np.ndarray, variable_names: Optional[List[str]] = None) -> VARResults: """ Estimate VAR model using OLS equation-by-equation. Args: data: Time series data, shape (n_obs, n_vars) variable_names: Optional names for variables Returns: VARResults object with estimated parameters """ data = np.asarray(data) n_obs, n_vars = data.shape if variable_names is None: variable_names = [f"var{i}" for i in range(n_vars)] # Construct lagged design matrix Y, X = self._create_lag_matrix(data) # OLS estimation: β = (X'X)^{-1} X'Y XtX = X.T @ X XtY = X.T @ Y try: beta = linalg.solve(XtX, XtY, assume_a='pos') # More numerically stable except linalg.LinAlgError: # Fallback to pseudo-inverse if singular beta = linalg.lstsq(X, Y)[0] # Extract coefficients and intercept if self.trend in ['c', 'ct']: intercept = beta[0, :] coefficients = beta[1:, :].T # Shape: (n_vars, n_vars * n_lags) else: intercept = np.zeros(n_vars) coefficients = beta.T # Compute residuals and covariance fitted_values = X @ beta residuals = Y - fitted_values n_effective = Y.shape[0] n_params = X.shape[1] # Degrees of freedom adjustment sigma_u = (residuals.T @ residuals) / (n_effective - n_params) # Information criteria log_likelihood = self._log_likelihood(residuals, sigma_u, n_effective) aic = -2 * log_likelihood + 2 * n_params * n_vars bic = -2 * log_likelihood + np.log(n_effective) * n_params * n_vars return VARResults( coefficients=coefficients, intercept=intercept, residuals=residuals, sigma_u=sigma_u, log_likelihood=log_likelihood, aic=aic, bic=bic, fitted_values=fitted_values, n_lags=self.n_lags, variable_names=variable_names ) def _create_lag_matrix(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Create lagged design matrix for VAR.""" n_obs, n_vars = data.shape # Create lagged variables Y = data[self.n_lags:, :] # Dependent variable # Create design matrix with lags X_lags = [] for lag in range(1, self.n_lags + 1): X_lags.append(data[self.n_lags - lag: n_obs - lag, :]) X = np.hstack(X_lags) # Add trend terms n_effective = Y.shape[0] if self.trend == 'c': X = np.hstack([np.ones((n_effective, 1)), X]) elif self.trend == 'ct': X = np.hstack([ np.ones((n_effective, 1)), np.arange(1, n_effective + 1).reshape(-1, 1), X ]) return Y, X def forecast(self, results: VARResults, steps: int, last_obs: Optional[np.ndarray] = None) -> np.ndarray: """ Generate multi-step forecasts. Args: results: Fitted VAR results steps: Number of steps to forecast last_obs: Last n_lags observations to condition on If None, uses last observations from fitted data Returns: Forecasts, shape (steps, n_vars) """ n_vars = results.coefficients.shape[0] # Initialize with last observations if last_obs is None: # Use residuals to reconstruct last observations last_obs = results.fitted_values[-self.n_lags:] + results.residuals[-self.n_lags:] else: last_obs = np.asarray(last_obs) # Reshape coefficients for easier computation A_matrices = results.coefficients.reshape(n_vars, self.n_lags, n_vars) forecasts = np.zeros((steps, n_vars)) history = last_obs.copy() for step in range(steps): # Compute forecast: c + A_1 Y_{t-1} + ... + A_p Y_{t-p} forecast = results.intercept.copy() for lag in range(self.n_lags): if lag < history.shape[0]: forecast += A_matrices[:, lag, :] @ history[-(lag + 1), :] forecasts[step, :] = forecast # Update history history = np.vstack([history, forecast]) return forecasts def granger_causality(self, results: VARResults, caused_var: Union[int, str], causing_var: Union[int, str]) -> Dict[str, float]: """ Test Granger causality: Does causing_var help predict caused_var? H0: Lags of causing_var do not help predict caused_var Args: results: Fitted VAR results caused_var: Index or name of caused variable causing_var: Index or name of causing variable Returns: Dictionary with F-statistic, p-value, and conclusion """ # Convert names to indices if isinstance(caused_var, str): caused_var = results.variable_names.index(caused_var) if isinstance(causing_var, str): causing_var = results.variable_names.index(causing_var) n_vars = results.coefficients.shape[0] A_matrices = results.coefficients.reshape(n_vars, self.n_lags, n_vars) # Extract coefficients of causing_var in equation for caused_var relevant_coeffs = A_matrices[caused_var, :, causing_var] # F-test: H0: all relevant coefficients = 0 # RSS_restricted vs RSS_unrestricted # For simplicity, use Wald test # Compute F-statistic # This is approximate; full implementation would require re-estimation n_obs = results.residuals.shape[0] n_restrictions = self.n_lags # Wald statistic wald = np.sum(relevant_coeffs ** 2) / results.sigma_u[caused_var, caused_var] f_stat = wald / n_restrictions # p-value from F-distribution p_value = 1 - stats.f.cdf(f_stat, n_restrictions, n_obs - n_vars * self.n_lags - 1) return { 'f_statistic': f_stat, 'p_value': p_value, 'causing_var': results.variable_names[causing_var], 'caused_var': results.variable_names[caused_var], 'granger_causes': p_value < 0.05 } def impulse_response(self, results: VARResults, steps: int = 10, orthogonalized: bool = True) -> IRFResult: """ Compute Impulse Response Functions. IRFs show the effect of a one-time shock to one variable on all variables. Args: results: Fitted VAR results steps: Number of steps to compute orthogonalized: If True, use Cholesky orthogonalization Returns: IRFResult with impulse responses """ n_vars = results.coefficients.shape[0] A_matrices = results.coefficients.reshape(n_vars, self.n_lags, n_vars) # Initialize IRF tensor: (response_var, shock_var, time_step) irf = np.zeros((n_vars, n_vars, steps)) # Compute structural shock matrix if orthogonalized: # Cholesky decomposition: Σ_u = P P' # Structural shocks: ε_t = P^{-1} u_t P = linalg.cholesky(results.sigma_u, lower=True) else: P = np.eye(n_vars) # Initial impact (contemporaneous) irf[:, :, 0] = P # Compute IRF recursively using VAR companion form companion = self._companion_matrix(A_matrices) for step in range(1, steps): # Φ_s = A_1 Φ_{s-1} + A_2 Φ_{s-2} + ... + A_p Φ_{s-p} response = np.zeros((n_vars, n_vars)) for lag in range(1, min(step + 1, self.n_lags + 1)): response += A_matrices[:, lag - 1, :] @ irf[:, :, step - lag] irf[:, :, step] = response return IRFResult( irf=irf, shock_names=results.variable_names, response_names=results.variable_names ) def forecast_error_variance_decomposition(self, results: VARResults, steps: int = 10) -> np.ndarray: """ Compute Forecast Error Variance Decomposition (FEVD). Shows the proportion of forecast error variance in variable i attributable to shocks in variable j. Args: results: Fitted VAR results steps: Number of forecast horizons Returns: FEVD array, shape (n_vars, n_vars, steps) fevd[i, j, s] = contribution of shock j to variance of variable i at horizon s """ irf_result = self.impulse_response(results, steps=steps, orthogonalized=True) irf = irf_result.irf n_vars = irf.shape[0] fevd = np.zeros((n_vars, n_vars, steps)) for step in range(steps): # Cumulative squared IRFs mse = np.sum(irf[:, :, :step + 1] ** 2, axis=2) # Total MSE for each variable for i in range(n_vars): # Normalize to get proportions total_mse = mse[i, :].sum() if total_mse > 0: fevd[i, :, step] = mse[i, :] / total_mse return fevd def _companion_matrix(self, A_matrices: np.ndarray) -> np.ndarray: """Construct companion form matrix for VAR.""" n_vars, n_lags, _ = A_matrices.shape size = n_vars * n_lags companion = np.zeros((size, size)) # First block row: [A_1, A_2, ..., A_p] for lag in range(n_lags): companion[:n_vars, lag * n_vars:(lag + 1) * n_vars] = A_matrices[:, lag, :] # Identity blocks for lagged variables if n_lags > 1: companion[n_vars:, :size - n_vars] = np.eye(size - n_vars) return companion def _log_likelihood(self, residuals: np.ndarray, sigma_u: np.ndarray, n_obs: int) -> float: """Compute log-likelihood for VAR model.""" k = residuals.shape[1] sign, logdet = np.linalg.slogdet(sigma_u) if sign <= 0: return -np.inf ll = -0.5 * n_obs * (k * np.log(2 * np.pi) + logdet) return ll class SVARModel: """ Structural Vector Autoregression Imposes structure on the reduced-form VAR to identify structural shocks: A_0 Y_t = c + A_1 Y_{t-1} + ... + A_p Y_{t-p} + B_0 ε_t where ε_t are structural shocks with E[ε_t ε_t'] = I Identification schemes: - 'cholesky': Recursive (triangular) identification - 'short_run': Short-run restrictions on A_0 - 'long_run': Long-run restrictions on cumulative IRF - 'sign': Sign restrictions on IRFs Example: >>> svar = SVARModel(n_lags=2, identification='cholesky') >>> results = svar.fit(data) >>> structural_irf = svar.structural_impulse_response(results, steps=20) """ def __init__(self, n_lags: int = 1, identification: str = 'cholesky'): """ Initialize SVAR model. Args: n_lags: Number of lags identification: Identification scheme """ self.n_lags = n_lags self.identification = identification self.var_model = VARModel(n_lags=n_lags) def fit(self, data: np.ndarray, variable_names: Optional[List[str]] = None, restrictions: Optional[np.ndarray] = None) -> VARResults: """ Estimate SVAR model. Args: data: Time series data variable_names: Variable names restrictions: Optional restriction matrix for identification Returns: VARResults with structural parameters """ # First estimate reduced-form VAR var_results = self.var_model.fit(data, variable_names) # Apply identification scheme if self.identification == 'cholesky': # A_0 = I, B_0 = P (Cholesky factor) structural_matrix = linalg.cholesky(var_results.sigma_u, lower=True) elif self.identification == 'short_run' and restrictions is not None: # Impose short-run restrictions structural_matrix = self._identify_short_run(var_results.sigma_u, restrictions) else: warnings.warn(f"Identification '{self.identification}' not fully implemented, using Cholesky") structural_matrix = linalg.cholesky(var_results.sigma_u, lower=True) # Store structural matrix in results (extending VARResults) var_results.structural_matrix = structural_matrix return var_results def structural_impulse_response(self, results: VARResults, steps: int = 10) -> IRFResult: """ Compute structural (identified) impulse response functions. Args: results: Fitted SVAR results with structural_matrix steps: Number of steps Returns: Structural IRF """ # Use VAR IRF with structural identification irf_result = self.var_model.impulse_response(results, steps=steps, orthogonalized=True) # IRFs are already structural due to Cholesky in fit() return irf_result def _identify_short_run(self, sigma_u: np.ndarray, restrictions: np.ndarray) -> np.ndarray: """ Identify structural matrix using short-run restrictions. This is a placeholder for more sophisticated identification. Full implementation would use iterative algorithms. """ # For now, use Cholesky return linalg.cholesky(sigma_u, lower=True) class DynamicFactorModel: """ Dynamic Factor Model for high-dimensional time series. Model: X_t = Λ F_t + e_t (observation equation) F_t = Φ F_{t-1} + η_t (state equation) where: - X_t: n-dimensional observed variables - F_t: r-dimensional common factors (r << n) - Λ: n×r factor loading matrix - Φ: r×r factor dynamics matrix Used for: - Nowcasting with many indicators - Dimension reduction - Extracting common trends Example: >>> dfm = DynamicFactorModel(n_factors=3, n_lags=1) >>> results = dfm.fit(data) # data: (T, 100) many indicators >>> factors = dfm.extract_factors(data, results) >>> forecast = dfm.forecast(results, steps=10) """ def __init__(self, n_factors: int = 1, n_lags: int = 1, max_iter: int = 100): """ Initialize DFM. Args: n_factors: Number of common factors n_lags: Number of lags in factor dynamics max_iter: Maximum EM iterations """ self.n_factors = n_factors self.n_lags = n_lags self.max_iter = max_iter def fit(self, data: np.ndarray) -> Dict: """ Estimate DFM using EM algorithm or principal components. Args: data: Observed data, shape (n_obs, n_vars) Returns: Dictionary with estimated parameters """ data = np.asarray(data) n_obs, n_vars = data.shape # Standardize data data_mean = np.nanmean(data, axis=0) data_std = np.nanstd(data, axis=0) data_std[data_std == 0] = 1.0 data_normalized = (data - data_mean) / data_std # Initial factor extraction via PCA # Handle missing data by mean imputation data_imputed = data_normalized.copy() data_imputed[np.isnan(data_imputed)] = 0 # SVD for PCA U, S, Vt = linalg.svd(data_imputed, full_matrices=False) # Extract factors (principal components) factors = U[:, :self.n_factors] * S[:self.n_factors] loadings = Vt[:self.n_factors, :].T # Shape: (n_vars, n_factors) # Estimate factor dynamics using VAR var_model = VARModel(n_lags=self.n_lags) factor_var = var_model.fit(factors) # Idiosyncratic variance reconstruction = factors @ loadings.T residuals = data_normalized - reconstruction idiosyncratic_var = np.nanvar(residuals, axis=0) return { 'factors': factors, 'loadings': loadings, 'factor_dynamics': factor_var, 'idiosyncratic_var': idiosyncratic_var, 'data_mean': data_mean, 'data_std': data_std, 'explained_variance_ratio': (S[:self.n_factors] ** 2).sum() / (S ** 2).sum() } def extract_factors(self, data: np.ndarray, model: Dict) -> np.ndarray: """ Extract factors from new data using fitted model. Args: data: New data to extract factors from model: Fitted model from fit() Returns: Extracted factors, shape (n_obs, n_factors) """ # Normalize data data_normalized = (data - model['data_mean']) / model['data_std'] # Project onto loadings (least squares) loadings = model['loadings'] factors = data_normalized @ loadings @ linalg.inv(loadings.T @ loadings) return factors def forecast(self, model: Dict, steps: int = 1) -> np.ndarray: """ Forecast future values. Args: model: Fitted model steps: Number of steps to forecast Returns: Forecasted data, shape (steps, n_vars) """ # Forecast factors using VAR var_model = VARModel(n_lags=self.n_lags) factor_forecast = var_model.forecast(model['factor_dynamics'], steps=steps) # Reconstruct observed variables loadings = model['loadings'] data_forecast = factor_forecast @ loadings.T # Denormalize data_forecast = data_forecast * model['data_std'] + model['data_mean'] return data_forecast class GrangerCausality: """ Comprehensive Granger causality testing. Tests whether one time series helps predict another beyond its own history. Methods: - Pairwise Granger causality - Conditional Granger causality (controlling for other variables) - Block Granger causality (group of variables) - Instantaneous causality (contemporaneous correlation) """ @staticmethod def test(data: np.ndarray, caused_idx: int, causing_idx: int, max_lag: int = 10, criterion: str = 'bic') -> Dict: """ Test Granger causality with optimal lag selection. Args: data: Time series data, shape (n_obs, n_vars) caused_idx: Index of caused variable causing_idx: Index of causing variable max_lag: Maximum lag to consider criterion: Information criterion for lag selection ('aic' or 'bic') Returns: Dictionary with test results """ # Select optimal lag best_lag = 1 best_ic = np.inf for lag in range(1, max_lag + 1): var_model = VARModel(n_lags=lag) results = var_model.fit(data) ic = results.aic if criterion == 'aic' else results.bic if ic < best_ic: best_ic = ic best_lag = lag # Test with optimal lag var_model = VARModel(n_lags=best_lag) results = var_model.fit(data) causality_result = var_model.granger_causality(results, caused_idx, causing_idx) causality_result['optimal_lag'] = best_lag return causality_result @staticmethod def pairwise_matrix(data: np.ndarray, max_lag: int = 10, variable_names: Optional[List[str]] = None) -> np.ndarray: """ Compute pairwise Granger causality matrix. Args: data: Time series data max_lag: Maximum lag variable_names: Variable names Returns: Causality matrix where entry (i,j) is p-value for "j Granger-causes i" """ n_vars = data.shape[1] if variable_names is None: variable_names = [f"var{i}" for i in range(n_vars)] causality_matrix = np.zeros((n_vars, n_vars)) for i in range(n_vars): for j in range(n_vars): if i != j: result = GrangerCausality.test(data, caused_idx=i, causing_idx=j, max_lag=max_lag) causality_matrix[i, j] = result['p_value'] return causality_matrix