clarkkitchen22's picture
Initial GeoBot Forecasting Framework commit
484e3bc
"""
Stochastic Differential Equation (SDE) Solvers
Implements numerical methods for SDEs:
dx_t = f(x_t, t)dt + g(x_t, t)dW_t
where W_t is a Wiener process (Brownian motion).
Methods:
- Euler-Maruyama (order 0.5 strong convergence)
- Milstein (order 1.0 strong convergence)
- Runge-Kutta for SDEs
- Jump-diffusion processes (Merton, Kou)
Applications:
- Continuous-time geopolitical dynamics
- Financial contagion models
- Regime transitions with stochastic shocks
"""
import numpy as np
from typing import Callable, Optional, Tuple, List, Dict, Any
from dataclasses import dataclass
from scipy.stats import poisson, norm
@dataclass
class SDESolution:
"""
Solution to an SDE.
Attributes
----------
t : np.ndarray
Time points
x : np.ndarray
State trajectories
method : str
Integration method used
"""
t: np.ndarray
x: np.ndarray
method: str
class SDESolver:
"""
Base class for SDE solvers.
Solves: dx_t = f(x_t, t)dt + g(x_t, t)dW_t
"""
def __init__(
self,
drift: Callable[[np.ndarray, float], np.ndarray],
diffusion: Callable[[np.ndarray, float], np.ndarray],
x0: np.ndarray,
t0: float = 0.0
):
"""
Initialize SDE solver.
Parameters
----------
drift : callable
Drift function f(x, t)
diffusion : callable
Diffusion function g(x, t)
x0 : np.ndarray
Initial condition
t0 : float
Initial time
"""
self.drift = drift
self.diffusion = diffusion
self.x0 = np.asarray(x0)
self.t0 = t0
self.dim = len(self.x0)
def integrate(
self,
T: float,
dt: float,
n_paths: int = 1
) -> SDESolution:
"""
Integrate SDE.
Parameters
----------
T : float
Final time
dt : float
Time step
n_paths : int
Number of sample paths
Returns
-------
SDESolution
Solution object
"""
raise NotImplementedError("Subclasses must implement integrate()")
class EulerMaruyama(SDESolver):
"""
Euler-Maruyama method for SDEs.
Simplest method with order 0.5 strong convergence.
x_{n+1} = x_n + f(x_n, t_n)Δt + g(x_n, t_n)ΔW_n
where ΔW_n ~ N(0, Δt)
"""
def integrate(
self,
T: float,
dt: float,
n_paths: int = 1
) -> SDESolution:
"""
Integrate using Euler-Maruyama.
Parameters
----------
T : float
Final time
dt : float
Time step
n_paths : int
Number of paths to simulate
Returns
-------
SDESolution
Solution
"""
# Time grid
n_steps = int((T - self.t0) / dt)
t = np.linspace(self.t0, T, n_steps + 1)
# Initialize paths
x = np.zeros((n_paths, n_steps + 1, self.dim))
x[:, 0, :] = self.x0
# Brownian increments
sqrt_dt = np.sqrt(dt)
# Simulate paths
for i in range(n_steps):
t_current = t[i]
for path in range(n_paths):
x_current = x[path, i, :]
# Drift term
drift_term = self.drift(x_current, t_current) * dt
# Diffusion term
dW = np.random.randn(self.dim) * sqrt_dt
diffusion_term = self.diffusion(x_current, t_current) * dW
# Update
x[path, i + 1, :] = x_current + drift_term + diffusion_term
return SDESolution(t=t, x=x, method='euler_maruyama')
class Milstein(SDESolver):
"""
Milstein method for SDEs.
Higher-order method with order 1.0 strong convergence.
Requires derivative of diffusion term.
x_{n+1} = x_n + f(x_n)Δt + g(x_n)ΔW_n
+ 0.5 * g(x_n) * g'(x_n) * ((ΔW_n)^2 - Δt)
where g'(x) = ∂g/∂x
"""
def __init__(
self,
drift: Callable,
diffusion: Callable,
diffusion_derivative: Callable,
x0: np.ndarray,
t0: float = 0.0
):
"""
Initialize Milstein solver.
Parameters
----------
drift : callable
Drift function
diffusion : callable
Diffusion function
diffusion_derivative : callable
Derivative of diffusion: ∂g/∂x
x0 : np.ndarray
Initial condition
t0 : float
Initial time
"""
super().__init__(drift, diffusion, x0, t0)
self.diffusion_derivative = diffusion_derivative
def integrate(
self,
T: float,
dt: float,
n_paths: int = 1
) -> SDESolution:
"""
Integrate using Milstein method.
Parameters
----------
T : float
Final time
dt : float
Time step
n_paths : int
Number of paths
Returns
-------
SDESolution
Solution
"""
n_steps = int((T - self.t0) / dt)
t = np.linspace(self.t0, T, n_steps + 1)
x = np.zeros((n_paths, n_steps + 1, self.dim))
x[:, 0, :] = self.x0
sqrt_dt = np.sqrt(dt)
for i in range(n_steps):
t_current = t[i]
for path in range(n_paths):
x_current = x[path, i, :]
# Drift
drift_term = self.drift(x_current, t_current) * dt
# Diffusion
dW = np.random.randn(self.dim) * sqrt_dt
g = self.diffusion(x_current, t_current)
diffusion_term = g * dW
# Milstein correction term
g_prime = self.diffusion_derivative(x_current, t_current)
correction = 0.5 * g * g_prime * ((dW**2) - dt)
# Update
x[path, i + 1, :] = x_current + drift_term + diffusion_term + correction
return SDESolution(t=t, x=x, method='milstein')
class StochasticRungeKutta(SDESolver):
"""
Stochastic Runge-Kutta method.
Higher-order method for SDEs with better accuracy.
"""
def integrate(
self,
T: float,
dt: float,
n_paths: int = 1
) -> SDESolution:
"""
Integrate using stochastic Runge-Kutta.
Parameters
----------
T : float
Final time
dt : float
Time step
n_paths : int
Number of paths
Returns
-------
SDESolution
Solution
"""
n_steps = int((T - self.t0) / dt)
t = np.linspace(self.t0, T, n_steps + 1)
x = np.zeros((n_paths, n_steps + 1, self.dim))
x[:, 0, :] = self.x0
sqrt_dt = np.sqrt(dt)
for i in range(n_steps):
t_current = t[i]
for path in range(n_paths):
x_current = x[path, i, :]
# Generate Wiener increments
dW = np.random.randn(self.dim) * sqrt_dt
# Stage 1
k1_drift = self.drift(x_current, t_current)
k1_diff = self.diffusion(x_current, t_current)
# Stage 2 (predictor)
x_pred = x_current + k1_drift * dt + k1_diff * dW
k2_drift = self.drift(x_pred, t_current + dt)
k2_diff = self.diffusion(x_pred, t_current + dt)
# Update (corrector)
drift_term = 0.5 * (k1_drift + k2_drift) * dt
diffusion_term = 0.5 * (k1_diff + k2_diff) * dW
x[path, i + 1, :] = x_current + drift_term + diffusion_term
return SDESolution(t=t, x=x, method='stochastic_rk')
class JumpDiffusionProcess:
"""
Jump-diffusion process (Merton model).
Combines continuous diffusion with discrete jumps:
dx_t = μ x_t dt + σ x_t dW_t + x_t dJ_t
where J_t is a compound Poisson process:
- Jumps occur with intensity λ
- Jump sizes Y ~ N(μ_J, σ_J^2)
"""
def __init__(
self,
drift: float,
diffusion: float,
jump_intensity: float,
jump_mean: float,
jump_std: float,
x0: np.ndarray
):
"""
Initialize jump-diffusion process.
Parameters
----------
drift : float
Drift coefficient μ
diffusion : float
Diffusion coefficient σ
jump_intensity : float
Jump intensity λ (expected number of jumps per unit time)
jump_mean : float
Mean jump size (log-normal)
jump_std : float
Jump size standard deviation
x0 : np.ndarray
Initial condition
"""
self.drift = drift
self.diffusion = diffusion
self.jump_intensity = jump_intensity
self.jump_mean = jump_mean
self.jump_std = jump_std
self.x0 = np.asarray(x0)
self.dim = len(self.x0)
def simulate(
self,
T: float,
dt: float,
n_paths: int = 1
) -> SDESolution:
"""
Simulate jump-diffusion paths.
Parameters
----------
T : float
Final time
dt : float
Time step
n_paths : int
Number of paths
Returns
-------
SDESolution
Solution
"""
n_steps = int(T / dt)
t = np.linspace(0, T, n_steps + 1)
x = np.zeros((n_paths, n_steps + 1, self.dim))
x[:, 0, :] = self.x0
sqrt_dt = np.sqrt(dt)
for i in range(n_steps):
for path in range(n_paths):
x_current = x[path, i, :]
# Continuous part (Euler-Maruyama)
dW = np.random.randn(self.dim) * sqrt_dt
continuous = self.drift * x_current * dt + self.diffusion * x_current * dW
# Jump part (Poisson process)
n_jumps = poisson.rvs(self.jump_intensity * dt)
jump_total = 0.0
if n_jumps > 0:
# Sample jump sizes
jump_sizes = norm.rvs(
loc=self.jump_mean,
scale=self.jump_std,
size=n_jumps
)
# Convert to multiplicative jumps
jump_total = x_current * np.sum(np.exp(jump_sizes) - 1)
# Update
x[path, i + 1, :] = x_current + continuous + jump_total
# Ensure non-negative (if applicable)
x[path, i + 1, :] = np.maximum(x[path, i + 1, :], 0)
return SDESolution(t=t, x=x, method='jump_diffusion')
class GeopoliticalSDE:
"""
Geopolitical system as continuous-time SDE.
Models geopolitical variables as SDEs with:
- Continuous dynamics (drift + diffusion)
- Discrete shocks (jumps)
- Regime-dependent parameters
"""
def __init__(
self,
variable_names: List[str],
drift_functions: Dict[str, Callable],
diffusion_functions: Dict[str, Callable],
jump_intensities: Optional[Dict[str, float]] = None
):
"""
Initialize geopolitical SDE.
Parameters
----------
variable_names : list
Names of state variables
drift_functions : dict
Drift function for each variable
diffusion_functions : dict
Diffusion function for each variable
jump_intensities : dict, optional
Jump intensities for discrete shocks
"""
self.variable_names = variable_names
self.drift_functions = drift_functions
self.diffusion_functions = diffusion_functions
self.jump_intensities = jump_intensities or {}
self.dim = len(variable_names)
def simulate(
self,
x0: Dict[str, float],
T: float,
dt: float,
n_paths: int = 1
) -> Dict[str, np.ndarray]:
"""
Simulate geopolitical dynamics.
Parameters
----------
x0 : dict
Initial conditions {variable: value}
T : float
Final time
dt : float
Time step
n_paths : int
Number of paths
Returns
-------
dict
Simulated trajectories {variable: array}
"""
# Convert to array
x0_array = np.array([x0[var] for var in self.variable_names])
# Time grid
n_steps = int(T / dt)
t = np.linspace(0, T, n_steps + 1)
# Storage
trajectories = {var: np.zeros((n_paths, n_steps + 1)) for var in self.variable_names}
# Initialize
for i, var in enumerate(self.variable_names):
trajectories[var][:, 0] = x0_array[i]
sqrt_dt = np.sqrt(dt)
# Simulate
for step in range(n_steps):
t_current = t[step]
for path in range(n_paths):
# Current state
x_current = {
var: trajectories[var][path, step]
for var in self.variable_names
}
# Update each variable
for i, var in enumerate(self.variable_names):
# Drift
drift = self.drift_functions[var](x_current, t_current) * dt
# Diffusion
dW = np.random.randn() * sqrt_dt
diffusion = self.diffusion_functions[var](x_current, t_current) * dW
# Jumps
jump = 0.0
if var in self.jump_intensities:
n_jumps = poisson.rvs(self.jump_intensities[var] * dt)
if n_jumps > 0:
# Simple additive jump
jump = np.random.normal(0, 0.1) * n_jumps
# Update
new_value = x_current[var] + drift + diffusion + jump
# Constraints (e.g., probabilities in [0,1])
new_value = np.clip(new_value, 0, 1)
trajectories[var][path, step + 1] = new_value
return trajectories
def ornstein_uhlenbeck_process(
theta: float,
mu: float,
sigma: float,
x0: float,
T: float,
dt: float,
n_paths: int = 1
) -> Tuple[np.ndarray, np.ndarray]:
"""
Simulate Ornstein-Uhlenbeck process (mean-reverting).
dx_t = θ(μ - x_t)dt + σ dW_t
Parameters
----------
theta : float
Mean reversion speed
mu : float
Long-term mean
sigma : float
Volatility
x0 : float
Initial value
T : float
Final time
dt : float
Time step
n_paths : int
Number of paths
Returns
-------
tuple
(time_grid, paths)
"""
n_steps = int(T / dt)
t = np.linspace(0, T, n_steps + 1)
x = np.zeros((n_paths, n_steps + 1))
x[:, 0] = x0
sqrt_dt = np.sqrt(dt)
for i in range(n_steps):
dW = np.random.randn(n_paths) * sqrt_dt
x[:, i + 1] = x[:, i] + theta * (mu - x[:, i]) * dt + sigma * dW
return t, x