File size: 4,695 Bytes
484e3bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
"""
Hidden Markov Model for regime detection and state estimation.
"""
import numpy as np
from typing import Optional, Tuple, List
class HiddenMarkovModel:
    """
    Discrete Hidden Markov Model for detecting regime changes.

    Useful for identifying transitions between:
    - Peace <-> Conflict
    - Stable <-> Unstable
    - Cooperative <-> Hostile

    Attributes
    ----------
    n_states : int
        Number of hidden states.
    n_observations : int
        Number of distinct observation symbols.
    transition_matrix : np.ndarray, shape (n_states, n_states)
        ``transition_matrix[i, j]`` is the probability of moving from
        state ``i`` to state ``j``.
    emission_matrix : np.ndarray, shape (n_states, n_observations)
        ``emission_matrix[i, k]`` is the probability of emitting symbol
        ``k`` while in state ``i``.
    initial_probs : np.ndarray, shape (n_states,)
        Probability of each state at the first time step.
    """

    # Added to probabilities before taking logs in Viterbi so that a zero
    # probability becomes a large negative score instead of -inf.
    _LOG_EPS = 1e-10

    def __init__(
        self,
        n_states: int,
        n_observations: int,
        seed: Optional[int] = None
    ):
        """
        Initialize HMM with random transition/emission rows.

        Parameters
        ----------
        n_states : int
            Number of hidden states
        n_observations : int
            Number of possible observations
        seed : int, optional
            Seed for a private random generator, making the random
            initialization reproducible. When omitted, the global
            ``np.random`` state is used.

        Raises
        ------
        ValueError
            If ``n_states`` or ``n_observations`` is smaller than 1.
        """
        if n_states < 1 or n_observations < 1:
            raise ValueError(
                "n_states and n_observations must both be at least 1"
            )

        self.n_states = n_states
        self.n_observations = n_observations

        # Each row is drawn from a flat Dirichlet, so every row is a valid
        # probability distribution (non-negative, sums to 1).
        rng = np.random if seed is None else np.random.default_rng(seed)
        self.transition_matrix = rng.dirichlet(
            np.ones(n_states), size=n_states
        )
        self.emission_matrix = rng.dirichlet(
            np.ones(n_observations), size=n_states
        )
        self.initial_probs = np.ones(n_states) / n_states

    def set_parameters(
        self,
        transition_matrix: np.ndarray,
        emission_matrix: np.ndarray,
        initial_probs: np.ndarray
    ) -> None:
        """
        Set HMM parameters.

        Parameters
        ----------
        transition_matrix : np.ndarray, shape (n_states, n_states)
            State transition probabilities
        emission_matrix : np.ndarray, shape (n_states, n_observations)
            Observation emission probabilities
        initial_probs : np.ndarray, shape (n_states,)
            Initial state probabilities

        Raises
        ------
        ValueError
            If any argument does not have the shape implied by
            ``n_states`` / ``n_observations``.
        """
        candidates = (
            (
                "transition_matrix",
                np.asarray(transition_matrix, dtype=float),
                (self.n_states, self.n_states),
            ),
            (
                "emission_matrix",
                np.asarray(emission_matrix, dtype=float),
                (self.n_states, self.n_observations),
            ),
            (
                "initial_probs",
                np.asarray(initial_probs, dtype=float),
                (self.n_states,),
            ),
        )
        # Validate everything before assigning so a bad call cannot leave
        # the model half-updated.
        for name, array, expected_shape in candidates:
            if array.shape != expected_shape:
                raise ValueError(
                    f"{name} must have shape {expected_shape}, "
                    f"got {array.shape}"
                )

        self.transition_matrix = candidates[0][1]
        self.emission_matrix = candidates[1][1]
        self.initial_probs = candidates[2][1]

    def forward(self, observations: np.ndarray) -> Tuple[np.ndarray, float]:
        """
        Scaled forward algorithm.

        Parameters
        ----------
        observations : np.ndarray
            Sequence of integer observation symbols, each used as a column
            index into ``emission_matrix``.

        Returns
        -------
        tuple
            (forward_probabilities, log_likelihood)

            ``forward_probabilities`` has shape (T, n_states); every row is
            normalized to sum to 1. ``log_likelihood`` is the log
            probability of the whole sequence under the model, obtained by
            summing the logs of the per-step normalizers.

            For an empty sequence the result is an empty (0, n_states)
            array and 0.0. If some observation has zero probability under
            the model, ``log_likelihood`` is ``-inf`` and the rows from
            that step onward are left as zeros.
        """
        obs = np.asarray(observations)
        T = len(obs)
        alpha = np.zeros((T, self.n_states))
        log_likelihood = 0.0

        for t in range(T):
            if t == 0:
                predicted = self.initial_probs
            else:
                # predicted[j] = sum_i alpha[t-1, i] * transition[i, j]
                predicted = alpha[t - 1] @ self.transition_matrix
            unnormalized = predicted * self.emission_matrix[:, obs[t]]

            # The normalizer is P(obs[t] | obs[:t]); the row must be
            # rescaled to prevent underflow, so its log is accumulated
            # here because it cannot be recovered afterwards.
            scale = unnormalized.sum()
            if scale <= 0:
                return alpha, float("-inf")
            alpha[t] = unnormalized / scale
            log_likelihood += np.log(scale)

        return alpha, float(log_likelihood)

    def viterbi(self, observations: np.ndarray) -> Tuple[np.ndarray, float]:
        """
        Viterbi algorithm for most likely state sequence.

        All probabilities are offset by a small epsilon before taking
        logs, so zero-probability events are heavily penalized rather
        than strictly forbidden.

        Parameters
        ----------
        observations : np.ndarray
            Sequence of integer observation symbols, each used as a column
            index into ``emission_matrix``.

        Returns
        -------
        tuple
            (most_likely_states, log_probability)

            ``most_likely_states`` is an integer array of length T. For an
            empty sequence the result is an empty array and 0.0.
        """
        obs = np.asarray(observations)
        T = len(obs)
        if T == 0:
            return np.zeros(0, dtype=int), 0.0

        # Hoisted out of the time loop: the parameters do not change.
        log_initial = np.log(self.initial_probs + self._LOG_EPS)
        log_transition = np.log(self.transition_matrix + self._LOG_EPS)
        log_emission = np.log(self.emission_matrix + self._LOG_EPS)

        delta = np.zeros((T, self.n_states))
        psi = np.zeros((T, self.n_states), dtype=int)

        # Initialize
        delta[0] = log_initial + log_emission[:, obs[0]]

        # Forward pass
        for t in range(1, T):
            # scores[i, j]: best log score of a path that is in state i at
            # t-1 and moves to state j at t.
            scores = delta[t - 1][:, np.newaxis] + log_transition
            psi[t] = np.argmax(scores, axis=0)
            delta[t] = np.max(scores, axis=0) + log_emission[:, obs[t]]

        # Backtrack along the stored best predecessors
        states = np.zeros(T, dtype=int)
        states[-1] = np.argmax(delta[-1])
        for t in range(T - 2, -1, -1):
            states[t] = psi[t + 1, states[t + 1]]

        log_prob = float(np.max(delta[-1]))
        return states, log_prob

    def detect_regime_change(
        self,
        observations: np.ndarray,
        threshold: float = 0.7
    ) -> List[int]:
        """
        Detect regime changes in observation sequence.

        A change is reported at every index ``t`` where the Viterbi state
        at ``t`` differs from the state at ``t - 1``.

        Parameters
        ----------
        observations : np.ndarray
            Observations
        threshold : float
            Confidence threshold for regime change.
            NOTE: accepted for interface compatibility but not applied;
            every switch in the Viterbi path is reported regardless of
            this value.

        Returns
        -------
        list
            Indices where regime changes occurred
        """
        states, _ = self.viterbi(observations)
        switched = states[1:] != states[:-1]
        # +1 because switched[k] compares states[k + 1] with states[k].
        return (np.flatnonzero(switched) + 1).tolist()
|