tiny_factory / experimentation /statistical_tests.py
root
Import from HF Space harvesthealth/tiny_factory
6a42990
import numpy as np
import scipy.stats as stats
from typing import Dict, List, Union, Callable, Any, Optional
from tinytroupe.experimentation import logger
class StatisticalTester:
"""
A class to perform statistical tests on experiment results. To do so, a control is defined, and then one or
more treatments are compared to the control. The class supports various statistical tests, including t-tests,
Mann-Whitney U tests, and ANOVA. The user can specify the type of test to run, the significance level, and
the specific metrics to analyze. The results of the tests are returned in a structured format.
"""
def __init__(self, control_experiment_data: Dict[str, list],
treatments_experiment_data: Dict[str, Dict[str, list]],
results_key:str = None):
"""
Initialize with experiment results.
Args:
control_experiment_data (dict): Dictionary containing control experiment results with keys
as metric names and values as lists of values.
e.g.,{"control_exp": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4], ...}}
treatments_experiment_data (dict): Dictionary containing experiment results with keys
as experiment IDs and values as dicts of metric names to lists of values.
e.g., {"exp1": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]},
"exp2": {"metric1": [0.5, 0.6], "metric2": [0.7, 0.8]}, ...}
"""
# if results_key is provided, use it to extract the relevant data from the control and treatment data
# e.g., {"exp1": {"results": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]}}
if results_key:
control_experiment_data = {k: v[results_key] for k, v in control_experiment_data.items()}
treatments_experiment_data = {k: v[results_key] for k, v in treatments_experiment_data.items()}
self.control_experiment_data = control_experiment_data
self.treatments_experiment_data = treatments_experiment_data
# Validate input data
self._validate_input_data()
def _validate_input_data(self):
"""Validate the input data formats and structure."""
# Check that control and treatments are dictionaries
if not isinstance(self.control_experiment_data, dict):
raise TypeError("Control experiment data must be a dictionary")
if not isinstance(self.treatments_experiment_data, dict):
raise TypeError("Treatments experiment data must be a dictionary")
# Check that control has at least one experiment
if not self.control_experiment_data:
raise ValueError("Control experiment data cannot be empty")
# Check only one control
if len(self.control_experiment_data) > 1:
raise ValueError("Only one control experiment is allowed")
# Validate control experiment structure
for control_id, control_metrics in self.control_experiment_data.items():
if not isinstance(control_metrics, dict):
raise TypeError(f"Metrics for control experiment '{control_id}' must be a dictionary")
# Check that the metrics dictionary is not empty
if not control_metrics:
raise ValueError(f"Control experiment '{control_id}' has no metrics")
# Validate that metric values are lists
for metric, values in control_metrics.items():
if not isinstance(values, list):
raise TypeError(f"Values for metric '{metric}' in control experiment '{control_id}' must be a list")
# Check treatments have at least one experiment
if not self.treatments_experiment_data:
raise ValueError("Treatments experiment data cannot be empty")
# Validate treatment experiment structure
for treatment_id, treatment_data in self.treatments_experiment_data.items():
if not isinstance(treatment_data, dict):
raise TypeError(f"Data for treatment '{treatment_id}' must be a dictionary")
# Check that the metrics dictionary is not empty
if not treatment_data:
raise ValueError(f"Treatment '{treatment_id}' has no metrics")
# Get all control metrics for overlap checking
all_control_metrics = set()
for control_metrics in self.control_experiment_data.values():
all_control_metrics.update(control_metrics.keys())
# Check if there's any overlap between control and treatment metrics
common_metrics = all_control_metrics.intersection(set(treatment_data.keys()))
if not common_metrics:
logger.warning(f"Treatment '{treatment_id}' has no metrics in common with any control experiment")
# Check that treatment metrics are lists
for metric, values in treatment_data.items():
if not isinstance(values, list):
raise TypeError(f"Values for metric '{metric}' in treatment '{treatment_id}' must be a list")
def run_test(self,
test_type: str="welch_t_test",
alpha: float = 0.05,
**kwargs) -> Dict[str, Dict[str, Any]]:
"""
Run the specified statistical test on the control and treatments data.
Args:
test_type (str): Type of statistical test to run.
Options: 't_test', 'welch_t_test', 'mann_whitney', 'anova', 'chi_square', 'ks_test'
alpha (float): Significance level, defaults to 0.05
**kwargs: Additional arguments for specific test types.
Returns:
dict: Dictionary containing the results of the statistical tests for each treatment (vs the one control).
Each key is the treatment ID and each value is a dictionary with test results.
"""
supported_tests = {
't_test': self._run_t_test,
'welch_t_test': self._run_welch_t_test,
'mann_whitney': self._run_mann_whitney,
'anova': self._run_anova,
'chi_square': self._run_chi_square,
'ks_test': self._run_ks_test
}
if test_type not in supported_tests:
raise ValueError(f"Unsupported test type: {test_type}. Supported types: {list(supported_tests.keys())}")
results = {}
for control_id, control_data in self.control_experiment_data.items():
# get all metrics from control data
metrics = set()
metrics.update(control_data.keys())
for treatment_id, treatment_data in self.treatments_experiment_data.items():
results[treatment_id] = {}
for metric in metrics:
# Skip metrics not in treatment data
if metric not in treatment_data:
logger.warning(f"Metric '{metric}' not found in treatment '{treatment_id}'")
continue
control_values = control_data[metric]
treatment_values = treatment_data[metric]
# Skip if either control or treatment has no values
if len(control_values) == 0 or len(treatment_values) == 0:
logger.warning(f"Skipping metric '{metric}' for treatment '{treatment_id}' due to empty values")
continue
# Run the selected test and convert to JSON serializable types
test_result = supported_tests[test_type](control_values, treatment_values, alpha, **kwargs)
results[treatment_id][metric] = convert_to_serializable(test_result)
return results
def _run_t_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
"""Run Student's t-test (equal variance assumed)."""
# Convert to numpy arrays for calculations
control = np.array(control_values, dtype=float)
treatment = np.array(treatment_values, dtype=float)
# Calculate basic statistics
control_mean = np.mean(control)
treatment_mean = np.mean(treatment)
mean_diff = treatment_mean - control_mean
# Run the t-test
t_stat, p_value = stats.ttest_ind(control, treatment, equal_var=True)
# Calculate confidence interval
control_std = np.std(control, ddof=1)
treatment_std = np.std(treatment, ddof=1)
pooled_std = np.sqrt(((len(control) - 1) * control_std**2 +
(len(treatment) - 1) * treatment_std**2) /
(len(control) + len(treatment) - 2))
se = pooled_std * np.sqrt(1/len(control) + 1/len(treatment))
critical_value = stats.t.ppf(1 - alpha/2, len(control) + len(treatment) - 2)
margin_error = critical_value * se
ci_lower = mean_diff - margin_error
ci_upper = mean_diff + margin_error
# Determine if the result is significant
significant = p_value < alpha
return {
'test_type': 'Student t-test (equal variance)',
'control_mean': control_mean,
'treatment_mean': treatment_mean,
'mean_difference': mean_diff,
'percent_change': (mean_diff / control_mean * 100) if control_mean != 0 else float('inf'),
't_statistic': t_stat,
'p_value': p_value,
'confidence_interval': (ci_lower, ci_upper),
'confidence_level': 1 - alpha,
'significant': significant,
'control_sample_size': len(control),
'treatment_sample_size': len(treatment),
'control_std': control_std,
'treatment_std': treatment_std,
'effect_size': cohen_d(control, treatment)
}
def _run_welch_t_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
"""Run Welch's t-test (unequal variance)."""
# Convert to numpy arrays for calculations
control = np.array(control_values, dtype=float)
treatment = np.array(treatment_values, dtype=float)
# Calculate basic statistics
control_mean = np.mean(control)
treatment_mean = np.mean(treatment)
mean_diff = treatment_mean - control_mean
# Run Welch's t-test
t_stat, p_value = stats.ttest_ind(control, treatment, equal_var=False)
# Calculate confidence interval (for Welch's t-test)
control_var = np.var(control, ddof=1)
treatment_var = np.var(treatment, ddof=1)
# Calculate effective degrees of freedom (Welch-Satterthwaite equation)
v_num = (control_var/len(control) + treatment_var/len(treatment))**2
v_denom = (control_var/len(control))**2/(len(control)-1) + (treatment_var/len(treatment))**2/(len(treatment)-1)
df = v_num / v_denom if v_denom > 0 else float('inf')
se = np.sqrt(control_var/len(control) + treatment_var/len(treatment))
critical_value = stats.t.ppf(1 - alpha/2, df)
margin_error = critical_value * se
ci_lower = mean_diff - margin_error
ci_upper = mean_diff + margin_error
control_std = np.std(control, ddof=1)
treatment_std = np.std(treatment, ddof=1)
# Determine if the result is significant
significant = p_value < alpha
return {
'test_type': 'Welch t-test (unequal variance)',
'control_mean': control_mean,
'treatment_mean': treatment_mean,
'mean_difference': mean_diff,
'percent_change': (mean_diff / control_mean * 100) if control_mean != 0 else float('inf'),
't_statistic': t_stat,
'p_value': p_value,
'confidence_interval': (ci_lower, ci_upper),
'confidence_level': 1 - alpha,
'significant': significant,
'degrees_of_freedom': df,
'control_sample_size': len(control),
'treatment_sample_size': len(treatment),
'control_std': control_std,
'treatment_std': treatment_std,
'effect_size': cohen_d(control, treatment)
}
def _run_mann_whitney(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
"""Run Mann-Whitney U test (non-parametric test)."""
# Convert to numpy arrays
control = np.array(control_values, dtype=float)
treatment = np.array(treatment_values, dtype=float)
# Calculate basic statistics
control_median = np.median(control)
treatment_median = np.median(treatment)
median_diff = treatment_median - control_median
# Run the Mann-Whitney U test
u_stat, p_value = stats.mannwhitneyu(control, treatment, alternative='two-sided')
# Calculate common language effect size
# (probability that a randomly selected value from treatment is greater than control)
count = 0
for tc in treatment:
for cc in control:
if tc > cc:
count += 1
cles = count / (len(treatment) * len(control))
# Calculate approximate confidence interval using bootstrap
try:
from scipy.stats import bootstrap
def median_diff_func(x, y):
return np.median(x) - np.median(y)
res = bootstrap((control, treatment), median_diff_func,
confidence_level=1-alpha,
n_resamples=1000,
random_state=42)
ci_lower, ci_upper = res.confidence_interval
except ImportError:
# If bootstrap is not available, return None for confidence interval
ci_lower, ci_upper = None, None
logger.warning("SciPy bootstrap not available, skipping confidence interval calculation")
# Determine if the result is significant
significant = p_value < alpha
return {
'test_type': 'Mann-Whitney U test',
'control_median': control_median,
'treatment_median': treatment_median,
'median_difference': median_diff,
'percent_change': (median_diff / control_median * 100) if control_median != 0 else float('inf'),
'u_statistic': u_stat,
'p_value': p_value,
'confidence_interval': (ci_lower, ci_upper) if ci_lower is not None else None,
'confidence_level': 1 - alpha,
'significant': significant,
'control_sample_size': len(control),
'treatment_sample_size': len(treatment),
'effect_size': cles
}
def _run_anova(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
"""Run one-way ANOVA test."""
# For ANOVA, we typically need multiple groups, but we can still run it with just two
# Convert to numpy arrays
control = np.array(control_values, dtype=float)
treatment = np.array(treatment_values, dtype=float)
# Run one-way ANOVA
f_stat, p_value = stats.f_oneway(control, treatment)
# Calculate effect size (eta-squared)
total_values = np.concatenate([control, treatment])
grand_mean = np.mean(total_values)
ss_total = np.sum((total_values - grand_mean) ** 2)
ss_between = (len(control) * (np.mean(control) - grand_mean) ** 2 +
len(treatment) * (np.mean(treatment) - grand_mean) ** 2)
eta_squared = ss_between / ss_total if ss_total > 0 else 0
# Determine if the result is significant
significant = p_value < alpha
return {
'test_type': 'One-way ANOVA',
'f_statistic': f_stat,
'p_value': p_value,
'significant': significant,
'control_sample_size': len(control),
'treatment_sample_size': len(treatment),
'effect_size': eta_squared,
'effect_size_type': 'eta_squared'
}
def _run_chi_square(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
"""Run Chi-square test for categorical data."""
# For chi-square, we assume the values represent counts in different categories
# Convert to numpy arrays
control = np.array(control_values, dtype=float)
treatment = np.array(treatment_values, dtype=float)
# Check if the arrays are the same length (same number of categories)
if len(control) != len(treatment):
raise ValueError("Control and treatment must have the same number of categories for chi-square test")
# Run chi-square test
contingency_table = np.vstack([control, treatment])
chi2_stat, p_value, dof, expected = stats.chi2_contingency(contingency_table)
# Calculate Cramer's V as effect size
n = np.sum(contingency_table)
min_dim = min(contingency_table.shape) - 1
cramers_v = np.sqrt(chi2_stat / (n * min_dim)) if n * min_dim > 0 else 0
# Determine if the result is significant
significant = p_value < alpha
return {
'test_type': 'Chi-square test',
'chi2_statistic': chi2_stat,
'p_value': p_value,
'degrees_of_freedom': dof,
'significant': significant,
'effect_size': cramers_v,
'effect_size_type': 'cramers_v'
}
def check_assumptions(self, metric: str) -> Dict[str, Dict[str, Any]]:
"""
Check statistical assumptions for the given metric across all treatments.
Args:
metric (str): The metric to check assumptions for.
Returns:
dict: Dictionary with results of assumption checks for each treatment.
"""
if metric not in self.control_experiment_data:
raise ValueError(f"Metric '{metric}' not found in control data")
results = {}
control_values = np.array(self.control_experiment_data[metric], dtype=float)
# Check normality of control
control_shapiro = stats.shapiro(control_values)
control_normality = {
'test': 'Shapiro-Wilk',
'statistic': control_shapiro[0],
'p_value': control_shapiro[1],
'normal': control_shapiro[1] >= 0.05
}
for treatment_id, treatment_data in self.treatments_experiment_data.items():
if metric not in treatment_data:
logger.warning(f"Metric '{metric}' not found in treatment '{treatment_id}'")
continue
treatment_values = np.array(treatment_data[metric], dtype=float)
# Check normality of treatment
treatment_shapiro = stats.shapiro(treatment_values)
treatment_normality = {
'test': 'Shapiro-Wilk',
'statistic': treatment_shapiro[0],
'p_value': treatment_shapiro[1],
'normal': treatment_shapiro[1] >= 0.05
}
# Check homogeneity of variance
levene_test = stats.levene(control_values, treatment_values)
variance_homogeneity = {
'test': 'Levene',
'statistic': levene_test[0],
'p_value': levene_test[1],
'equal_variance': levene_test[1] >= 0.05
}
# Store results and convert to JSON serializable types
results[treatment_id] = convert_to_serializable({
'control_normality': control_normality,
'treatment_normality': treatment_normality,
'variance_homogeneity': variance_homogeneity,
'recommended_test': self._recommend_test(control_normality['normal'],
treatment_normality['normal'],
variance_homogeneity['equal_variance'])
})
return results
def _recommend_test(self, control_normal: bool, treatment_normal: bool, equal_variance: bool) -> str:
"""Recommend a statistical test based on assumption checks."""
if control_normal and treatment_normal:
if equal_variance:
return 't_test'
else:
return 'welch_t_test'
else:
return 'mann_whitney'
def _run_ks_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
"""
Run Kolmogorov-Smirnov test to compare distributions.
This test compares the empirical cumulative distribution functions (ECDFs) of two samples
to determine if they come from the same distribution. It's particularly useful for:
- Categorical responses (e.g., "Yes"/"No"/"Maybe") when converted to ordinal values
- Continuous data where you want to compare entire distributions, not just means
- Detecting differences in distribution shape, spread, or location
"""
# Convert to numpy arrays
control = np.array(control_values, dtype=float)
treatment = np.array(treatment_values, dtype=float)
# Calculate basic statistics
control_median = np.median(control)
treatment_median = np.median(treatment)
control_mean = np.mean(control)
treatment_mean = np.mean(treatment)
# Run the Kolmogorov-Smirnov test
ks_stat, p_value = stats.ks_2samp(control, treatment)
# Calculate distribution characteristics
control_std = np.std(control, ddof=1)
treatment_std = np.std(treatment, ddof=1)
# Calculate effect size using the KS statistic itself as a measure
# KS statistic ranges from 0 (identical distributions) to 1 (completely different)
effect_size = ks_stat
# Additional distribution comparison metrics
# Calculate overlap coefficient (area under the minimum of two PDFs)
try:
# Create histograms for overlap calculation
combined_range = np.linspace(
min(np.min(control), np.min(treatment)),
max(np.max(control), np.max(treatment)),
50
)
control_hist, _ = np.histogram(control, bins=combined_range, density=True)
treatment_hist, _ = np.histogram(treatment, bins=combined_range, density=True)
# Calculate overlap (intersection over union-like metric)
overlap = np.sum(np.minimum(control_hist, treatment_hist)) / np.sum(np.maximum(control_hist, treatment_hist))
overlap = overlap if not np.isnan(overlap) else 0.0
except:
overlap = None
# Calculate percentile differences for additional insights
percentiles = [25, 50, 75, 90, 95]
percentile_diffs = {}
for p in percentiles:
control_p = np.percentile(control, p)
treatment_p = np.percentile(treatment, p)
percentile_diffs[f"p{p}_diff"] = treatment_p - control_p
# Determine significance
significant = p_value < alpha
return {
'test_type': 'Kolmogorov-Smirnov test',
'control_mean': control_mean,
'treatment_mean': treatment_mean,
'control_median': control_median,
'treatment_median': treatment_median,
'control_std': control_std,
'treatment_std': treatment_std,
'ks_statistic': ks_stat,
'p_value': p_value,
'significant': significant,
'control_sample_size': len(control),
'treatment_sample_size': len(treatment),
'effect_size': effect_size,
'overlap_coefficient': overlap,
'percentile_differences': percentile_diffs,
'interpretation': self._interpret_ks_result(ks_stat, significant),
'confidence_level': 1 - alpha
}
def _interpret_ks_result(self, ks_stat: float, significant: bool) -> str:
"""Provide interpretation of KS test results."""
if not significant:
return "No significant difference between distributions"
if ks_stat < 0.1:
return "Very small difference between distributions"
elif ks_stat < 0.25:
return "Small difference between distributions"
elif ks_stat < 0.5:
return "Moderate difference between distributions"
else:
return "Large difference between distributions"
def cohen_d(x: Union[list, np.ndarray], y: Union[list, np.ndarray]) -> float:
"""
Calculate Cohen's d effect size for two samples.
Args:
x: First sample
y: Second sample
Returns:
float: Cohen's d effect size
"""
nx = len(x)
ny = len(y)
# Convert to numpy arrays
x = np.array(x, dtype=float)
y = np.array(y, dtype=float)
# Calculate means
mx = np.mean(x)
my = np.mean(y)
# Calculate standard deviations
sx = np.std(x, ddof=1)
sy = np.std(y, ddof=1)
# Pooled standard deviation
pooled_sd = np.sqrt(((nx - 1) * sx**2 + (ny - 1) * sy**2) / (nx + ny - 2))
# Cohen's d
return (my - mx) / pooled_sd if pooled_sd > 0 else 0
def convert_to_serializable(obj):
"""
Convert NumPy types to native Python types recursively to ensure JSON serialization works.
Args:
obj: Any object that might contain NumPy types
Returns:
Object with NumPy types converted to Python native types
"""
if isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, (np.number, np.bool_)):
return obj.item()
elif isinstance(obj, dict):
return {k: convert_to_serializable(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_to_serializable(i) for i in obj]
elif isinstance(obj, tuple):
return tuple(convert_to_serializable(i) for i in obj)
else:
return obj