"""
Vector Autoregression (VAR), Structural VAR, and Dynamic Factor Models.

These econometric models are critical for:
- Multi-country time-series forecasting
- Structural identification of shocks
- Granger causality testing
- Impulse response analysis
- Nowcasting with common factors

VAR models capture the interdependencies among multiple time series,
while SVAR adds a structural (causal) interpretation.
"""

import warnings
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
from scipy import linalg, stats


@dataclass
class VARResults:
    """Results from VAR estimation."""
    coefficients: np.ndarray   # lag coefficients, shape (n_vars, n_lags * n_vars)
    intercept: np.ndarray      # shape (n_vars,)
    residuals: np.ndarray
    sigma_u: np.ndarray        # residual covariance, shape (n_vars, n_vars)
    log_likelihood: float
    aic: float
    bic: float
    fitted_values: np.ndarray
    n_lags: int
    variable_names: List[str]


@dataclass
class IRFResult:
    """Impulse Response Function results."""
    irf: np.ndarray  # irf[i, j, s]: response of variable i to shock j at horizon s
    lower_bound: Optional[np.ndarray] = None
    upper_bound: Optional[np.ndarray] = None
    shock_names: Optional[List[str]] = None
    response_names: Optional[List[str]] = None


class VARModel:
    """
    Vector Autoregression Model.

    Y_t = c + A_1 Y_{t-1} + A_2 Y_{t-2} + ... + A_p Y_{t-p} + u_t

    where:
    - Y_t is a k-dimensional vector of endogenous variables
    - A_i are k×k coefficient matrices
    - u_t ~ N(0, Σ_u) are white-noise innovations

    Example:
        >>> var = VARModel(n_lags=2)
        >>> results = var.fit(data)  # data shape: (T, k)
        >>> forecast = var.forecast(results, steps=10)
        >>> granger = var.granger_causality(results, 'GDP', 'interest_rate')
    """

    def __init__(self, n_lags: int = 1, trend: str = 'c'):
        """
        Initialize VAR model.

        Args:
            n_lags: Number of lags to include
            trend: Trend specification ('n'=none, 'c'=constant, 'ct'=constant+trend)
        """
        self.n_lags = n_lags
        self.trend = trend
    def fit(self, data: np.ndarray, variable_names: Optional[List[str]] = None) -> VARResults:
        """
        Estimate the VAR by OLS, equation by equation.

        Args:
            data: Time series data, shape (n_obs, n_vars)
            variable_names: Optional names for the variables

        Returns:
            VARResults object with the estimated parameters
        """
        data = np.asarray(data)
        n_obs, n_vars = data.shape

        if variable_names is None:
            variable_names = [f"var{i}" for i in range(n_vars)]

        # Build the regressand and the stacked lagged regressors.
        Y, X = self._create_lag_matrix(data)

        # OLS normal equations: beta = (X'X)^{-1} X'Y.
        XtX = X.T @ X
        XtY = X.T @ Y

        try:
            beta = linalg.solve(XtX, XtY, assume_a='pos')
        except linalg.LinAlgError:
            # Fall back to least squares if X'X is singular or ill-conditioned.
            beta = linalg.lstsq(X, Y)[0]

        # Split the deterministic terms from the lag coefficients.
        if self.trend == 'ct':
            intercept = beta[0, :]
            # The linear-trend coefficients (row 1) are estimated but not
            # retained in VARResults; `coefficients` holds only the lag matrices.
            coefficients = beta[2:, :].T
        elif self.trend == 'c':
            intercept = beta[0, :]
            coefficients = beta[1:, :].T
        else:
            intercept = np.zeros(n_vars)
            coefficients = beta.T

        fitted_values = X @ beta
        residuals = Y - fitted_values
        n_effective = Y.shape[0]
        n_params = X.shape[1]

        # Degrees-of-freedom-adjusted residual covariance.
        sigma_u = (residuals.T @ residuals) / (n_effective - n_params)

        # Information criteria penalize all n_params * n_vars coefficients.
        log_likelihood = self._log_likelihood(residuals, sigma_u, n_effective)
        aic = -2 * log_likelihood + 2 * n_params * n_vars
        bic = -2 * log_likelihood + np.log(n_effective) * n_params * n_vars

        return VARResults(
            coefficients=coefficients,
            intercept=intercept,
            residuals=residuals,
            sigma_u=sigma_u,
            log_likelihood=log_likelihood,
            aic=aic,
            bic=bic,
            fitted_values=fitted_values,
            n_lags=self.n_lags,
            variable_names=variable_names,
        )

    def _create_lag_matrix(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Create the lagged design matrix for the VAR."""
        n_obs, n_vars = data.shape

        # Regressand: observations from t = p onwards.
        Y = data[self.n_lags:, :]

        # Regressors: [Y_{t-1}, Y_{t-2}, ..., Y_{t-p}] stacked column-wise.
        X_lags = []
        for lag in range(1, self.n_lags + 1):
            X_lags.append(data[self.n_lags - lag: n_obs - lag, :])
        X = np.hstack(X_lags)

        # Prepend deterministic terms.
        n_effective = Y.shape[0]
        if self.trend == 'c':
            X = np.hstack([np.ones((n_effective, 1)), X])
        elif self.trend == 'ct':
            X = np.hstack([
                np.ones((n_effective, 1)),
                np.arange(1, n_effective + 1).reshape(-1, 1),
                X,
            ])

        return Y, X

    def forecast(self, results: VARResults, steps: int, last_obs: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Generate multi-step forecasts by iterating the VAR recursion.

        Args:
            results: Fitted VAR results
            steps: Number of steps to forecast
            last_obs: Last n_lags observations to condition on.
                If None, uses the last observations from the fitted sample.

        Returns:
            Forecasts, shape (steps, n_vars)
        """
        n_vars = results.coefficients.shape[0]

        if last_obs is None:
            # fitted + residuals reproduces the actual observations Y.
            last_obs = results.fitted_values[-self.n_lags:] + results.residuals[-self.n_lags:]
        else:
            last_obs = np.asarray(last_obs)

        # Unstack the coefficients into per-lag matrices A_1, ..., A_p.
        A_matrices = results.coefficients.reshape(n_vars, self.n_lags, n_vars)

        forecasts = np.zeros((steps, n_vars))
        history = last_obs.copy()

        for step in range(steps):
            forecast = results.intercept.copy()

            for lag in range(self.n_lags):
                if lag < history.shape[0]:
                    forecast += A_matrices[:, lag, :] @ history[-(lag + 1), :]

            forecasts[step, :] = forecast

            # Append the forecast so later steps condition on it.
            history = np.vstack([history, forecast])

        return forecasts

    def granger_causality(self, results: VARResults, caused_var: Union[int, str],
                          causing_var: Union[int, str]) -> Dict[str, float]:
        """
        Test Granger causality: does causing_var help predict caused_var?

        H0: lags of causing_var do not help predict caused_var.

        Args:
            results: Fitted VAR results
            caused_var: Index or name of the caused variable
            causing_var: Index or name of the causing variable

        Returns:
            Dictionary with F-statistic, p-value, and conclusion
        """
        if isinstance(caused_var, str):
            caused_var = results.variable_names.index(caused_var)
        if isinstance(causing_var, str):
            causing_var = results.variable_names.index(causing_var)

        n_vars = results.coefficients.shape[0]
        A_matrices = results.coefficients.reshape(n_vars, self.n_lags, n_vars)

        # Coefficients of causing_var's lags in the caused_var equation;
        # under H0 all of them are zero.
        relevant_coeffs = A_matrices[caused_var, :, causing_var]

        n_obs = results.residuals.shape[0]
        n_restrictions = self.n_lags

        # NOTE: this is a simplified Wald-type statistic that scales the squared
        # coefficients by the residual variance only, ignoring the estimated
        # coefficient covariance (X'X)^{-1}. A textbook F-test would compare
        # restricted and unrestricted regressions; treat p-values as approximate.
        wald = np.sum(relevant_coeffs ** 2) / results.sigma_u[caused_var, caused_var]
        f_stat = wald / n_restrictions

        # Denominator df: effective observations minus regressors per equation.
        p_value = 1 - stats.f.cdf(f_stat, n_restrictions, n_obs - n_vars * self.n_lags - 1)

        return {
            'f_statistic': f_stat,
            'p_value': p_value,
            'causing_var': results.variable_names[causing_var],
            'caused_var': results.variable_names[caused_var],
            'granger_causes': p_value < 0.05,
        }

    def impulse_response(self, results: VARResults, steps: int = 10,
                         orthogonalized: bool = True) -> IRFResult:
        """
        Compute impulse response functions.

        IRFs trace the effect of a one-time shock to one variable on all variables.

        Args:
            results: Fitted VAR results
            steps: Number of horizons to compute
            orthogonalized: If True, use Cholesky orthogonalization of Σ_u

        Returns:
            IRFResult with impulse responses, shape (n_vars, n_vars, steps)
        """
        n_vars = results.coefficients.shape[0]
        A_matrices = results.coefficients.reshape(n_vars, self.n_lags, n_vars)

        irf = np.zeros((n_vars, n_vars, steps))

        if orthogonalized:
            # Lower-triangular P with P P' = Σ_u; the shock ordering follows
            # the variable ordering in the data.
            P = linalg.cholesky(results.sigma_u, lower=True)
        else:
            P = np.eye(n_vars)

        # Impact (horizon-0) response.
        irf[:, :, 0] = P

        # MA(∞) recursion: Θ_s = Σ_{l=1}^{min(s, p)} A_l Θ_{s-l}.
        for step in range(1, steps):
            response = np.zeros((n_vars, n_vars))

            for lag in range(1, min(step + 1, self.n_lags + 1)):
                response += A_matrices[:, lag - 1, :] @ irf[:, :, step - lag]

            irf[:, :, step] = response

        return IRFResult(
            irf=irf,
            shock_names=results.variable_names,
            response_names=results.variable_names,
        )

    def forecast_error_variance_decomposition(self, results: VARResults,
                                              steps: int = 10) -> np.ndarray:
        """
        Compute the forecast error variance decomposition (FEVD).

        Shows the proportion of the forecast error variance of variable i
        attributable to shocks in variable j.

        Args:
            results: Fitted VAR results
            steps: Number of forecast horizons

        Returns:
            FEVD array, shape (n_vars, n_vars, steps), where
            fevd[i, j, s] is the contribution of shock j to the variance
            of variable i at horizon s
        """
        irf_result = self.impulse_response(results, steps=steps, orthogonalized=True)
        irf = irf_result.irf

        n_vars = irf.shape[0]
        fevd = np.zeros((n_vars, n_vars, steps))

        for step in range(steps):
            # Cumulative squared orthogonalized responses up to this horizon.
            mse = np.sum(irf[:, :, :step + 1] ** 2, axis=2)

            for i in range(n_vars):
                # Normalize each row so the contributions sum to one.
                total_mse = mse[i, :].sum()
                if total_mse > 0:
                    fevd[i, :, step] = mse[i, :] / total_mse

        return fevd

    def _companion_matrix(self, A_matrices: np.ndarray) -> np.ndarray:
        """
        Construct the companion-form matrix of the VAR.

        Useful e.g. for stability checks: the VAR is stable if all
        eigenvalues of the companion matrix lie inside the unit circle.
        """
        n_vars, n_lags, _ = A_matrices.shape
        size = n_vars * n_lags

        companion = np.zeros((size, size))

        # Top block row holds [A_1, A_2, ..., A_p].
        for lag in range(n_lags):
            companion[:n_vars, lag * n_vars:(lag + 1) * n_vars] = A_matrices[:, lag, :]

        # Identity blocks shift the lagged states down.
        if n_lags > 1:
            companion[n_vars:, :size - n_vars] = np.eye(size - n_vars)

        return companion

    def _log_likelihood(self, residuals: np.ndarray, sigma_u: np.ndarray, n_obs: int) -> float:
        """Compute the Gaussian log-likelihood of the VAR residuals."""
        k = residuals.shape[1]

        sign, logdet = np.linalg.slogdet(sigma_u)
        if sign <= 0:
            return -np.inf

        # Quadratic form: sum_t u_t' Σ^{-1} u_t = trace(Σ^{-1} U'U).
        quad = np.trace(linalg.solve(sigma_u, residuals.T @ residuals, assume_a='pos'))
        ll = -0.5 * (n_obs * (k * np.log(2 * np.pi) + logdet) + quad)
        return ll
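

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the estimation API): simulates a
# stable bivariate VAR(1) and exercises VARModel.fit / forecast /
# impulse_response. The helper name `_example_var_usage` and the chosen
# coefficients are assumptions made for this example only.
def _example_var_usage(seed: int = 0) -> None:
    """Simulate a stable bivariate VAR(1) and run fit/forecast/IRF on it."""
    rng = np.random.default_rng(seed)
    T = 500
    A = np.array([[0.5, 0.1],
                  [0.2, 0.3]])  # eigenvalues inside the unit circle -> stable
    y = np.zeros((T, 2))
    for t in range(1, T):
        y[t] = A @ y[t - 1] + rng.standard_normal(2)

    var = VARModel(n_lags=1)
    results = var.fit(y, variable_names=['y1', 'y2'])
    print("Estimated A_1:\n", results.coefficients.reshape(2, 1, 2)[:, 0, :])
    print("3-step forecast:\n", var.forecast(results, steps=3))

    irf = var.impulse_response(results, steps=5)
    print("Impact responses (Cholesky):\n", irf.irf[:, :, 0])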


class SVARModel:
    """
    Structural Vector Autoregression.

    Imposes structure on the reduced-form VAR to identify structural shocks:

    A_0 Y_t = c + A_1 Y_{t-1} + ... + A_p Y_{t-p} + B_0 ε_t

    where ε_t are structural shocks with E[ε_t ε_t'] = I.

    Identification schemes:
    - 'cholesky': Recursive (triangular) identification
    - 'short_run': Short-run restrictions on A_0
    - 'long_run': Long-run restrictions on the cumulative IRF
    - 'sign': Sign restrictions on the IRFs

    Example:
        >>> svar = SVARModel(n_lags=2, identification='cholesky')
        >>> results = svar.fit(data)
        >>> structural_irf = svar.structural_impulse_response(results, steps=20)
    """

    def __init__(self, n_lags: int = 1, identification: str = 'cholesky'):
        """
        Initialize SVAR model.

        Args:
            n_lags: Number of lags
            identification: Identification scheme
        """
        self.n_lags = n_lags
        self.identification = identification
        self.var_model = VARModel(n_lags=n_lags)

    def fit(self, data: np.ndarray, variable_names: Optional[List[str]] = None,
            restrictions: Optional[np.ndarray] = None) -> VARResults:
        """
        Estimate the SVAR model.

        Args:
            data: Time series data
            variable_names: Variable names
            restrictions: Optional restriction matrix for identification

        Returns:
            VARResults with the structural impact matrix attached
        """
        # Step 1: estimate the reduced-form VAR.
        var_results = self.var_model.fit(data, variable_names)

        # Step 2: identify the structural impact matrix from Σ_u.
        if self.identification == 'cholesky':
            structural_matrix = linalg.cholesky(var_results.sigma_u, lower=True)
        elif self.identification == 'short_run' and restrictions is not None:
            structural_matrix = self._identify_short_run(var_results.sigma_u, restrictions)
        else:
            warnings.warn(f"Identification '{self.identification}' not fully implemented, using Cholesky")
            structural_matrix = linalg.cholesky(var_results.sigma_u, lower=True)

        # Attach the impact matrix as an extra attribute (VARResults is a
        # plain dataclass, so dynamic attributes are allowed).
        var_results.structural_matrix = structural_matrix

        return var_results

    def structural_impulse_response(self, results: VARResults, steps: int = 10) -> IRFResult:
        """
        Compute structural (identified) impulse response functions.

        Args:
            results: Fitted SVAR results with structural_matrix
            steps: Number of horizons

        Returns:
            Structural IRF
        """
        # Under Cholesky identification, the structural IRF coincides with the
        # orthogonalized IRF of the reduced-form VAR.
        irf_result = self.var_model.impulse_response(results, steps=steps, orthogonalized=True)

        return irf_result

    def _identify_short_run(self, sigma_u: np.ndarray, restrictions: np.ndarray) -> np.ndarray:
        """
        Identify the structural matrix using short-run restrictions.

        This is a placeholder for more sophisticated identification; a full
        implementation would solve the restricted system iteratively.
        For now it falls back to the Cholesky factor.
        """
        return linalg.cholesky(sigma_u, lower=True)
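

# ---------------------------------------------------------------------------
# Usage sketch (illustrative): Cholesky-identified SVAR on simulated data.
# With recursive identification, the column ordering of the data fixes the
# causal ordering of the shocks; the variable names and coefficients here are
# assumptions made for this example only.
def _example_svar_usage(seed: int = 0) -> None:
    """Fit a Cholesky-identified SVAR and inspect the impact matrix."""
    rng = np.random.default_rng(seed)
    T = 400
    y = np.zeros((T, 2))
    for t in range(1, T):
        y[t] = np.array([[0.4, 0.1],
                         [0.1, 0.4]]) @ y[t - 1] + rng.standard_normal(2)

    svar = SVARModel(n_lags=1, identification='cholesky')
    results = svar.fit(y, variable_names=['output', 'rate'])
    print("Impact matrix (lower triangular):\n", results.structural_matrix)

    irf = svar.structural_impulse_response(results, steps=8)
    print("Response of 'rate' to an 'output' shock:", irf.irf[1, 0, :3])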


class DynamicFactorModel:
    """
    Dynamic Factor Model for high-dimensional time series.

    Model:
        X_t = Λ F_t + e_t          (observation equation)
        F_t = Φ F_{t-1} + η_t      (state equation)

    where:
    - X_t: n-dimensional observed variables
    - F_t: r-dimensional common factors (r << n)
    - Λ: n×r factor loading matrix
    - Φ: r×r factor dynamics matrix

    Used for:
    - Nowcasting with many indicators
    - Dimension reduction
    - Extracting common trends

    Example:
        >>> dfm = DynamicFactorModel(n_factors=3, n_lags=1)
        >>> results = dfm.fit(data)  # data: (T, 100), many indicators
        >>> factors = dfm.extract_factors(data, results)
        >>> forecast = dfm.forecast(results, steps=10)
    """

    def __init__(self, n_factors: int = 1, n_lags: int = 1, max_iter: int = 100):
        """
        Initialize the DFM.

        Args:
            n_factors: Number of common factors
            n_lags: Number of lags in the factor dynamics
            max_iter: Maximum EM iterations (reserved; the current fit() uses
                principal components, so this parameter is not yet used)
        """
        self.n_factors = n_factors
        self.n_lags = n_lags
        self.max_iter = max_iter

    def fit(self, data: np.ndarray) -> Dict:
        """
        Estimate the DFM by principal components (via SVD).

        Args:
            data: Observed data, shape (n_obs, n_vars)

        Returns:
            Dictionary with the estimated parameters
        """
        data = np.asarray(data)
        n_obs, n_vars = data.shape

        # Standardize each series (NaN-aware).
        data_mean = np.nanmean(data, axis=0)
        data_std = np.nanstd(data, axis=0)
        data_std[data_std == 0] = 1.0
        data_normalized = (data - data_mean) / data_std

        # Crude missing-data handling: zero-filling standardized data
        # amounts to mean imputation.
        data_imputed = data_normalized.copy()
        data_imputed[np.isnan(data_imputed)] = 0

        # Principal components via SVD.
        U, S, Vt = linalg.svd(data_imputed, full_matrices=False)

        # Factors are the leading left singular vectors scaled by their
        # singular values; loadings are the corresponding right vectors.
        factors = U[:, :self.n_factors] * S[:self.n_factors]
        loadings = Vt[:self.n_factors, :].T

        # Fit a VAR to the extracted factors for the state dynamics.
        var_model = VARModel(n_lags=self.n_lags)
        factor_var = var_model.fit(factors)

        # Idiosyncratic variances from the reconstruction residuals.
        reconstruction = factors @ loadings.T
        residuals = data_normalized - reconstruction
        idiosyncratic_var = np.nanvar(residuals, axis=0)

        return {
            'factors': factors,
            'loadings': loadings,
            'factor_dynamics': factor_var,
            'idiosyncratic_var': idiosyncratic_var,
            'data_mean': data_mean,
            'data_std': data_std,
            'explained_variance_ratio': (S[:self.n_factors] ** 2).sum() / (S ** 2).sum(),
        }

    def extract_factors(self, data: np.ndarray, model: Dict) -> np.ndarray:
        """
        Extract factors from new data using a fitted model.

        Args:
            data: New data to extract factors from
            model: Fitted model from fit()

        Returns:
            Extracted factors, shape (n_obs, n_factors)
        """
        data_normalized = (np.asarray(data) - model['data_mean']) / model['data_std']
        # Mirror fit(): zero-fill missing values in the standardized data.
        data_normalized = np.where(np.isnan(data_normalized), 0.0, data_normalized)

        # OLS projection onto the loading space: F = X Λ (Λ'Λ)^{-1}.
        loadings = model['loadings']
        factors = data_normalized @ loadings @ linalg.inv(loadings.T @ loadings)

        return factors

    def forecast(self, model: Dict, steps: int = 1) -> np.ndarray:
        """
        Forecast future values of the observed series.

        Args:
            model: Fitted model
            steps: Number of steps to forecast

        Returns:
            Forecasted data, shape (steps, n_vars)
        """
        # Forecast the factors with their fitted VAR dynamics.
        var_model = VARModel(n_lags=self.n_lags)
        factor_forecast = var_model.forecast(model['factor_dynamics'], steps=steps)

        # Map the factor forecasts back to the observables.
        loadings = model['loadings']
        data_forecast = factor_forecast @ loadings.T

        # Undo the standardization.
        data_forecast = data_forecast * model['data_std'] + model['data_mean']

        return data_forecast
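

# ---------------------------------------------------------------------------
# Usage sketch (illustrative): a panel of 20 series driven by one AR(1)
# common factor, fit with the principal-components DFM above. The loadings
# and noise scale are arbitrary choices for this example.
def _example_dfm_usage(seed: int = 0) -> None:
    """Fit a one-factor DFM to a simulated 20-variable panel."""
    rng = np.random.default_rng(seed)
    T, n = 300, 20
    f = np.zeros(T)
    for t in range(1, T):
        f[t] = 0.8 * f[t - 1] + rng.standard_normal()
    lam = rng.standard_normal(n)  # factor loadings
    X = np.outer(f, lam) + 0.5 * rng.standard_normal((T, n))

    dfm = DynamicFactorModel(n_factors=1, n_lags=1)
    model = dfm.fit(X)
    print("Explained variance ratio:", model['explained_variance_ratio'])
    print("2-step forecast shape:", dfm.forecast(model, steps=2).shape)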


class GrangerCausality:
    """
    Comprehensive Granger causality testing.

    Tests whether one time series helps predict another beyond its own history.

    Methods (pairwise tests are implemented; the others are not yet):
    - Pairwise Granger causality
    - Conditional Granger causality (controlling for other variables)
    - Block Granger causality (group of variables)
    - Instantaneous causality (contemporaneous correlation)
    """

    @staticmethod
    def test(data: np.ndarray, caused_idx: int, causing_idx: int,
             max_lag: int = 10, criterion: str = 'bic') -> Dict:
        """
        Test Granger causality with information-criterion lag selection.

        Args:
            data: Time series data, shape (n_obs, n_vars)
            caused_idx: Index of the caused variable
            causing_idx: Index of the causing variable
            max_lag: Maximum lag to consider
            criterion: Information criterion for lag selection ('aic' or 'bic')

        Returns:
            Dictionary with the test results
        """
        # Select the lag order that minimizes the chosen criterion.
        best_lag = 1
        best_ic = np.inf

        for lag in range(1, max_lag + 1):
            var_model = VARModel(n_lags=lag)
            results = var_model.fit(data)

            ic = results.aic if criterion == 'aic' else results.bic
            if ic < best_ic:
                best_ic = ic
                best_lag = lag

        # Refit at the selected lag order and run the causality test.
        var_model = VARModel(n_lags=best_lag)
        results = var_model.fit(data)

        causality_result = var_model.granger_causality(results, caused_idx, causing_idx)
        causality_result['optimal_lag'] = best_lag

        return causality_result

    @staticmethod
    def pairwise_matrix(data: np.ndarray, max_lag: int = 10,
                        variable_names: Optional[List[str]] = None) -> np.ndarray:
        """
        Compute a pairwise Granger causality matrix.

        Args:
            data: Time series data
            max_lag: Maximum lag
            variable_names: Variable names

        Returns:
            Matrix where entry (i, j) is the p-value for "j Granger-causes i";
            diagonal entries are NaN
        """
        n_vars = data.shape[1]
        if variable_names is None:
            variable_names = [f"var{i}" for i in range(n_vars)]

        # NaN on the diagonal: a variable is not tested against itself.
        causality_matrix = np.full((n_vars, n_vars), np.nan)

        for i in range(n_vars):
            for j in range(n_vars):
                if i != j:
                    result = GrangerCausality.test(data, caused_idx=i, causing_idx=j, max_lag=max_lag)
                    causality_matrix[i, j] = result['p_value']

        return causality_matrix
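

# ---------------------------------------------------------------------------
# Usage sketch (illustrative): on simulated data where x leads y, the (y, x)
# entry of the pairwise p-value matrix should be small. Keep in mind that the
# F-statistic in VARModel.granger_causality() is approximate.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    T = 400
    x = np.zeros(T)
    y = np.zeros(T)
    for t in range(1, T):
        x[t] = 0.5 * x[t - 1] + rng.standard_normal()
        y[t] = 0.3 * y[t - 1] + 0.6 * x[t - 1] + rng.standard_normal()
    data = np.column_stack([x, y])

    pvals = GrangerCausality.pairwise_matrix(data, max_lag=4)
    print("p-value matrix (entry (i, j): 'j Granger-causes i'):\n", pvals)

    _example_var_usage()
    _example_svar_usage()
    _example_dfm_usage()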
|