clarkkitchen22's picture
Initial GeoBot Forecasting Framework commit
484e3bc
"""
Structural Causal Models for GeoBotv1
Implements Structural Causal Models (SCMs) for geopolitical analysis,
intervention simulation, and counterfactual reasoning. Integrates with
GeoBot 2.0 analytical framework.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable, Set, Tuple
from enum import Enum
import numpy as np
import networkx as nx
class IdentificationStrategy(Enum):
"""Strategies for identifying causal effects."""
BACKDOOR_ADJUSTMENT = "backdoor"
FRONTDOOR_ADJUSTMENT = "frontdoor"
INSTRUMENTAL_VARIABLES = "iv"
DO_CALCULUS = "do_calculus"
STRUCTURAL_EQUATIONS = "structural"
@dataclass
class StructuralEquation:
"""
Structural equation for a variable in SCM.
X := f(Pa_X, U_X)
Attributes
----------
variable : str
Variable name
parents : List[str]
Parent variables in causal graph
function : Callable
Structural function f
noise_dist : Callable
Distribution of exogenous noise U_X
description : str
Description of equation
"""
variable: str
parents: List[str]
function: Callable[[Dict[str, float]], float]
noise_dist: Callable[[int], np.ndarray]
description: str = ""
def evaluate(self, parent_values: Dict[str, float], noise: Optional[float] = None) -> float:
"""
Evaluate structural equation.
Parameters
----------
parent_values : Dict[str, float]
Values of parent variables
noise : Optional[float]
Noise value (if None, sample from distribution)
Returns
-------
float
Value of variable
"""
if noise is None:
noise = self.noise_dist(1)[0]
return self.function(parent_values) + noise
@dataclass
class Intervention:
"""
Causal intervention do(X = x).
Attributes
----------
variable : str
Variable being intervened on
value : float
Value set by intervention
description : str
Description of intervention
"""
variable: str
value: float
description: str = ""
def __repr__(self) -> str:
return f"do({self.variable} = {self.value})"
@dataclass
class Counterfactual:
"""
Counterfactual query.
"What would Y be if we had done X = x, given that we observed Z = z?"
Attributes
----------
query_variable : str
Variable being queried
intervention : Intervention
Counterfactual intervention
observations : Dict[str, float]
Observed values
"""
query_variable: str
intervention: Intervention
observations: Dict[str, float] = field(default_factory=dict)
def __repr__(self) -> str:
obs_str = ", ".join([f"{k}={v}" for k, v in self.observations.items()])
return f"{self.query_variable}_{{{self.intervention}}} | {obs_str}"
@dataclass
class CausalEffect:
"""
Estimated causal effect.
Attributes
----------
treatment : str
Treatment variable
outcome : str
Outcome variable
effect : float
Estimated average causal effect
std_error : Optional[float]
Standard error of estimate
confidence_interval : Optional[Tuple[float, float]]
Confidence interval
identification_strategy : IdentificationStrategy
How effect was identified
"""
treatment: str
outcome: str
effect: float
std_error: Optional[float] = None
confidence_interval: Optional[Tuple[float, float]] = None
identification_strategy: Optional[IdentificationStrategy] = None
def __repr__(self) -> str:
ci_str = ""
if self.confidence_interval:
ci_str = f", 95% CI: {self.confidence_interval}"
return f"ACE({self.treatment}{self.outcome}) = {self.effect:.3f}{ci_str}"
class StructuralCausalModel:
"""
Structural Causal Model for geopolitical analysis.
An SCM consists of:
1. Causal graph G (DAG)
2. Structural equations for each variable
3. Exogenous noise distributions
Enables:
- Intervention simulation (do-operator)
- Counterfactual reasoning
- Causal effect identification
"""
def __init__(self, name: str = "GeopoliticalSCM"):
"""
Initialize SCM.
Parameters
----------
name : str
Name of SCM
"""
self.name = name
self.graph = nx.DiGraph()
self.equations: Dict[str, StructuralEquation] = {}
self.exogenous_variables: Set[str] = set()
def add_equation(self, equation: StructuralEquation) -> None:
"""
Add structural equation to model.
Parameters
----------
equation : StructuralEquation
Structural equation
"""
self.equations[equation.variable] = equation
# Add to graph
self.graph.add_node(equation.variable)
for parent in equation.parents:
self.graph.add_edge(parent, equation.variable)
def add_exogenous(self, variable: str, distribution: Callable[[int], np.ndarray]) -> None:
"""
Add exogenous variable.
Parameters
----------
variable : str
Variable name
distribution : Callable
Distribution for sampling
"""
self.exogenous_variables.add(variable)
self.graph.add_node(variable)
def topological_order(self) -> List[str]:
"""
Get topological ordering of variables.
Returns
-------
List[str]
Topologically sorted variables
"""
try:
return list(nx.topological_sort(self.graph))
except nx.NetworkXError:
raise ValueError("Graph contains cycles - not a valid DAG")
def simulate(
self,
n_samples: int = 1000,
interventions: Optional[List[Intervention]] = None,
random_state: Optional[int] = None
) -> Dict[str, np.ndarray]:
"""
Simulate from SCM.
Parameters
----------
n_samples : int
Number of samples
interventions : Optional[List[Intervention]]
Interventions to apply
random_state : Optional[int]
Random seed
Returns
-------
Dict[str, np.ndarray]
Simulated data for each variable
"""
if random_state is not None:
np.random.seed(random_state)
# Intervention variables
intervention_dict = {}
if interventions:
intervention_dict = {iv.variable: iv.value for iv in interventions}
# Initialize data
data = {}
# Topological order
order = self.topological_order()
# Simulate each variable in order
for var in order:
if var in intervention_dict:
# Variable is intervened on - set to intervention value
data[var] = np.full(n_samples, intervention_dict[var])
elif var in self.exogenous_variables:
# Exogenous variable - sample from distribution
# For now, assume standard normal if not specified
data[var] = np.random.randn(n_samples)
elif var in self.equations:
# Endogenous variable - evaluate structural equation
eq = self.equations[var]
values = np.zeros(n_samples)
for i in range(n_samples):
parent_vals = {p: data[p][i] for p in eq.parents}
values[i] = eq.evaluate(parent_vals)
data[var] = values
else:
raise ValueError(f"No equation for variable {var}")
return data
def intervene(
self,
interventions: List[Intervention],
n_samples: int = 1000,
random_state: Optional[int] = None
) -> Dict[str, np.ndarray]:
"""
Simulate interventions using do-operator.
Parameters
----------
interventions : List[Intervention]
Interventions to apply
n_samples : int
Number of samples
random_state : Optional[int]
Random seed
Returns
-------
Dict[str, np.ndarray]
Post-intervention data
"""
return self.simulate(n_samples, interventions, random_state)
def estimate_causal_effect(
self,
treatment: str,
outcome: str,
n_samples: int = 10000,
treatment_values: Optional[List[float]] = None
) -> CausalEffect:
"""
Estimate average causal effect of treatment on outcome.
Parameters
----------
treatment : str
Treatment variable
outcome : str
Outcome variable
n_samples : int
Number of simulation samples
treatment_values : Optional[List[float]]
Treatment values to compare (default [0, 1])
Returns
-------
CausalEffect
Estimated causal effect
"""
if treatment_values is None:
treatment_values = [0.0, 1.0]
# Simulate under different treatment values
outcomes = []
for t_val in treatment_values:
intervention = Intervention(variable=treatment, value=t_val)
data = self.intervene([intervention], n_samples)
outcomes.append(np.mean(data[outcome]))
# Average causal effect
ace = outcomes[1] - outcomes[0]
# Bootstrap for standard error
bootstrap_effects = []
for _ in range(100):
boot_outcomes = []
for t_val in treatment_values:
intervention = Intervention(variable=treatment, value=t_val)
data = self.intervene([intervention], n_samples=1000)
boot_outcomes.append(np.mean(data[outcome]))
bootstrap_effects.append(boot_outcomes[1] - boot_outcomes[0])
std_error = np.std(bootstrap_effects)
ci = (
ace - 1.96 * std_error,
ace + 1.96 * std_error
)
return CausalEffect(
treatment=treatment,
outcome=outcome,
effect=ace,
std_error=std_error,
confidence_interval=ci,
identification_strategy=IdentificationStrategy.STRUCTURAL_EQUATIONS
)
def counterfactual_query(
self,
query: Counterfactual,
n_samples: int = 10000
) -> Dict[str, Any]:
"""
Answer counterfactual query.
Three-step process:
1. Abduction: Infer exogenous variables from observations
2. Action: Apply intervention
3. Prediction: Compute outcome
Parameters
----------
query : Counterfactual
Counterfactual query
n_samples : int
Number of samples for approximation
Returns
-------
Dict[str, Any]
Counterfactual results
"""
# Simplified counterfactual reasoning
# Full implementation would do proper abduction step
# For now, simulate with intervention
data = self.intervene([query.intervention], n_samples)
return {
'query': str(query),
'expected_value': float(np.mean(data[query.query_variable])),
'std': float(np.std(data[query.query_variable])),
'median': float(np.median(data[query.query_variable])),
'quantiles': {
'5%': float(np.quantile(data[query.query_variable], 0.05)),
'25%': float(np.quantile(data[query.query_variable], 0.25)),
'75%': float(np.quantile(data[query.query_variable], 0.75)),
'95%': float(np.quantile(data[query.query_variable], 0.95)),
}
}
def find_backdoor_paths(self, treatment: str, outcome: str) -> List[List[str]]:
"""
Find backdoor paths from treatment to outcome.
Parameters
----------
treatment : str
Treatment variable
outcome : str
Outcome variable
Returns
-------
List[List[str]]
List of backdoor paths
"""
# Create undirected version of graph
undirected = self.graph.to_undirected()
# Find all paths
try:
all_paths = list(nx.all_simple_paths(undirected, treatment, outcome))
except nx.NodeNotFound:
return []
# Filter for backdoor paths (paths that go through parent of treatment)
backdoor_paths = []
treatment_parents = set(self.graph.predecessors(treatment))
for path in all_paths:
# Check if path starts with an edge into treatment
if len(path) > 1 and path[1] in treatment_parents:
backdoor_paths.append(path)
return backdoor_paths
def find_backdoor_adjustment_set(
self,
treatment: str,
outcome: str
) -> Optional[Set[str]]:
"""
Find minimal backdoor adjustment set.
Parameters
----------
treatment : str
Treatment variable
outcome : str
Outcome variable
Returns
-------
Optional[Set[str]]
Backdoor adjustment set, or None if no valid set exists
"""
backdoor_paths = self.find_backdoor_paths(treatment, outcome)
if not backdoor_paths:
return set() # No backdoor paths, empty set suffices
# Find minimal set that blocks all backdoor paths
# This is simplified - full implementation would use
# proper d-separation testing
# Collect all variables in backdoor paths
candidates = set()
for path in backdoor_paths:
candidates.update(path[1:-1]) # Exclude treatment and outcome
# Remove descendants of treatment (would create bias)
treatment_descendants = nx.descendants(self.graph, treatment)
candidates -= treatment_descendants
return candidates
def plot_graph(self, filename: Optional[str] = None) -> None:
"""
Plot causal graph.
Parameters
----------
filename : Optional[str]
File to save plot to
"""
try:
import matplotlib.pyplot as plt
pos = nx.spring_layout(self.graph)
nx.draw(
self.graph,
pos,
with_labels=True,
node_color='lightblue',
node_size=1500,
font_size=10,
font_weight='bold',
arrows=True,
arrowsize=20
)
if filename:
plt.savefig(filename)
else:
plt.show()
except ImportError:
print("matplotlib not available for plotting")
def estimate_causal_effect(
scm: StructuralCausalModel,
treatment: str,
outcome: str,
adjustment_set: Optional[Set[str]] = None,
n_samples: int = 10000
) -> CausalEffect:
"""
Estimate causal effect using appropriate identification strategy.
Parameters
----------
scm : StructuralCausalModel
Structural causal model
treatment : str
Treatment variable
outcome : str
Outcome variable
adjustment_set : Optional[Set[str]]
Variables to adjust for (if None, use backdoor criterion)
n_samples : int
Number of samples
Returns
-------
CausalEffect
Estimated causal effect
"""
# Find adjustment set if not provided
if adjustment_set is None:
adjustment_set = scm.find_backdoor_adjustment_set(treatment, outcome)
if adjustment_set is None:
# Try frontdoor or IV
raise ValueError("Cannot identify causal effect - no valid adjustment set")
# Use structural equations for direct estimation
return scm.estimate_causal_effect(treatment, outcome, n_samples)
# ============================================================================
# Predefined SCMs for Geopolitical Analysis
# ============================================================================
def create_sanctions_scm() -> StructuralCausalModel:
"""
Create SCM for sanctions analysis.
Variables:
- sanctions: Binary sanctions imposed
- trade_disruption: Trade flow disruption
- economic_growth: Economic growth rate
- regime_stability: Regime stability score
Returns
-------
StructuralCausalModel
Sanctions SCM
"""
scm = StructuralCausalModel(name="SanctionsSCM")
# Exogenous noise (simplified as standard normal)
noise_dist = lambda n: np.random.randn(n) * 0.1
# Trade disruption = f(sanctions) + noise
scm.add_equation(StructuralEquation(
variable="trade_disruption",
parents=["sanctions"],
function=lambda p: 0.7 * p["sanctions"],
noise_dist=noise_dist,
description="Sanctions directly reduce trade"
))
# Economic growth = f(trade_disruption) + noise
scm.add_equation(StructuralEquation(
variable="economic_growth",
parents=["trade_disruption"],
function=lambda p: 0.05 - 0.4 * p["trade_disruption"],
noise_dist=noise_dist,
description="Trade disruption reduces growth"
))
# Regime stability = f(economic_growth) + noise
scm.add_equation(StructuralEquation(
variable="regime_stability",
parents=["economic_growth"],
function=lambda p: 0.7 + 0.5 * p["economic_growth"],
noise_dist=noise_dist,
description="Economic growth affects regime stability"
))
return scm
def create_conflict_escalation_scm() -> StructuralCausalModel:
"""
Create SCM for conflict escalation.
Variables:
- military_buildup: Military force buildup
- diplomatic_tension: Diplomatic relations tension
- conflict_risk: Risk of armed conflict
Returns
-------
StructuralCausalModel
Conflict escalation SCM
"""
scm = StructuralCausalModel(name="ConflictEscalationSCM")
noise_dist = lambda n: np.random.randn(n) * 0.05
# Diplomatic tension = f(military_buildup) + noise
scm.add_equation(StructuralEquation(
variable="diplomatic_tension",
parents=["military_buildup"],
function=lambda p: 0.3 + 0.6 * p["military_buildup"],
noise_dist=noise_dist,
description="Military buildup increases diplomatic tension"
))
# Conflict risk = f(military_buildup, diplomatic_tension) + noise
scm.add_equation(StructuralEquation(
variable="conflict_risk",
parents=["military_buildup", "diplomatic_tension"],
function=lambda p: 0.1 + 0.4 * p["military_buildup"] + 0.3 * p["diplomatic_tension"],
noise_dist=noise_dist,
description="Both military buildup and tension increase conflict risk"
))
return scm