|
|
""" |
|
|
Bayesian Forecasting Module for GeoBotv1 |
|
|
|
|
|
Provides Bayesian belief updating, prior construction, and probabilistic forecasting |
|
|
for geopolitical scenarios. Integrates with GeoBot 2.0 analytical framework. |
|
|
""" |
|
|
|
|
|
from dataclasses import dataclass, field |
|
|
from typing import Dict, List, Any, Optional, Callable, Tuple |
|
|
from enum import Enum |
|
|
import numpy as np |
|
|
from scipy import stats |
|
|
from scipy.optimize import minimize |
|
|
|
|
|
|
|
|
class PriorType(Enum):
    """Types of prior distributions.

    The first four map directly onto sampling/PDF support in
    ``GeopoliticalPrior``; the last two are descriptive labels for
    priors whose sampling is not implemented there.
    """
    UNIFORM = "uniform"
    NORMAL = "normal"
    BETA = "beta"
    GAMMA = "gamma"
    EXPERT_INFORMED = "expert_informed"  # elicited from expert judgment; no sample()/pdf() support
    HISTORICAL = "historical"  # derived from historical base rates; no sample()/pdf() support
|
|
|
|
|
|
|
|
class EvidenceType(Enum):
    """Types of evidence for belief updating.

    Used as a descriptive tag on ``EvidenceUpdate``; the updating math
    itself does not branch on the evidence type.
    """
    INTELLIGENCE_REPORT = "intelligence_report"
    SATELLITE_IMAGERY = "satellite_imagery"
    ECONOMIC_DATA = "economic_data"
    MILITARY_MOVEMENT = "military_movement"
    DIPLOMATIC_SIGNAL = "diplomatic_signal"
    OPEN_SOURCE = "open_source"
|
|
|
|
|
|
|
|
@dataclass
class GeopoliticalPrior:
    """
    Prior distribution for a geopolitical parameter.

    Attributes
    ----------
    parameter_name : str
        Name of the parameter
    prior_type : PriorType
        Type of prior distribution
    parameters : Dict[str, float]
        Distribution parameters; required keys depend on ``prior_type``:
        UNIFORM -> 'low', 'high'; NORMAL -> 'mean', 'std';
        BETA -> 'alpha', 'beta'; GAMMA -> 'shape', 'scale'
    description : str
        Description of what this parameter represents
    """
    parameter_name: str
    prior_type: PriorType
    parameters: Dict[str, float]
    description: str = ""

    def sample(self, n_samples: int = 1000, random_state: Optional[int] = None) -> np.ndarray:
        """
        Sample from the prior distribution.

        Parameters
        ----------
        n_samples : int
            Number of samples
        random_state : Optional[int]
            Random seed. A local ``numpy.random.Generator`` is used, so
            the global NumPy RNG state is never touched.

        Returns
        -------
        np.ndarray
            Samples from the prior

        Raises
        ------
        ValueError
            If sampling is not implemented for this prior type.
        KeyError
            If a required distribution parameter is missing.
        """
        # FIX: the original called np.random.seed(), which mutates global
        # RNG state and can perturb unrelated code. A dedicated Generator
        # is self-contained and reproducible for a given seed.
        rng = np.random.default_rng(random_state)

        if self.prior_type == PriorType.UNIFORM:
            return rng.uniform(self.parameters['low'], self.parameters['high'], n_samples)

        elif self.prior_type == PriorType.NORMAL:
            return rng.normal(self.parameters['mean'], self.parameters['std'], n_samples)

        elif self.prior_type == PriorType.BETA:
            return rng.beta(self.parameters['alpha'], self.parameters['beta'], n_samples)

        elif self.prior_type == PriorType.GAMMA:
            return rng.gamma(self.parameters['shape'], self.parameters['scale'], n_samples)

        else:
            raise ValueError(f"Sampling not implemented for {self.prior_type}")

    def pdf(self, x: np.ndarray) -> np.ndarray:
        """
        Evaluate the prior probability density function.

        Parameters
        ----------
        x : np.ndarray
            Points at which to evaluate the PDF

        Returns
        -------
        np.ndarray
            PDF values

        Raises
        ------
        ValueError
            If the PDF is not implemented for this prior type.
        KeyError
            If a required distribution parameter is missing.
        """
        if self.prior_type == PriorType.UNIFORM:
            low = self.parameters['low']
            high = self.parameters['high']
            # scipy's uniform is parameterized by loc and scale = width.
            return stats.uniform.pdf(x, loc=low, scale=high - low)

        elif self.prior_type == PriorType.NORMAL:
            return stats.norm.pdf(x, loc=self.parameters['mean'], scale=self.parameters['std'])

        elif self.prior_type == PriorType.BETA:
            return stats.beta.pdf(x, self.parameters['alpha'], self.parameters['beta'])

        elif self.prior_type == PriorType.GAMMA:
            return stats.gamma.pdf(x, self.parameters['shape'], scale=self.parameters['scale'])

        else:
            raise ValueError(f"PDF not implemented for {self.prior_type}")
