"""
Hawkes Processes and Point Process Models for Conflict Modeling
Hawkes processes are self-exciting point processes where past events increase
the probability of future events. Critical for modeling:
- Conflict contagion and escalation dynamics
- Terrorist attack clustering
- Diplomatic incident cascades
- Arms race dynamics
- Protest contagion
Mathematical foundation:
λ(t) = μ + ∫_{-∞}^t φ(t - s) dN(s)
where:
- λ(t): instantaneous event rate (intensity)
- μ: baseline rate
- φ(t): excitation kernel (how past events affect current rate)
- N(s): counting process of past events
Key concepts:
- Branching ratio: Expected number of offspring events per parent
- If branching ratio < 1: process is stable (subcritical)
- If branching ratio ≥ 1: process is explosive (supercritical)
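Worked example: alpha = 0.8, beta = 1.5 gives branching ratio n = alpha/beta
≈ 0.53, a subcritical process in which each event triggers about 0.53 direct
offspring events on average.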
"""
import numpy as np
from scipy import optimize
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass
import warnings
@dataclass
class HawkesParameters:
"""Parameters for a Hawkes process."""
mu: float # Baseline intensity
alpha: float # Excitation amplitude
beta: float # Decay rate
@property
def branching_ratio(self) -> float:
"""Expected number of offspring per event."""
return self.alpha / self.beta
@property
def is_stable(self) -> bool:
"""Check if process is subcritical (stable)."""
return self.branching_ratio < 1.0
@dataclass
class HawkesFitResult:
"""Results from fitting a Hawkes process."""
params: HawkesParameters
log_likelihood: float
aic: float
bic: float
n_events: int
time_span: float
intensity_trace: Optional[np.ndarray] = None
times: Optional[np.ndarray] = None
class UnivariateHawkesProcess:
"""
Univariate (1-dimensional) Hawkes Process.
Intensity function:
λ(t) = μ + α ∑_{t_i < t} exp(-β(t - t_i))
This is a self-exciting process where each event increases future intensity.
Example:
>>> hawkes = UnivariateHawkesProcess()
>>> events = hawkes.simulate(mu=0.5, alpha=0.8, beta=1.5, T=100.0)
>>> result = hawkes.fit(events, T=100.0)
>>> print(f"Branching ratio: {result.params.branching_ratio:.3f}")
>>> prediction = hawkes.predict_intensity(events, result.params, t=105.0)
"""
def __init__(self, kernel: str = 'exponential'):
"""
Initialize Hawkes process.
Args:
kernel: Excitation kernel type ('exponential', 'power_law')
"""
self.kernel = kernel
def simulate(self, mu: float, alpha: float, beta: float, T: float,
max_events: int = 10000) -> np.ndarray:
"""
Simulate Hawkes process using Ogata's thinning algorithm.
Args:
mu: Baseline intensity
alpha: Excitation amplitude
beta: Decay rate
T: Time horizon
max_events: Maximum number of events to generate
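        Note:
            For a stationary subcritical process the long-run event rate is
            mu / (1 - alpha/beta); e.g. mu=0.5, alpha=0.8, beta=1.5 gives
            roughly 1.07 events per unit time, so T=100 yields about 107
            events in expectation.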
Returns:
Array of event times
"""
events = []
t = 0.0
        while t < T and len(events) < max_events:
            # Upper bound for thinning: the intensity just after time t.
            # _compute_intensity uses a strict 't_i < t', which drops the jump
            # from an event accepted at exactly t, so add that jump back in.
            # The exponential kernel then only decays until the next candidate,
            # making lambda_star a valid dominating rate.
            lambda_star = self._compute_intensity(t, events, mu, alpha, beta)
            if events and events[-1] == t:
                lambda_star += alpha
# Draw inter-event time from exponential
u = np.random.uniform()
if lambda_star <= 0:
break
t = t - np.log(u) / lambda_star
if t > T:
break
# Acceptance-rejection
lambda_t = self._compute_intensity(t, events, mu, alpha, beta)
D = np.random.uniform()
if D * lambda_star <= lambda_t:
events.append(t)
return np.array(events)
def fit(self, events: np.ndarray, T: float,
initial_guess: Optional[Tuple[float, float, float]] = None) -> HawkesFitResult:
"""
Fit Hawkes process parameters using maximum likelihood.
Args:
events: Array of event times
T: Time horizon (observation period end)
initial_guess: Initial parameter guess (mu, alpha, beta)
Returns:
HawkesFitResult with estimated parameters
"""
events = np.asarray(events)
events = np.sort(events)
n_events = len(events)
if initial_guess is None:
            # Initialize with reasonable defaults. Note alpha/beta must start
            # below 1, otherwise the stability penalty flattens the objective
            # at x0 and the optimizer cannot move.
            mu_init = max(n_events / T, 1e-3)  # Average observed rate
            beta_init = 1.0
            alpha_init = 0.5 * beta_init  # Branching ratio 0.5: safely subcritical
            initial_guess = (mu_init, alpha_init, beta_init)
# Define negative log-likelihood
def neg_log_likelihood(params):
mu, alpha, beta = params
# Constrain to positive values
if mu <= 0 or alpha <= 0 or beta <= 0:
return 1e10
# Check stability
if alpha / beta >= 1.0:
return 1e10 # Penalize explosive processes
return -self._log_likelihood(events, T, mu, alpha, beta)
# Optimize
bounds = [(1e-6, None), (1e-6, None), (1e-6, None)]
result = optimize.minimize(
neg_log_likelihood,
x0=initial_guess,
method='L-BFGS-B',
bounds=bounds
)
if not result.success:
warnings.warn(f"Optimization did not converge: {result.message}")
mu_opt, alpha_opt, beta_opt = result.x
log_likelihood = -result.fun
# Compute information criteria
n_params = 3
aic = -2 * log_likelihood + 2 * n_params
bic = -2 * log_likelihood + np.log(n_events) * n_params
params = HawkesParameters(mu=mu_opt, alpha=alpha_opt, beta=beta_opt)
return HawkesFitResult(
params=params,
log_likelihood=log_likelihood,
aic=aic,
bic=bic,
n_events=n_events,
time_span=T
)
def predict_intensity(self, events: np.ndarray, params: HawkesParameters,
t: float) -> float:
"""
Predict intensity at time t given past events.
Args:
events: Past event times (must be < t)
params: Hawkes parameters
t: Time to predict intensity
Returns:
Intensity λ(t)
"""
return self._compute_intensity(t, events, params.mu, params.alpha, params.beta)
def _compute_intensity(self, t: float, events: List[float],
mu: float, alpha: float, beta: float) -> float:
"""Compute intensity at time t."""
if len(events) == 0:
return mu
events_array = np.asarray(events)
past_events = events_array[events_array < t]
if len(past_events) == 0:
return mu
# Exponential kernel
excitation = alpha * np.sum(np.exp(-beta * (t - past_events)))
return mu + excitation
def _log_likelihood(self, events: np.ndarray, T: float,
mu: float, alpha: float, beta: float) -> float:
"""
Compute log-likelihood for Hawkes process.
LL = ∑_i log(λ(t_i)) - ∫_0^T λ(s) ds
"""
n_events = len(events)
if n_events == 0:
return -mu * T
# First term: ∑ log(λ(t_i))
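        # Note: this loop is O(n^2). For the exponential kernel the inner sum
        # obeys the recursion R_i = exp(-beta * (t_i - t_{i-1})) * (1 + R_{i-1}),
        # which gives an O(n) likelihood and is worth adopting for large n.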
log_sum = 0.0
for i, t_i in enumerate(events):
lambda_i = self._compute_intensity(t_i, events[:i], mu, alpha, beta)
if lambda_i <= 0:
return -np.inf
log_sum += np.log(lambda_i)
# Second term: ∫_0^T λ(s) ds
# For exponential kernel, this has closed form:
# ∫_0^T λ(s) ds = μT + α ∑_i (1 - exp(-β(T - t_i))) / β
integral = mu * T
integral += alpha * np.sum(1 - np.exp(-beta * (T - events))) / beta
return log_sum - integral
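

# A goodness-of-fit sketch added alongside the original class (not part of its
# API): by the time-rescaling theorem, if the fitted model is correct, the
# transformed inter-event times Λ(t_i) - Λ(t_{i-1}) are i.i.d. Exp(1); plot or
# KS-test the returned residuals against that distribution to validate a fit.
def time_rescaling_residuals(events: np.ndarray, params: HawkesParameters) -> np.ndarray:
    """Transformed inter-event times; approximately Exp(1) under a correct model."""
    events = np.sort(np.asarray(events))
    # Closed-form compensator for the exponential kernel:
    # Λ(t) = mu*t + (alpha/beta) * Σ_{t_j < t} (1 - exp(-beta * (t - t_j)))
    compensator = np.array([
        params.mu * t + (params.alpha / params.beta)
        * np.sum(1.0 - np.exp(-params.beta * (t - events[events < t])))
        for t in events
    ])
    return np.diff(compensator)
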
class MultivariateHawkesProcess:
"""
Multivariate Hawkes Process for multiple interacting event streams.
For K event types, the intensity of type k is:
λ_k(t) = μ_k + ∑_{j=1}^K α_{kj} ∑_{t_i^j < t} φ_{kj}(t - t_i^j)
This captures cross-excitation between different event types.
Example: Conflict in country A affects conflict probability in country B.
Example:
>>> # Model 3 countries with mutual excitation
>>> hawkes = MultivariateHawkesProcess(n_dimensions=3)
>>> events = hawkes.simulate(
... mu=np.array([0.5, 0.3, 0.4]),
... alpha=np.array([[0.2, 0.1, 0.05],
... [0.15, 0.3, 0.1],
... [0.1, 0.1, 0.25]]),
... beta=np.ones((3, 3)),
... T=100.0
... )
>>> result = hawkes.fit(events, T=100.0)
"""
def __init__(self, n_dimensions: int, kernel: str = 'exponential'):
"""
Initialize multivariate Hawkes process.
Args:
n_dimensions: Number of event types (dimensions)
kernel: Excitation kernel type
"""
self.n_dimensions = n_dimensions
self.kernel = kernel
def simulate(self, mu: np.ndarray, alpha: np.ndarray, beta: np.ndarray,
T: float, max_events: int = 10000) -> List[List[float]]:
"""
Simulate multivariate Hawkes process.
Args:
mu: Baseline intensities, shape (K,)
alpha: Excitation matrix, shape (K, K)
alpha[i,j] = effect of event type j on type i
beta: Decay rates, shape (K, K)
T: Time horizon
max_events: Maximum total events
Returns:
List of event lists, one per dimension
"""
K = self.n_dimensions
events = [[] for _ in range(K)]
total_events = 0
t = 0.0
        last_event = None  # (time, dimension) of the most recently accepted event
        while t < T and total_events < max_events:
            # Upper bound for thinning: total intensity just after time t.
            # _compute_intensities uses a strict 't_i < t', which drops the jump
            # from an event accepted at exactly t, so add that jump back in.
            intensities = self._compute_intensities(t, events, mu, alpha, beta)
            lambda_star = np.sum(intensities)
            if last_event is not None and last_event[0] == t:
                lambda_star += np.sum(alpha[:, last_event[1]])
# Generate candidate event
u = np.random.uniform()
t = t - np.log(u) / lambda_star
if t > T:
break
            # Acceptance-rejection: evaluate the true intensities at the candidate time
intensities_t = self._compute_intensities(t, events, mu, alpha, beta)
total_intensity = np.sum(intensities_t)
D = np.random.uniform()
if D * lambda_star <= total_intensity:
# Accept event, determine which dimension
probs = intensities_t / total_intensity
dimension = np.random.choice(K, p=probs)
                events[dimension].append(t)
                last_event = (t, dimension)
                total_events += 1
return events
def fit(self, events: List[List[float]], T: float) -> Dict:
"""
Fit multivariate Hawkes process.
Args:
events: List of event lists, one per dimension
T: Time horizon
Returns:
Dictionary with estimated parameters
"""
K = self.n_dimensions
# Convert to arrays
events_arrays = [np.asarray(e) for e in events]
# Initialize parameters
n_events = [len(e) for e in events_arrays]
mu_init = np.array([n / T for n in n_events])
# Simple initialization: assume weak cross-excitation
alpha_init = np.zeros((K, K))
for i in range(K):
alpha_init[i, i] = mu_init[i] * 0.3 # Self-excitation
for j in range(K):
if i != j:
alpha_init[i, j] = mu_init[i] * 0.1 # Cross-excitation
beta_init = np.ones((K, K))
# Flatten parameters for optimization
def pack_params(mu, alpha, beta):
return np.concatenate([mu.flatten(), alpha.flatten(), beta.flatten()])
def unpack_params(x):
mu = x[:K]
alpha = x[K:K + K*K].reshape(K, K)
beta = x[K + K*K:].reshape(K, K)
return mu, alpha, beta
# Negative log-likelihood
def neg_log_likelihood(x):
mu, alpha, beta = unpack_params(x)
# Constraints
if np.any(mu <= 0) or np.any(alpha < 0) or np.any(beta <= 0):
return 1e10
            # Stability: the spectral radius of the branching matrix must be < 1
            branching_ratios = alpha / beta
            if np.max(np.abs(np.linalg.eigvals(branching_ratios))) >= 0.99:
return 1e10
return -self._log_likelihood(events_arrays, T, mu, alpha, beta)
# Optimize
x0 = pack_params(mu_init, alpha_init, beta_init)
bounds = [(1e-6, None)] * len(x0) # All positive
result = optimize.minimize(
neg_log_likelihood,
x0=x0,
method='L-BFGS-B',
bounds=bounds
)
mu_opt, alpha_opt, beta_opt = unpack_params(result.x)
log_likelihood = -result.fun
# Information criteria
n_params = len(x0)
total_events = sum(n_events)
aic = -2 * log_likelihood + 2 * n_params
bic = -2 * log_likelihood + np.log(total_events) * n_params
return {
'mu': mu_opt,
'alpha': alpha_opt,
'beta': beta_opt,
'branching_matrix': alpha_opt / beta_opt,
'spectral_radius': np.max(np.abs(np.linalg.eigvals(alpha_opt / beta_opt))),
'log_likelihood': log_likelihood,
'aic': aic,
'bic': bic,
'n_events': n_events,
'converged': result.success
}
def predict_intensities(self, events: List[List[float]],
mu: np.ndarray, alpha: np.ndarray, beta: np.ndarray,
t: float) -> np.ndarray:
"""
Predict intensities for all dimensions at time t.
Args:
events: Past events
mu, alpha, beta: Parameters
t: Time to predict
Returns:
Intensity vector, shape (K,)
"""
return self._compute_intensities(t, events, mu, alpha, beta)
def _compute_intensities(self, t: float, events: List[List[float]],
mu: np.ndarray, alpha: np.ndarray,
beta: np.ndarray) -> np.ndarray:
"""Compute intensity vector at time t."""
K = self.n_dimensions
intensities = mu.copy()
for k in range(K):
for j in range(K):
if len(events[j]) > 0:
events_j = np.asarray(events[j])
past_events = events_j[events_j < t]
if len(past_events) > 0:
excitation = alpha[k, j] * np.sum(
np.exp(-beta[k, j] * (t - past_events))
)
intensities[k] += excitation
return intensities
def _log_likelihood(self, events: List[np.ndarray], T: float,
mu: np.ndarray, alpha: np.ndarray,
beta: np.ndarray) -> float:
"""Compute log-likelihood for multivariate process."""
K = self.n_dimensions
log_sum = 0.0
# First term: ∑_k ∑_i log(λ_k(t_i^k))
for k in range(K):
if len(events[k]) == 0:
continue
for i, t_i in enumerate(events[k]):
# Need to compute λ_k(t_i) considering all past events
                events_up_to_i = [events[j][events[j] < t_i].tolist()
                                  for j in range(K)]
intensities = self._compute_intensities(t_i, events_up_to_i, mu, alpha, beta)
lambda_k = intensities[k]
if lambda_k <= 0:
return -np.inf
log_sum += np.log(lambda_k)
# Second term: -∫_0^T ∑_k λ_k(s) ds
integral = np.sum(mu) * T
for k in range(K):
for j in range(K):
if len(events[j]) > 0:
integral += alpha[k, j] * np.sum(
(1 - np.exp(-beta[k, j] * (T - events[j]))) / beta[k, j]
)
return log_sum - integral
class ConflictContagionModel:
"""
Specialized Hawkes model for geopolitical conflict contagion.
    Features:
    - Self-excitation (conflict escalation within a country)
    - Cross-excitation (conflict spreading between countries); the fitted
      excitation matrix implicitly defines a directed contagion network
    - Contagion risk estimates and identification of contagion pathways
      as early-warning indicators
Example:
>>> countries = ['Syria', 'Iraq', 'Turkey']
>>> model = ConflictContagionModel(countries=countries)
>>>
>>> # Fit to historical conflict events
>>> events = {
... 'Syria': [1.2, 5.3, 10.1, ...],
... 'Iraq': [3.4, 8.9, ...],
... 'Turkey': [12.3, ...]
... }
>>> result = model.fit(events, T=365.0) # 1 year
>>>
>>> # Predict contagion risk
>>> risk = model.contagion_risk(events, result, t=370.0)
>>> print(f"Syria conflict risk in next 5 days: {risk['Syria']:.2%}")
"""
def __init__(self, countries: List[str]):
"""
Initialize conflict contagion model.
Args:
countries: List of country names
"""
self.countries = countries
self.n_countries = len(countries)
self.hawkes = MultivariateHawkesProcess(n_dimensions=self.n_countries)
def fit(self, events: Dict[str, List[float]], T: float) -> Dict:
"""
Fit contagion model to conflict events.
Args:
events: Dictionary mapping country name to list of event times
T: Observation period
Returns:
Fitted parameters with interpretation
"""
# Convert to list format
events_list = [events[country] for country in self.countries]
# Fit multivariate Hawkes
result = self.hawkes.fit(events_list, T)
# Add interpretations
result['countries'] = self.countries
result['self_excitation'] = np.diag(result['alpha'])
result['cross_excitation_mean'] = np.mean(
result['alpha'][~np.eye(self.n_countries, dtype=bool)]
)
        # Identify most contagious countries via the branching matrix (expected
        # offspring counts): raw alpha amplitudes ignore how fast each kernel
        # decays. Entry [i, j] is the effect of country j on country i.
        branching = result['branching_matrix']
        outgoing_contagion = np.sum(branching, axis=0) - np.diag(branching)
        incoming_contagion = np.sum(branching, axis=1) - np.diag(branching)
result['most_contagious_source'] = self.countries[np.argmax(outgoing_contagion)]
result['most_vulnerable_target'] = self.countries[np.argmax(incoming_contagion)]
return result
def contagion_risk(self, events: Dict[str, List[float]],
params: Dict, t: float, horizon: float = 5.0) -> Dict[str, float]:
"""
Estimate contagion risk over next time period.
Args:
events: Historical events
params: Fitted parameters
t: Current time
horizon: Risk horizon (time units)
Returns:
Dictionary mapping country to probability of conflict
"""
events_list = [events[country] for country in self.countries]
# Compute current intensities
intensities = self.hawkes.predict_intensities(
events_list, params['mu'], params['alpha'], params['beta'], t
)
# Probability of at least one event in [t, t+horizon]
# P(N(t+h) - N(t) ≥ 1) = 1 - P(N(t+h) - N(t) = 0)
# Approximate with constant intensity
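        # Worked example: intensity 0.2 events/day over a 5-day horizon gives
        # E[N] = 1.0, so P(at least one event) = 1 - exp(-1) ≈ 0.63.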
risks = {}
for i, country in enumerate(self.countries):
# Poisson approximation
expected_events = intensities[i] * horizon
prob_no_event = np.exp(-expected_events)
prob_at_least_one = 1 - prob_no_event
risks[country] = prob_at_least_one
return risks
def identify_contagion_pathways(self, params: Dict, threshold: float = 0.1) -> List[Tuple[str, str, float]]:
"""
Identify significant contagion pathways between countries.
Args:
params: Fitted parameters
threshold: Minimum branching ratio to report
Returns:
List of (source, target, branching_ratio) tuples
"""
alpha = params['alpha']
beta = params['beta']
branching = alpha / beta
pathways = []
for i in range(self.n_countries):
for j in range(self.n_countries):
if i != j and branching[i, j] > threshold:
pathways.append((
self.countries[j], # Source
self.countries[i], # Target
branching[i, j]
))
# Sort by strength
pathways.sort(key=lambda x: x[2], reverse=True)
return pathways
def estimate_branching_ratio(events: np.ndarray, T: float) -> float:
"""
Quick estimate of branching ratio for stability assessment.
Args:
events: Event times
T: Time horizon
Returns:
Estimated branching ratio
"""
hawkes = UnivariateHawkesProcess()
result = hawkes.fit(events, T)
return result.params.branching_ratio
def detect_explosive_regime(events: np.ndarray, T: float, window: float = 10.0) -> List[Tuple[float, float]]:
"""
    Detect time windows where the process approached criticality.

    Note: the MLE in UnivariateHawkesProcess.fit constrains the branching
    ratio below 1, so this flags near-critical windows rather than directly
    measuring supercritical behavior.
Args:
events: Event times
T: Total time horizon
window: Rolling window size
Returns:
        List of (window_end_time, branching_ratio) for near-critical windows
"""
events = np.sort(events)
explosive_periods = []
t = window
while t <= T:
# Events in window [t-window, t]
window_events = events[(events >= t - window) & (events <= t)]
if len(window_events) > 5: # Need minimum events
br = estimate_branching_ratio(window_events - (t - window), window)
            if br >= 0.9:  # Near-critical (the fit caps the ratio below 1)
explosive_periods.append((t, br))
t += window / 2 # Overlapping windows
return explosive_periods
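

# A minimal end-to-end sketch (illustrative parameter values, not from any
# dataset): simulate a subcritical univariate process, refit it by MLE, and
# compare the recovered branching ratio against the truth.
if __name__ == "__main__":
    np.random.seed(42)
    hawkes = UnivariateHawkesProcess()
    true_params = HawkesParameters(mu=0.5, alpha=0.8, beta=1.5)
    events = hawkes.simulate(mu=true_params.mu, alpha=true_params.alpha,
                             beta=true_params.beta, T=500.0)
    result = hawkes.fit(events, T=500.0)
    print(f"Simulated {result.n_events} events over T={result.time_span}")
    print(f"True branching ratio:   {true_params.branching_ratio:.3f}")
    print(f"Fitted branching ratio: {result.params.branching_ratio:.3f}")
    print(f"Stable fit: {result.params.is_stable}, AIC: {result.aic:.1f}")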