"""
Monte Carlo Simulation Engine
Stochastic simulation for geopolitical forecasting with support for:
- Monte Carlo over causal graphs
- Agent-based Monte Carlo
- Stochastic war-gaming simulations
- Shock Monte Carlo (black swan simulation)
Combining structural (causal) and stochastic simulation lets the engine
serve as a scenario-generating world model for national-security analysis.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Callable, Tuple, Any
from dataclasses import dataclass, field
from ..models.causal_graph import StructuralCausalModel
from ..core.scenario import Scenario, ScenarioDistribution
@dataclass
class SimulationConfig:
"""
Configuration for Monte Carlo simulation.
Attributes
----------
n_simulations : int
Number of Monte Carlo runs
time_horizon : int
Number of time steps per simulation
random_seed : int, optional
Random seed for reproducibility
parallel : bool
Run simulations in parallel (reserved; not currently used by the engine)
"""
n_simulations: int = 1000
time_horizon: int = 100
random_seed: Optional[int] = None
parallel: bool = False
class MonteCarloEngine:
"""
Monte Carlo simulation engine for geopolitical forecasting.
Supports various types of stochastic simulation including:
- Basic Monte Carlo
- Causal graph-based simulation
- Shock simulation
- Path-dependent simulation
"""
def __init__(self, config: Optional[SimulationConfig] = None):
"""
Initialize Monte Carlo engine.
Parameters
----------
config : SimulationConfig, optional
Simulation configuration
"""
self.config = config or SimulationConfig()
if self.config.random_seed is not None:
np.random.seed(self.config.random_seed)
def run_basic_simulation(
self,
initial_state: Dict[str, float],
transition_fn: Callable,
noise_fn: Optional[Callable] = None
) -> List[Dict[str, np.ndarray]]:
"""
Run basic Monte Carlo simulation.
Parameters
----------
initial_state : dict
Initial state of the system
transition_fn : callable
State transition function
Signature: f(state, t, noise) -> new_state
noise_fn : callable, optional
Noise generation function
Signature: f(t) -> noise_dict
Returns
-------
list of dict
One trajectory per simulation; each maps a variable name to an array of length ``time_horizon``
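Examples
--------
A minimal sketch; the ``tension`` variable and the AR(1)-style drift are
illustrative assumptions, not part of the engine's API:
>>> config = SimulationConfig(n_simulations=200, time_horizon=50, random_seed=7)
>>> engine = MonteCarloEngine(config)
>>> def drift(state, t, noise):
...     return {"tension": 0.9 * state["tension"] + noise.get("tension", 0.0)}
>>> trajectories = engine.run_basic_simulation(
...     initial_state={"tension": 0.5},
...     transition_fn=drift,
...     noise_fn=lambda t: {"tension": np.random.normal(0.0, 0.1)},
... )
>>> len(trajectories)
200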
"""
trajectories = []
for sim in range(self.config.n_simulations):
trajectory = {var: np.zeros(self.config.time_horizon) for var in initial_state}
# Initialize
state = initial_state.copy()
for var, val in state.items():
trajectory[var][0] = val
# Simulate forward
for t in range(1, self.config.time_horizon):
# Generate noise
noise = noise_fn(t) if noise_fn else {}
# Transition
state = transition_fn(state, t, noise)
# Record
for var, val in state.items():
trajectory[var][t] = val
trajectories.append(trajectory)
return trajectories
def run_causal_simulation(
self,
scm: StructuralCausalModel,
initial_conditions: Optional[Dict[str, float]] = None,
interventions: Optional[Dict[str, Dict[str, float]]] = None
) -> ScenarioDistribution:
"""
Run Monte Carlo simulation over causal graph.
Parameters
----------
scm : StructuralCausalModel
Structural causal model
initial_conditions : dict, optional
Initial conditions for some variables (accepted but not yet applied in this implementation)
interventions : dict, optional
Time-dependent interventions {time: {var: value}}
Returns
-------
ScenarioDistribution
Distribution of simulated scenarios
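Examples
--------
A hedged sketch: ``scm`` is assumed to be an already-built
``StructuralCausalModel`` whose ``sample`` method returns a mapping from
variable name to an array of draws, and the intervention variable
``sanctions`` is purely illustrative:
>>> engine = MonteCarloEngine(SimulationConfig(n_simulations=100, time_horizon=24))
>>> dist = engine.run_causal_simulation(
...     scm,
...     interventions={12: {"sanctions": 1.0}},
... )  # doctest: +SKIP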
"""
scenarios = []
for sim in range(self.config.n_simulations):
scenario_features = {}
for t in range(self.config.time_horizon):
# Get interventions at this time
interv = interventions.get(t, {}) if interventions else {}
# Sample from SCM
samples = scm.sample(n_samples=1, interventions=interv)
# Store
for var, val in samples.items():
if var not in scenario_features:
scenario_features[var] = []
scenario_features[var].append(val[0])
# Convert to arrays
scenario_features = {k: np.array(v) for k, v in scenario_features.items()}
# Create scenario
scenario = Scenario(
name=f"sim_{sim}",
features=scenario_features,
probability=1.0 / self.config.n_simulations
)
scenarios.append(scenario)
return ScenarioDistribution(scenarios)
def run_path_dependent_simulation(
self,
initial_state: Dict[str, float],
transition_fn: Callable,
decision_points: List[int],
decision_fn: Callable
) -> Dict[str, Any]:
"""
Run path-dependent simulation with decision points.
This is useful for war-gaming and strategic scenarios where
decisions depend on the current state.
Parameters
----------
initial_state : dict
Initial state
transition_fn : callable
State transition function
decision_points : list
Time steps where decisions are made
decision_fn : callable
Decision function; returns additive changes keyed by existing state variables
Signature: f(state, t) -> decision_dict
Returns
-------
dict
Simulation results with decision branches
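Examples
--------
Illustrative only; the ``escalation`` variable and the threshold-based
de-escalation rule are assumptions for the sketch:
>>> engine = MonteCarloEngine(SimulationConfig(n_simulations=50, time_horizon=30))
>>> def step(state, t, noise):
...     return {"escalation": state["escalation"] + noise.get("escalation", 0.0)}
>>> def de_escalate(state, t):
...     return {"escalation": -0.2} if state["escalation"] > 0.5 else {}
>>> result = engine.run_path_dependent_simulation(
...     initial_state={"escalation": 0.3},
...     transition_fn=step,
...     decision_points=[10, 20],
...     decision_fn=de_escalate,
... )
>>> sorted(result)
['decisions', 'trajectories']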
"""
trajectories = []
decisions = []
for sim in range(self.config.n_simulations):
trajectory = {var: np.zeros(self.config.time_horizon) for var in initial_state}
sim_decisions = []
state = initial_state.copy()
for var, val in state.items():
trajectory[var][0] = val
for t in range(1, self.config.time_horizon):
# Check for decision point
if t in decision_points:
decision = decision_fn(state, t)
sim_decisions.append((t, decision))
# Apply decision effects
for var, change in decision.items():
state[var] = state.get(var, 0) + change
# Exogenous noise (hard-coded std of 0.1 per variable)
noise = {var: np.random.normal(0, 0.1) for var in state}
# Transition
state = transition_fn(state, t, noise)
# Record
for var, val in state.items():
trajectory[var][t] = val
trajectories.append(trajectory)
decisions.append(sim_decisions)
return {
'trajectories': trajectories,
'decisions': decisions
}
def compute_statistics(
self,
trajectories: List[Dict[str, np.ndarray]]
) -> Dict[str, Dict[str, np.ndarray]]:
"""
Compute statistics across Monte Carlo trajectories.
Parameters
----------
trajectories : list
List of simulation trajectories
Returns
-------
dict
Statistics for each variable
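Examples
--------
A self-contained sketch with two fake trajectories for one variable ``x``:
>>> fake = [{"x": np.zeros(10)}, {"x": np.ones(10)}]
>>> stats = MonteCarloEngine().compute_statistics(fake)
>>> float(stats["x"]["mean"][0])
0.5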
"""
if len(trajectories) == 0:
return {}
variables = list(trajectories[0].keys())
stats = {}
for var in variables:
# Stack trajectories
data = np.array([traj[var] for traj in trajectories])
stats[var] = {
'mean': np.mean(data, axis=0),
'median': np.median(data, axis=0),
'std': np.std(data, axis=0),
'q5': np.percentile(data, 5, axis=0),
'q25': np.percentile(data, 25, axis=0),
'q75': np.percentile(data, 75, axis=0),
'q95': np.percentile(data, 95, axis=0),
'min': np.min(data, axis=0),
'max': np.max(data, axis=0)
}
return stats
def analyze_convergence(
self,
trajectories: List[Dict[str, np.ndarray]],
variable: str
) -> Dict[str, Any]:
"""
Analyze convergence of Monte Carlo simulation.
Parameters
----------
trajectories : list
Simulation trajectories
variable : str
Variable to analyze
Returns
-------
dict
Convergence metrics
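Examples
--------
A self-contained sketch; real analyses would use many more trajectories
and a domain-specific variable instead of ``x``:
>>> trajs = [{"x": np.random.normal(0, 1, 20)} for _ in range(500)]
>>> diag = MonteCarloEngine().analyze_convergence(trajs, variable="x")
>>> sorted(diag)
['converged', 'running_means', 'standard_error']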
"""
data = np.array([traj[variable] for traj in trajectories])
# Compute running mean
n_sims = len(trajectories)
running_means = []
for n in range(10, n_sims, 10):
running_means.append(np.mean(data[:n], axis=0))
# Compute standard error
se = np.std(data, axis=0) / np.sqrt(n_sims)
return {
'running_means': running_means,
'standard_error': se,
'converged': np.all(se < 0.01) # Arbitrary threshold
}
class ShockSimulator:
"""
Simulate black swan events and shocks in geopolitical scenarios.
This class specializes in modeling rare, high-impact events
that are critical for risk assessment.
"""
def __init__(self, mc_engine: Optional[MonteCarloEngine] = None):
"""
Initialize shock simulator.
Parameters
----------
mc_engine : MonteCarloEngine, optional
Monte Carlo engine to use
"""
self.mc_engine = mc_engine or MonteCarloEngine()
def generate_shock_scenarios(
self,
baseline_scenario: Dict[str, float],
shock_types: List[Dict[str, Any]],
shock_probabilities: List[float]
) -> ScenarioDistribution:
"""
Generate scenarios including shock events.
Parameters
----------
baseline_scenario : dict
Baseline scenario without shocks
shock_types : list
List of shock specifications
shock_probabilities : list
Per-scenario probability of each shock type; probabilities should sum to at most 1
Returns
-------
ScenarioDistribution
Distribution including shock scenarios
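Examples
--------
Illustrative shock specification; the ``impacts`` key is what this method
reads, but the variable names and probability are assumptions:
>>> simulator = ShockSimulator(MonteCarloEngine(SimulationConfig(n_simulations=500, random_seed=1)))
>>> dist = simulator.generate_shock_scenarios(
...     baseline_scenario={"gdp_growth": 0.02, "conflict_risk": 0.1},
...     shock_types=[{"name": "embargo", "impacts": {"gdp_growth": -0.03, "conflict_risk": 0.2}}],
...     shock_probabilities=[0.05],
... )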
"""
scenarios = []
for sim in range(self.mc_engine.config.n_simulations):
# Check if shock occurs
shock_occurred = np.random.random() < sum(shock_probabilities)
if shock_occurred:
# Sample shock type
shock_idx = np.random.choice(len(shock_types), p=np.array(shock_probabilities) / sum(shock_probabilities))
shock = shock_types[shock_idx]
# Apply shock
scenario_state = baseline_scenario.copy()
for var, impact in shock['impacts'].items():
scenario_state[var] = scenario_state.get(var, 0) + impact
else:
scenario_state = baseline_scenario.copy()
# Create scenario with equal Monte Carlo weight; the sampling above already
# reflects shock_probabilities, so weighting by them again would double-count
# the shock likelihood
scenario = Scenario(
name=f"shock_sim_{sim}",
features={k: np.array([v]) for k, v in scenario_state.items()},
probability=1.0 / self.mc_engine.config.n_simulations
)
scenarios.append(scenario)
return ScenarioDistribution(scenarios)
def simulate_cascading_failure(
self,
initial_failure: str,
dependency_graph: Dict[str, List[str]],
failure_probabilities: Dict[str, float],
n_simulations: int = 1000
) -> Dict[str, Any]:
"""
Simulate cascading failures in interconnected systems.
Parameters
----------
initial_failure : str
Initial component that fails
dependency_graph : dict
Dependency relationships {component: [dependent_components]}
failure_probabilities : dict
Conditional failure probabilities
n_simulations : int
Number of simulations
Returns
-------
dict
Cascading failure analysis
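Examples
--------
A small illustrative dependency chain; the component names and
probabilities are assumptions:
>>> simulator = ShockSimulator()
>>> result = simulator.simulate_cascading_failure(
...     initial_failure="power_grid",
...     dependency_graph={"power_grid": ["telecom", "water"], "telecom": ["finance"]},
...     failure_probabilities={"telecom": 0.7, "water": 0.4, "finance": 0.5},
...     n_simulations=2000,
... )
>>> bool(result["mean_failures"] >= 1.0)
True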
"""
cascade_results = []
for _ in range(n_simulations):
failed = {initial_failure}
newly_failed = {initial_failure}
# Simulate cascade
max_iterations = 100
rounds = 0  # count only propagation rounds that actually ran
for _ in range(max_iterations):
if len(newly_failed) == 0:
break
rounds += 1
current_newly_failed = set()
for failed_component in newly_failed:
# Get dependent components
dependents = dependency_graph.get(failed_component, [])
for dependent in dependents:
if dependent not in failed:
# Check if it fails
p_fail = failure_probabilities.get(dependent, 0.5)
if np.random.random() < p_fail:
current_newly_failed.add(dependent)
failed.add(dependent)
newly_failed = current_newly_failed
cascade_results.append({
'total_failures': len(failed),
'failed_components': failed,
'iterations': rounds
})
return {
'simulations': cascade_results,
'mean_failures': np.mean([r['total_failures'] for r in cascade_results]),
'max_failures': max([r['total_failures'] for r in cascade_results]),
'failure_probability': {
comp: np.mean([comp in r['failed_components'] for r in cascade_results])
for comp in set().union(*[r['failed_components'] for r in cascade_results])
}
}
def simulate_tail_risk(
self,
distribution: Callable,
threshold: float,
n_samples: int = 100000
) -> Dict[str, float]:
"""
Simulate and analyze tail risk.
Parameters
----------
distribution : callable
Sampling function; ``distribution(n)`` returns an array of ``n`` draws
threshold : float
Threshold for tail event
n_samples : int
Number of samples
Returns
-------
dict
Tail risk metrics
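Examples
--------
A sketch with a heavy-tailed sampler; the lognormal choice and the
threshold are assumptions for illustration:
>>> simulator = ShockSimulator()
>>> metrics = simulator.simulate_tail_risk(
...     distribution=lambda n: np.random.lognormal(mean=0.0, sigma=1.0, size=n),
...     threshold=5.0,
...     n_samples=50_000,
... )
>>> bool(0.0 <= metrics["tail_probability"] <= 1.0)
True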
"""
samples = distribution(n_samples)
# Compute tail metrics
exceed_threshold = samples > threshold
tail_probability = np.mean(exceed_threshold)
if tail_probability > 0:
tail_samples = samples[exceed_threshold]
conditional_mean = np.mean(tail_samples)
conditional_std = np.std(tail_samples)
else:
conditional_mean = None
conditional_std = None
return {
'tail_probability': tail_probability,
'var_95': np.percentile(samples, 95),
'var_99': np.percentile(samples, 99),
'cvar_95': np.mean(samples[samples > np.percentile(samples, 95)]),
'cvar_99': np.mean(samples[samples > np.percentile(samples, 99)]),
'conditional_mean': conditional_mean,
'conditional_std': conditional_std,
'max_loss': np.max(samples)
}
def stress_test(
self,
baseline_state: Dict[str, float],
transition_fn: Callable,
stress_scenarios: List[Dict[str, float]],
time_horizon: int = 50
) -> Dict[str, Any]:
"""
Perform stress testing under extreme scenarios.
Parameters
----------
baseline_state : dict
Baseline state
transition_fn : callable
State transition function
stress_scenarios : list
List of stress scenarios to test
time_horizon : int
Simulation horizon
Returns
-------
dict
Stress test results
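Examples
--------
Minimal sketch; the single ``stability`` variable and the mean-reverting
transition are illustrative:
>>> simulator = ShockSimulator()
>>> def relax(state, t, noise):
...     return {"stability": 0.95 * state["stability"] + noise.get("stability", 0.0)}
>>> report = simulator.stress_test(
...     baseline_state={"stability": 1.0},
...     transition_fn=relax,
...     stress_scenarios=[{"stability": -0.5}, {"stability": -0.9}],
...     time_horizon=40,
... )
>>> sorted(report)
['stress_0', 'stress_1']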
"""
results = {}
for i, stress in enumerate(stress_scenarios):
# Simulate under stress
state = baseline_state.copy()
# Apply stress shock at t=0
for var, shock in stress.items():
state[var] = state.get(var, 0) + shock
# Simulate forward
trajectory = {var: [val] for var, val in state.items()}
for t in range(1, time_horizon):
noise = {var: np.random.normal(0, 0.05) for var in state}
state = transition_fn(state, t, noise)
for var, val in state.items():
trajectory[var].append(val)
results[f'stress_{i}'] = {
'scenario': stress,
'trajectory': trajectory,
'final_state': {var: vals[-1] for var, vals in trajectory.items()}
}
return results
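# ---------------------------------------------------------------------------
# End-to-end smoke test (illustrative sketch only). The "tension" variable and
# the mean-reverting transition below are assumptions, not part of the
# framework's API. Because the imports above are relative, this block only
# runs when the module is executed in package context (e.g. via ``python -m``).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_config = SimulationConfig(n_simulations=500, time_horizon=60, random_seed=42)
    demo_engine = MonteCarloEngine(demo_config)

    def _demo_transition(state, t, noise):
        # Mean-reverting drift plus exogenous noise for the sketch variable.
        return {"tension": 0.9 * state["tension"] + noise.get("tension", 0.0)}

    demo_trajectories = demo_engine.run_basic_simulation(
        initial_state={"tension": 0.5},
        transition_fn=_demo_transition,
        noise_fn=lambda t: {"tension": np.random.normal(0.0, 0.1)},
    )
    demo_stats = demo_engine.compute_statistics(demo_trajectories)
    demo_convergence = demo_engine.analyze_convergence(demo_trajectories, variable="tension")
    print(f"final-step mean tension: {demo_stats['tension']['mean'][-1]:.3f}")
    print(f"standard error below 0.01 everywhere: {bool(demo_convergence['converged'])}")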