|
|
""" |
|
|
Hidden Markov Model for regime detection and state estimation. |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import Optional, Tuple, List |
|
|
|
|
|
|
|
|
class HiddenMarkovModel: |
|
|
""" |
|
|
Hidden Markov Model for detecting regime changes. |
|
|
|
|
|
Useful for identifying transitions between: |
|
|
- Peace <-> Conflict |
|
|
- Stable <-> Unstable |
|
|
- Cooperative <-> Hostile |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
n_states: int, |
|
|
n_observations: int |
|
|
): |
|
|
""" |
|
|
Initialize HMM. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
n_states : int |
|
|
Number of hidden states |
|
|
n_observations : int |
|
|
Number of possible observations |
|
|
""" |
|
|
self.n_states = n_states |
|
|
self.n_observations = n_observations |
|
|
|
|
|
|
|
|
self.transition_matrix = np.random.dirichlet(np.ones(n_states), size=n_states) |
|
|
self.emission_matrix = np.random.dirichlet(np.ones(n_observations), size=n_states) |
|
|
self.initial_probs = np.ones(n_states) / n_states |
|
|
|
|
|
def set_parameters( |
|
|
self, |
|
|
transition_matrix: np.ndarray, |
|
|
emission_matrix: np.ndarray, |
|
|
initial_probs: np.ndarray |
|
|
) -> None: |
|
|
""" |
|
|
Set HMM parameters. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
transition_matrix : np.ndarray, shape (n_states, n_states) |
|
|
State transition probabilities |
|
|
emission_matrix : np.ndarray, shape (n_states, n_observations) |
|
|
Observation emission probabilities |
|
|
initial_probs : np.ndarray, shape (n_states,) |
|
|
Initial state probabilities |
|
|
""" |
|
|
self.transition_matrix = transition_matrix |
|
|
self.emission_matrix = emission_matrix |
|
|
self.initial_probs = initial_probs |
|
|
|
|
|
def forward(self, observations: np.ndarray) -> Tuple[np.ndarray, float]: |
|
|
""" |
|
|
Forward algorithm for computing state probabilities. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
observations : np.ndarray |
|
|
Sequence of observations |
|
|
|
|
|
Returns |
|
|
------- |
|
|
tuple |
|
|
(forward_probabilities, log_likelihood) |
|
|
""" |
|
|
T = len(observations) |
|
|
alpha = np.zeros((T, self.n_states)) |
|
|
|
|
|
|
|
|
alpha[0] = self.initial_probs * self.emission_matrix[:, observations[0]] |
|
|
alpha[0] /= alpha[0].sum() |
|
|
|
|
|
|
|
|
for t in range(1, T): |
|
|
for j in range(self.n_states): |
|
|
alpha[t, j] = np.sum(alpha[t-1] * self.transition_matrix[:, j]) * \ |
|
|
self.emission_matrix[j, observations[t]] |
|
|
alpha[t] /= alpha[t].sum() |
|
|
|
|
|
log_likelihood = np.sum(np.log(alpha.sum(axis=1))) |
|
|
|
|
|
return alpha, log_likelihood |
|
|
|
|
|
def viterbi(self, observations: np.ndarray) -> Tuple[np.ndarray, float]: |
|
|
""" |
|
|
Viterbi algorithm for most likely state sequence. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
observations : np.ndarray |
|
|
Sequence of observations |
|
|
|
|
|
Returns |
|
|
------- |
|
|
tuple |
|
|
(most_likely_states, log_probability) |
|
|
""" |
|
|
T = len(observations) |
|
|
delta = np.zeros((T, self.n_states)) |
|
|
psi = np.zeros((T, self.n_states), dtype=int) |
|
|
|
|
|
|
|
|
delta[0] = np.log(self.initial_probs) + \ |
|
|
np.log(self.emission_matrix[:, observations[0]] + 1e-10) |
|
|
|
|
|
|
|
|
for t in range(1, T): |
|
|
for j in range(self.n_states): |
|
|
temp = delta[t-1] + np.log(self.transition_matrix[:, j] + 1e-10) |
|
|
psi[t, j] = np.argmax(temp) |
|
|
delta[t, j] = np.max(temp) + \ |
|
|
np.log(self.emission_matrix[j, observations[t]] + 1e-10) |
|
|
|
|
|
|
|
|
states = np.zeros(T, dtype=int) |
|
|
states[-1] = np.argmax(delta[-1]) |
|
|
|
|
|
for t in range(T-2, -1, -1): |
|
|
states[t] = psi[t+1, states[t+1]] |
|
|
|
|
|
log_prob = np.max(delta[-1]) |
|
|
|
|
|
return states, log_prob |
|
|
|
|
|
def detect_regime_change( |
|
|
self, |
|
|
observations: np.ndarray, |
|
|
threshold: float = 0.7 |
|
|
) -> List[int]: |
|
|
""" |
|
|
Detect regime changes in observation sequence. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
observations : np.ndarray |
|
|
Observations |
|
|
threshold : float |
|
|
Confidence threshold for regime change |
|
|
|
|
|
Returns |
|
|
------- |
|
|
list |
|
|
Indices where regime changes occurred |
|
|
""" |
|
|
states, _ = self.viterbi(observations) |
|
|
|
|
|
changes = [] |
|
|
for t in range(1, len(states)): |
|
|
if states[t] != states[t-1]: |
|
|
changes.append(t) |
|
|
|
|
|
return changes |
|
|
|