|
|
|
|
|
|
|
|
@dataclass
class EvidenceUpdate:
    """
    A single piece of evidence for Bayesian belief updating.

    Attributes
    ----------
    evidence_type : EvidenceType
        Type of evidence
    observation : Any
        Observed value or data
    likelihood_function : Callable
        Maps an array of parameter values to the likelihood of the
        observation under each value
    reliability : float
        Reliability score in [0, 1] used to temper the likelihood
    source : str
        Source of evidence
    timestamp : Optional[str]
        When the evidence was collected
    """
    evidence_type: EvidenceType
    observation: Any
    likelihood_function: Callable[[np.ndarray], np.ndarray]
    reliability: float = 1.0
    source: str = ""
    timestamp: Optional[str] = None

    def compute_likelihood(self, parameter_values: np.ndarray) -> np.ndarray:
        """
        Likelihood of the observation at each parameter value, tempered
        by the evidence reliability.

        Parameters
        ----------
        parameter_values : np.ndarray
            Parameter values at which to evaluate the likelihood

        Returns
        -------
        np.ndarray
            Reliability-tempered likelihood values
        """
        raw = self.likelihood_function(parameter_values)
        # Power-tempering: reliability 1.0 leaves the likelihood intact,
        # while reliability 0.0 flattens it to a completely uninformative 1.
        return raw ** self.reliability
|
|
|
|
|
|
|
|
@dataclass
class BeliefState:
    """
    Current belief state (posterior distribution) for one parameter.

    Attributes
    ----------
    parameter_name : str
        Name of the parameter
    posterior_samples : np.ndarray
        Samples from the posterior distribution
    prior : GeopoliticalPrior
        Original prior
    evidence_history : List[EvidenceUpdate]
        Evidence used to update beliefs, in the order applied
    """
    parameter_name: str
    posterior_samples: np.ndarray
    prior: GeopoliticalPrior
    evidence_history: List[EvidenceUpdate] = field(default_factory=list)

    def mean(self) -> float:
        """Posterior mean."""
        return float(self.posterior_samples.mean())

    def median(self) -> float:
        """Posterior median."""
        return float(np.median(self.posterior_samples))

    def std(self) -> float:
        """Posterior standard deviation (population, ddof=0)."""
        return float(self.posterior_samples.std())

    def quantile(self, q: float) -> float:
        """
        Posterior quantile.

        Parameters
        ----------
        q : float
            Quantile in [0, 1]

        Returns
        -------
        float
            Quantile value
        """
        return float(np.quantile(self.posterior_samples, q))

    def credible_interval(self, alpha: float = 0.05) -> Tuple[float, float]:
        """
        Equal-tailed credible interval.

        Parameters
        ----------
        alpha : float
            Significance level (default 0.05 for a 95% interval)

        Returns
        -------
        Tuple[float, float]
            Lower and upper bounds
        """
        tail = alpha / 2
        return (self.quantile(tail), self.quantile(1 - tail))

    def probability_greater_than(self, threshold: float) -> float:
        """
        Compute P(parameter > threshold | evidence).

        Parameters
        ----------
        threshold : float
            Threshold value

        Returns
        -------
        float
            Posterior probability, estimated as the fraction of samples
            above the threshold
        """
        return float((self.posterior_samples > threshold).mean())

    def probability_in_range(self, low: float, high: float) -> float:
        """
        Compute P(low < parameter < high | evidence).

        Both bounds are strict, matching the sample-based estimate.

        Parameters
        ----------
        low : float
            Lower bound (exclusive)
        high : float
            Upper bound (exclusive)

        Returns
        -------
        float
            Posterior probability of the open interval
        """
        inside = (self.posterior_samples > low) & (self.posterior_samples < high)
        return float(inside.mean())
