File size: 10,454 Bytes
484e3bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""
Optimal Transport Module - Wasserstein Distances

Provides geometric measures of how much "effort" is needed to move from
one geopolitical scenario to another using optimal transport theory.

Applications:
- Measure regime shifts
- Compare distributions of Monte Carlo futures
- Quantify shock impact
- Measure closeness of geopolitical scenarios
- Detect structural change
- Logistics modeling
"""

import numpy as np
from typing import Union, Tuple, Optional, List
from scipy.spatial.distance import cdist
from scipy.stats import wasserstein_distance

# POT is an optional dependency: record whether it imported so that the
# methods needing it can raise a clear ImportError instead of a NameError.
try:
    import ot  # Python Optimal Transport library
except ImportError:
    HAS_POT = False
    print("Warning: POT library not available. Some features will be limited.")
else:
    HAS_POT = True


class WassersteinDistance:
    """
    Compute Wasserstein distances between probability distributions.

    The Wasserstein distance (also known as Earth Mover's Distance) measures
    the distance between probability distributions while accounting for the
    geometry of the underlying space.

    ``compute_1d`` only needs SciPy.  ``compute_nd`` and
    ``compute_barycenter`` need the optional POT (``ot``) package and raise
    ImportError when the module-level ``HAS_POT`` flag is False.
    """

    def __init__(self, metric: str = 'euclidean', p: int = 2):
        """
        Initialize Wasserstein distance calculator.

        Parameters
        ----------
        metric : str
            Ground-distance metric name forwarded to
            ``scipy.spatial.distance.cdist`` ('euclidean', 'cityblock', etc.)
        p : int
            Order of Wasserstein distance (1 or 2).  The value is stored
            without validation; it is used as the exponent of the default
            grid cost in ``compute_barycenter``.  ``compute_1d`` and
            ``compute_nd`` do not read it.
        """
        self.metric = metric
        self.p = p

    @staticmethod
    def _as_samples(X: np.ndarray, name: str) -> np.ndarray:
        """Return X as a non-empty 2-D float array (n_samples, n_features)."""
        X = np.asarray(X, dtype=float)
        if X.ndim == 1:
            # A flat vector is read as n_samples scalar observations so that
            # cdist (which only accepts 2-D input) can handle it.
            X = X.reshape(-1, 1)
        if X.ndim != 2:
            raise ValueError(
                f"{name} must be 1-D or 2-D, got {X.ndim} dimensions"
            )
        if X.shape[0] == 0:
            raise ValueError(f"{name} must contain at least one sample")
        return X

    def _grid_cost_matrix(self, n_bins: int) -> np.ndarray:
        """Return the ground cost between bins 0..n_bins-1, scaled to max 1."""
        grid = np.arange(n_bins, dtype=float).reshape(-1, 1)
        cost = cdist(grid, grid, metric=self.metric) ** self.p
        peak = cost.max()
        if peak > 0:
            # Normalising keeps exp(-cost / reg) away from underflow for the
            # default regularisation strength.
            cost = cost / peak
        return cost

    def compute_1d(
        self,
        u_values: np.ndarray,
        v_values: np.ndarray,
        u_weights: Optional[np.ndarray] = None,
        v_weights: Optional[np.ndarray] = None
    ) -> float:
        """
        Compute 1D Wasserstein distance between two distributions.

        Thin wrapper around ``scipy.stats.wasserstein_distance``, which
        computes the first-order (p=1) distance; ``self.p`` and
        ``self.metric`` are not used here.

        Parameters
        ----------
        u_values : np.ndarray
            Values for first distribution
        v_values : np.ndarray
            Values for second distribution
        u_weights : np.ndarray, optional
            Weights for first distribution (defaults to uniform)
        v_weights : np.ndarray, optional
            Weights for second distribution (defaults to uniform)

        Returns
        -------
        float
            Wasserstein distance
        """
        return wasserstein_distance(u_values, v_values, u_weights, v_weights)

    def compute_nd(
        self,
        X_source: np.ndarray,
        X_target: np.ndarray,
        a: Optional[np.ndarray] = None,
        b: Optional[np.ndarray] = None,
        method: str = 'sinkhorn',
        reg: float = 0.1
    ) -> float:
        """
        Compute n-dimensional Wasserstein distance.

        Parameters
        ----------
        X_source : np.ndarray, shape (n_samples_source, n_features)
            Source distribution samples.  A 1-D array is treated as
            n_samples scalar observations.
        X_target : np.ndarray, shape (n_samples_target, n_features)
            Target distribution samples.  A 1-D array is treated as
            n_samples scalar observations.
        a : np.ndarray, optional
            Weights for source distribution (defaults to uniform)
        b : np.ndarray, optional
            Weights for target distribution (defaults to uniform)
        method : str
            'sinkhorn' : entropy-regularised (approximate) transport cost
                with ground cost ``M``.
            'emd' : exact transport cost with ground cost ``M``.
            'emd2' : exact transport cost with ground cost ``M**2``
                (the result is not square-rooted).
        reg : float
            Entropic regularisation strength; only used by 'sinkhorn'.

        Returns
        -------
        float
            Transport cost for the selected method

        Raises
        ------
        ImportError
            If the POT library is not installed.
        ValueError
            If ``method`` is unknown or an input has no samples or more
            than two dimensions.
        """
        if not HAS_POT:
            raise ImportError("POT library required for n-dimensional distances")

        X_source = self._as_samples(X_source, "X_source")
        X_target = self._as_samples(X_target, "X_target")

        n_source = X_source.shape[0]
        n_target = X_target.shape[0]

        # Default to uniform distributions
        if a is None:
            a = np.ones(n_source) / n_source
        else:
            a = np.asarray(a, dtype=float)
        if b is None:
            b = np.ones(n_target) / n_target
        else:
            b = np.asarray(b, dtype=float)

        # Pairwise ground cost between every source and target sample
        M = cdist(X_source, X_target, metric=self.metric)

        # Compute optimal transport
        if method == 'sinkhorn':
            # Sinkhorn algorithm (faster, approximate)
            distance = ot.sinkhorn2(a, b, M, reg=reg)
        elif method == 'emd':
            # Exact EMD
            distance = ot.emd2(a, b, M)
        elif method == 'emd2':
            # Exact EMD on the squared ground cost
            distance = ot.emd2(a, b, M**2)
        else:
            raise ValueError(f"Unknown method: {method}")

        # The solver may hand back a one-element array rather than a scalar;
        # squeeze first because float() on such arrays is deprecated.
        return float(np.squeeze(distance))

    def compute_barycenter(
        self,
        distributions: List[np.ndarray],
        weights: Optional[np.ndarray] = None,
        method: str = 'sinkhorn',
        M: Optional[np.ndarray] = None,
        reg: float = 0.1
    ) -> np.ndarray:
        """
        Compute Wasserstein barycenter of multiple distributions.

        This finds the "average" distribution in Wasserstein space.

        Parameters
        ----------
        distributions : list of np.ndarray
            Histograms to average.  They are stacked as columns, so all of
            them must have the same length (a shared support).
        weights : np.ndarray, optional
            Weights for each distribution (defaults to uniform)
        method : str
            Method to use ('sinkhorn')
        M : np.ndarray, shape (n_bins, n_bins), optional
            Ground cost between the bins of the shared support.  When
            omitted, the bins are placed on the integer grid 0..n_bins-1 and
            the cost is ``cdist(grid, grid, metric) ** p`` scaled to a
            maximum of 1.
        reg : float
            Entropic regularisation strength.

        Returns
        -------
        np.ndarray
            Wasserstein barycenter

        Raises
        ------
        ImportError
            If the POT library is not installed.
        ValueError
            If no distributions are given, ``M`` has the wrong shape, or
            ``method`` is unknown.
        """
        if not HAS_POT:
            raise ImportError("POT library required for barycenter computation")

        n_distributions = len(distributions)
        if n_distributions == 0:
            raise ValueError("At least one distribution is required")

        if weights is None:
            weights = np.ones(n_distributions) / n_distributions
        else:
            weights = np.asarray(weights, dtype=float)

        # Stack distributions: one histogram per column
        A = np.column_stack(distributions)
        n_bins = A.shape[0]

        # The solver cannot run without a cost matrix, so build one when the
        # caller did not supply it.
        if M is None:
            M = self._grid_cost_matrix(n_bins)
        else:
            M = np.asarray(M, dtype=float)
            if M.shape != (n_bins, n_bins):
                raise ValueError(
                    f"M must have shape ({n_bins}, {n_bins}), got {M.shape}"
                )

        # Compute barycenter
        if method == 'sinkhorn':
            barycenter = ot.bregman.barycenter(A, M, reg, weights=weights)
        else:
            raise ValueError(f"Unknown method: {method}")

        return barycenter


class ScenarioComparator:
    """
    Compare geopolitical scenarios using optimal transport.

    This class provides high-level methods for comparing scenarios,
    detecting regime shifts, and quantifying shock impacts.  All distances
    are delegated to a ``WassersteinDistance`` instance stored on
    ``self.wasserstein``.
    """

    def __init__(self, metric: str = 'euclidean'):
        """
        Initialize scenario comparator.

        Parameters
        ----------
        metric : str
            Distance metric for ground distance
        """
        self.wasserstein = WassersteinDistance(metric=metric)

    def compare_scenarios(
        self,
        scenario1: np.ndarray,
        scenario2: np.ndarray,
        weights1: Optional[np.ndarray] = None,
        weights2: Optional[np.ndarray] = None
    ) -> float:
        """
        Compare two geopolitical scenarios.

        Calls ``WassersteinDistance.compute_nd`` with its default method.

        Parameters
        ----------
        scenario1 : np.ndarray, shape (n_samples_1, n_features)
            First scenario; one row per sample
        scenario2 : np.ndarray, shape (n_samples_2, n_features)
            Second scenario; one row per sample
        weights1 : np.ndarray, optional
            Weights for first scenario
        weights2 : np.ndarray, optional
            Weights for second scenario

        Returns
        -------
        float
            Distance between scenarios
        """
        return self.wasserstein.compute_nd(scenario1, scenario2, weights1, weights2)

    def detect_regime_shift(
        self,
        baseline: np.ndarray,
        current: np.ndarray,
        threshold: float = 0.1
    ) -> Tuple[bool, float]:
        """
        Detect if a regime shift has occurred.

        Parameters
        ----------
        baseline : np.ndarray
            Baseline scenario distribution
        current : np.ndarray
            Current scenario distribution
        threshold : float
            A shift is reported when the distance is strictly greater than
            this value.

        Returns
        -------
        tuple
            (shift_detected, distance)
        """
        # NOTE(review): compare_scenarios uses compute_nd's default method,
        # the entropy-regularised 'sinkhorn' approximation, whose value is
        # presumably not exactly zero even for identical inputs - confirm
        # that the threshold is calibrated against it.
        distance = self.compare_scenarios(baseline, current)
        shift_detected = bool(distance > threshold)

        return shift_detected, distance

    def quantify_shock_impact(
        self,
        pre_shock: np.ndarray,
        post_shock: np.ndarray
    ) -> dict:
        """
        Quantify the impact of a shock event.

        Parameters
        ----------
        pre_shock : np.ndarray
            Pre-shock scenario distribution; one row per sample
        post_shock : np.ndarray
            Post-shock scenario distribution; one row per sample

        Returns
        -------
        dict
            'wasserstein_distance' : distance between the two scenarios
            'mean_shift' : Euclidean norm of the difference between the
                per-feature means
            'variance_change' : absolute difference between the variances
                taken over all entries of each array
            'impact_magnitude' : wasserstein_distance * mean_shift
        """
        distance = self.compare_scenarios(pre_shock, post_shock)

        # axis=0 averages over samples, leaving one mean per feature
        mean_shift = np.linalg.norm(np.mean(post_shock, axis=0) - np.mean(pre_shock, axis=0))
        # np.var without an axis pools every entry into a single variance
        variance_change = np.abs(np.var(post_shock) - np.var(pre_shock))

        return {
            'wasserstein_distance': distance,
            'mean_shift': mean_shift,
            'variance_change': variance_change,
            'impact_magnitude': distance * mean_shift
        }

    def compute_scenario_trajectory(
        self,
        scenarios: List[np.ndarray]
    ) -> np.ndarray:
        """
        Compute trajectory of scenarios over time.

        Parameters
        ----------
        scenarios : list of np.ndarray
            Time series of scenarios

        Returns
        -------
        np.ndarray
            Array of distances between consecutive scenarios, of length
            ``max(len(scenarios) - 1, 0)``.  It is empty when fewer than two
            scenarios are given.
        """
        # With zero or one scenario there are no consecutive pairs; return
        # early so an empty list does not request a negative-length array.
        if len(scenarios) < 2:
            return np.zeros(0)

        consecutive_pairs = zip(scenarios[:-1], scenarios[1:])
        return np.array(
            [self.compare_scenarios(prev, nxt) for prev, nxt in consecutive_pairs],
            dtype=float,
        )

    def logistics_optimal_transport(
        self,
        supply: np.ndarray,
        demand: np.ndarray,
        supply_locations: np.ndarray,
        demand_locations: np.ndarray
    ) -> Tuple[np.ndarray, float]:
        """
        Solve logistics problem using optimal transport.

        Supply and demand are each normalised to sum to 1 before solving,
        and the resulting plan is scaled by the total supply.  If total
        supply and total demand differ, each demand column therefore
        receives its *share* of the total supply rather than its absolute
        demand.

        Parameters
        ----------
        supply : np.ndarray
            Supply amounts at each location
        demand : np.ndarray
            Demand amounts at each location
        supply_locations : np.ndarray
            Coordinates of supply locations
        demand_locations : np.ndarray
            Coordinates of demand locations

        Returns
        -------
        tuple
            (transport_plan, total_cost), where ``transport_plan[i, j]`` is
            the quantity moved from supply location i to demand location j
            and ``total_cost`` equals ``sum(transport_plan * distance)`` for
            that same plan.

        Raises
        ------
        ImportError
            If the POT library is not installed.
        ValueError
            If total supply or total demand is not positive.
        """
        if not HAS_POT:
            raise ImportError("POT library required for logistics optimization")

        supply = np.asarray(supply, dtype=float)
        demand = np.asarray(demand, dtype=float)
        total_supply = supply.sum()
        total_demand = demand.sum()
        # A zero total would turn the normalisation below into NaNs.
        if total_supply <= 0 or total_demand <= 0:
            raise ValueError("supply and demand must each have a positive total")

        # Normalize supply and demand
        supply_norm = supply / total_supply
        demand_norm = demand / total_demand

        # Compute cost matrix (distances)
        M = cdist(supply_locations, demand_locations, metric=self.wasserstein.metric)

        # Solve on the normalised problem, then scale back to original
        # quantities *before* pricing it so that the plan and the cost
        # returned to the caller describe the same shipment.
        transport_plan = ot.emd(supply_norm, demand_norm, M) * total_supply
        total_cost = float(np.sum(transport_plan * M))

        return transport_plan, total_cost