|
|
|
|
|
|
|
|
@dataclass
class CredibleInterval:
    """Credible interval for a forecast quantity."""
    lower: float  # lower bound of the interval
    upper: float  # upper bound of the interval
    alpha: float  # significance level (e.g. 0.05 for a 95% interval)

    @property
    def width(self) -> float:
        """Width of the interval (upper minus lower)."""
        return self.upper - self.lower

    @property
    def credibility(self) -> float:
        """Credibility level, e.g. 0.95 when alpha is 0.05."""
        return 1 - self.alpha
|
|
|
|
|
|
|
|
@dataclass
class ForecastDistribution:
    """
    Predictive distribution for a geopolitical forecast.

    Attributes
    ----------
    variable_name : str
        Name of the forecasted variable
    samples : np.ndarray
        Samples from the predictive distribution
    time_horizon : int
        Forecast horizon (days, months, etc.)
    conditioning_info : Dict[str, Any]
        Information the forecast was conditioned on
    """
    variable_name: str
    samples: np.ndarray
    time_horizon: int
    conditioning_info: Dict[str, Any] = field(default_factory=dict)

    def point_forecast(self, method: str = 'mean') -> float:
        """
        Collapse the predictive distribution to a single number.

        Parameters
        ----------
        method : str
            'mean', 'median', or 'mode'

        Returns
        -------
        float
            Point forecast

        Raises
        ------
        ValueError
            If ``method`` is not one of the three supported choices.
        """
        draws = self.samples
        if method == 'mean':
            return float(draws.mean())
        if method == 'median':
            return float(np.median(draws))
        if method == 'mode':
            # Approximate the mode of the continuous distribution via a
            # kernel density estimate evaluated on a fine grid.
            from scipy.stats import gaussian_kde
            density = gaussian_kde(draws)
            grid = np.linspace(draws.min(), draws.max(), 1000)
            return float(grid[np.argmax(density(grid))])
        raise ValueError(f"Unknown method: {method}")

    def credible_interval(self, alpha: float = 0.05) -> CredibleInterval:
        """
        Equal-tailed credible interval of the forecast.

        Parameters
        ----------
        alpha : float
            Significance level

        Returns
        -------
        CredibleInterval
            Credible interval object
        """
        tail = alpha / 2
        lo = float(np.quantile(self.samples, tail))
        hi = float(np.quantile(self.samples, 1 - tail))
        return CredibleInterval(lower=lo, upper=hi, alpha=alpha)

    def probability_of_event(self, condition: Callable[[np.ndarray], np.ndarray]) -> float:
        """
        Probability of an event defined by a vectorized condition.

        Parameters
        ----------
        condition : Callable
            Function mapping the sample array to a boolean array
            (True where the event occurs)

        Returns
        -------
        float
            Probability, estimated as the fraction of samples satisfying
            the condition
        """
        flags = condition(self.samples)
        return float(np.mean(flags))
|
|
|
|
|
|
|
|
class BayesianForecaster:
    """
    Bayesian forecasting engine for geopolitical analysis.

    Integrates with GeoBot 2.0 analytical framework to provide
    probabilistic forecasts with explicit uncertainty quantification.
    """

    def __init__(self):
        """Initialize an empty forecaster (no priors, no beliefs)."""
        # parameter name -> registered prior distribution
        self.priors: Dict[str, GeopoliticalPrior] = {}
        # parameter name -> current posterior belief state
        self.beliefs: Dict[str, BeliefState] = {}

    def set_prior(self, prior: GeopoliticalPrior) -> None:
        """
        Register (or replace) the prior distribution for a parameter.

        Parameters
        ----------
        prior : GeopoliticalPrior
            Prior distribution; stored under ``prior.parameter_name``.
        """
        self.priors[prior.parameter_name] = prior

    def update_belief(
        self,
        parameter_name: str,
        evidence: EvidenceUpdate,
        n_samples: int = 10000,
        method: str = 'importance_sampling'
    ) -> BeliefState:
        """
        Update beliefs about a parameter using Bayes' rule.

        If the parameter already has a belief state, its posterior samples
        serve as the prior for this update (sequential updating); otherwise
        fresh samples are drawn from the registered prior.

        Parameters
        ----------
        parameter_name : str
            Parameter to update
        evidence : EvidenceUpdate
            New evidence
        n_samples : int
            Number of samples for the Monte Carlo approximation
        method : str
            'importance_sampling' or 'rejection_sampling'

        Returns
        -------
        BeliefState
            Updated belief state (also stored in ``self.beliefs``)

        Raises
        ------
        ValueError
            If no prior is registered for the parameter, the method is
            unknown, the evidence assigns zero/non-finite likelihood to
            every prior sample, or rejection sampling accepts too few
            samples.
        """
        if parameter_name not in self.priors:
            raise ValueError(f"No prior set for {parameter_name}")

        # Sequential updating: reuse the current posterior as the prior.
        if parameter_name in self.beliefs:
            prior_samples = self.beliefs[parameter_name].posterior_samples
        else:
            prior_samples = self.priors[parameter_name].sample(n_samples)

        likelihoods = evidence.compute_likelihood(prior_samples)

        if method == 'importance_sampling':
            total = np.sum(likelihoods)
            # FIX: without this guard an all-zero (or NaN/inf) likelihood
            # vector yields NaN weights and np.random.choice fails with a
            # cryptic message; raise a diagnosable error instead.
            if not np.isfinite(total) or total <= 0:
                raise ValueError(
                    f"Evidence assigns zero or non-finite likelihood to all "
                    f"prior samples of {parameter_name}; cannot update belief."
                )
            weights = likelihoods / total

            # Sampling-importance-resampling: draw indices with replacement,
            # proportionally to the normalized likelihood weights.
            indices = np.random.choice(
                len(prior_samples),
                size=n_samples,
                replace=True,
                p=weights
            )
            posterior_samples = prior_samples[indices]

        elif method == 'rejection_sampling':
            # Accept each prior sample with probability proportional to
            # its likelihood (scaled by the maximum likelihood).
            max_likelihood = np.max(likelihoods)
            accepted = []

            for sample, likelihood in zip(prior_samples, likelihoods):
                if np.random.uniform(0, max_likelihood) < likelihood:
                    accepted.append(sample)

            if len(accepted) < 100:
                raise ValueError("Rejection sampling failed - too few accepted samples")

            posterior_samples = np.array(accepted)

        else:
            raise ValueError(f"Unknown method: {method}")

        # Store/refresh the belief state and record the evidence applied.
        if parameter_name in self.beliefs:
            belief = self.beliefs[parameter_name]
            belief.posterior_samples = posterior_samples
            belief.evidence_history.append(evidence)
        else:
            belief = BeliefState(
                parameter_name=parameter_name,
                posterior_samples=posterior_samples,
                prior=self.priors[parameter_name],
                evidence_history=[evidence]
            )

        self.beliefs[parameter_name] = belief
        return belief

    def sequential_update(
        self,
        parameter_name: str,
        evidence_sequence: List[EvidenceUpdate],
        n_samples: int = 10000
    ) -> BeliefState:
        """
        Sequential belief updating with multiple pieces of evidence.

        Each piece of evidence is applied in order via ``update_belief``.

        Parameters
        ----------
        parameter_name : str
            Parameter to update
        evidence_sequence : List[EvidenceUpdate]
            Sequence of evidence, applied in order
        n_samples : int
            Number of samples per update

        Returns
        -------
        BeliefState
            Final belief state

        Raises
        ------
        KeyError
            If ``evidence_sequence`` is empty and the parameter has never
            been updated (no belief state exists to return).
        """
        for evidence in evidence_sequence:
            self.update_belief(parameter_name, evidence, n_samples)

        return self.beliefs[parameter_name]

    def forecast(
        self,
        variable_name: str,
        predictive_function: Callable[[Dict[str, float]], float],
        time_horizon: int,
        n_samples: int = 10000,
        conditioning_info: Optional[Dict[str, Any]] = None
    ) -> ForecastDistribution:
        """
        Generate a probabilistic forecast by pushing posterior parameter
        samples through a predictive function.

        Parameters
        ----------
        variable_name : str
            Variable to forecast
        predictive_function : Callable
            Function mapping a dict of parameter values (one scalar per
            tracked parameter) to a scalar prediction
        time_horizon : int
            Forecast horizon
        n_samples : int
            Number of forecast samples
        conditioning_info : Optional[Dict[str, Any]]
            Additional conditioning information to record on the result

        Returns
        -------
        ForecastDistribution
            Forecast distribution
        """
        # Draw posterior samples for every tracked parameter.
        # NOTE(review): parameters are resampled independently of each
        # other, so any cross-parameter posterior correlation is not
        # preserved — confirm this is acceptable for downstream use.
        parameter_samples = {}
        for param_name, belief in self.beliefs.items():
            indices = np.random.choice(len(belief.posterior_samples), size=n_samples)
            parameter_samples[param_name] = belief.posterior_samples[indices]

        # Evaluate the predictive function once per joint parameter draw.
        forecast_samples = np.zeros(n_samples)
        for i in range(n_samples):
            params = {name: samples[i] for name, samples in parameter_samples.items()}
            forecast_samples[i] = predictive_function(params)

        return ForecastDistribution(
            variable_name=variable_name,
            samples=forecast_samples,
            time_horizon=time_horizon,
            conditioning_info=conditioning_info or {}
        )

    def model_comparison(
        self,
        models: Dict[str, Callable],
        evidence: List[EvidenceUpdate],
        prior_model_probs: Optional[Dict[str, float]] = None
    ) -> Dict[str, float]:
        """
        Bayesian model comparison using evidence.

        Parameters
        ----------
        models : Dict[str, Callable]
            Dictionary of models (name -> likelihood function)
        evidence : List[EvidenceUpdate]
            Evidence for comparison
        prior_model_probs : Optional[Dict[str, float]]
            Prior model probabilities (defaults to uniform over models)

        Returns
        -------
        Dict[str, float]
            Posterior model probabilities (normalized to sum to 1)

        Raises
        ------
        ValueError
            If every model receives zero or non-finite unnormalized
            posterior mass, so no normalization is possible.
        """
        if prior_model_probs is None:
            # Uniform prior over the candidate models.
            prior_model_probs = {name: 1.0 / len(models) for name in models}

        # Approximate each model's marginal likelihood as the product of
        # per-evidence mean likelihoods.
        marginal_likelihoods = {}

        for model_name, model_fn in models.items():
            # NOTE(review): each EvidenceUpdate's likelihood_function is
            # invoked with the model callable itself as its argument here,
            # not with a parameter array — likelihood functions used for
            # model comparison must be written to accept a model. Confirm
            # against callers.
            likelihood = 1.0
            for ev in evidence:
                likelihood *= np.mean(ev.compute_likelihood(model_fn))

            marginal_likelihoods[model_name] = likelihood

        # Bayes' rule over models: posterior ∝ prior × marginal likelihood.
        posterior_probs = {}
        total = 0.0

        for model_name in models:
            unnormalized = (prior_model_probs[model_name] *
                            marginal_likelihoods[model_name])
            posterior_probs[model_name] = unnormalized
            total += unnormalized

        # FIX: a zero/non-finite total previously produced NaN posterior
        # probabilities silently; fail loudly instead.
        if not np.isfinite(total) or total <= 0:
            raise ValueError(
                "Model comparison failed: all models have zero or "
                "non-finite unnormalized posterior probability."
            )

        for model_name in posterior_probs:
            posterior_probs[model_name] /= total

        return posterior_probs

    def get_belief_summary(self, parameter_name: str) -> Dict[str, Any]:
        """
        Get summary statistics for a parameter's belief state.

        Parameters
        ----------
        parameter_name : str
            Parameter name

        Returns
        -------
        Dict[str, Any]
            Summary statistics: mean, median, std, 95%/90% credible
            intervals, and the evidence history metadata

        Raises
        ------
        ValueError
            If no belief state exists for the parameter.
        """
        if parameter_name not in self.beliefs:
            raise ValueError(f"No beliefs for {parameter_name}")

        belief = self.beliefs[parameter_name]
        ci_95 = belief.credible_interval(alpha=0.05)
        ci_90 = belief.credible_interval(alpha=0.10)

        return {
            'parameter': parameter_name,
            'mean': belief.mean(),
            'median': belief.median(),
            'std': belief.std(),
            '95%_CI': ci_95,
            '90%_CI': ci_90,
            'n_evidence_updates': len(belief.evidence_history),
            'evidence_types': [ev.evidence_type.value for ev in belief.evidence_history]
        }
|
|
|