Spaces:
Sleeping
Sleeping
updated sound lib
Browse files- app.py +3 -11
- data_processor.py +44 -45
- sound_library.py +165 -154
app.py
CHANGED
|
@@ -585,26 +585,18 @@ def continue_automatic_composition():
|
|
| 585 |
print(f"DEBUG continue: Completed movements: {completed_movements}")
|
| 586 |
|
| 587 |
if len(completed_movements) > 0:
|
| 588 |
-
print
|
| 589 |
-
|
|
|
|
| 590 |
# Display each completed movement sound in its respective player
|
| 591 |
if 'left_hand' in completed_movements and 'left_hand' in sounds:
|
| 592 |
left_hand_audio = sounds['left_hand']
|
| 593 |
-
print(f"DEBUG continue: Left hand playing: {sounds['left_hand']}")
|
| 594 |
if 'right_hand' in completed_movements and 'right_hand' in sounds:
|
| 595 |
right_hand_audio = sounds['right_hand']
|
| 596 |
-
print(f"DEBUG continue: Right hand playing: {sounds['right_hand']}")
|
| 597 |
if 'left_leg' in completed_movements and 'left_leg' in sounds:
|
| 598 |
left_leg_audio = sounds['left_leg']
|
| 599 |
-
print(f"DEBUG continue: Left leg playing: {sounds['left_leg']}")
|
| 600 |
if 'right_leg' in completed_movements and 'right_leg' in sounds:
|
| 601 |
right_leg_audio = sounds['right_leg']
|
| 602 |
-
print(f"DEBUG continue: Right leg playing: {sounds['right_leg']}")
|
| 603 |
-
# if 'tongue' in completed_movements and 'tongue' in sounds:
|
| 604 |
-
# tongue_audio = sounds['tongue']
|
| 605 |
-
# print(f"DEBUG continue: Tongue playing: {sounds['tongue']}")
|
| 606 |
-
|
| 607 |
-
print(f"DEBUG continue: {len(completed_movements)} individual sounds will play together creating layered composition")
|
| 608 |
|
| 609 |
# Format display with progress information
|
| 610 |
completed_count = len(sound_manager.movements_completed)
|
|
|
|
| 585 |
print(f"DEBUG continue: Completed movements: {completed_movements}")
|
| 586 |
|
| 587 |
if len(completed_movements) > 0:
|
| 588 |
+
# Track and print only the sounds that have been added
|
| 589 |
+
sounds_added = [sounds[m] for m in completed_movements if m in sounds]
|
| 590 |
+
print(f"DEBUG: Sounds added to composition: {sounds_added}")
|
| 591 |
# Display each completed movement sound in its respective player
|
| 592 |
if 'left_hand' in completed_movements and 'left_hand' in sounds:
|
| 593 |
left_hand_audio = sounds['left_hand']
|
|
|
|
| 594 |
if 'right_hand' in completed_movements and 'right_hand' in sounds:
|
| 595 |
right_hand_audio = sounds['right_hand']
|
|
|
|
| 596 |
if 'left_leg' in completed_movements and 'left_leg' in sounds:
|
| 597 |
left_leg_audio = sounds['left_leg']
|
|
|
|
| 598 |
if 'right_leg' in completed_movements and 'right_leg' in sounds:
|
| 599 |
right_leg_audio = sounds['right_leg']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 600 |
|
| 601 |
# Format display with progress information
|
| 602 |
completed_count = len(sound_manager.movements_completed)
|
data_processor.py
CHANGED
|
@@ -146,9 +146,9 @@ class EEGDataProcessor:
|
|
| 146 |
|
| 147 |
return combined_raw, fs
|
| 148 |
|
| 149 |
-
def prepare_loso_split(self, file_paths: List[str],
|
| 150 |
"""
|
| 151 |
-
Prepare Leave-One-
|
| 152 |
|
| 153 |
Args:
|
| 154 |
file_paths: List of .mat file paths (one per subject)
|
|
@@ -157,8 +157,8 @@ class EEGDataProcessor:
|
|
| 157 |
Returns:
|
| 158 |
X_train, y_train, X_test, y_test, subject_info
|
| 159 |
"""
|
| 160 |
-
|
| 161 |
-
|
| 162 |
|
| 163 |
# Load each subject separately
|
| 164 |
for i, file_path in enumerate(file_paths):
|
|
@@ -170,9 +170,8 @@ class EEGDataProcessor:
|
|
| 170 |
# Convert to arrays
|
| 171 |
X_subject = epochs.get_data().astype("float32")
|
| 172 |
y_subject = (epochs.events[:, -1] - 1).astype("int64")
|
| 173 |
-
|
| 174 |
-
|
| 175 |
-
subject_info.append({
|
| 176 |
'file_path': file_path,
|
| 177 |
'subject_id': f"Subject_{i+1}",
|
| 178 |
'n_epochs': len(X_subject),
|
|
@@ -180,25 +179,25 @@ class EEGDataProcessor:
|
|
| 180 |
'fs': fs
|
| 181 |
})
|
| 182 |
|
| 183 |
-
# LOSO split: one
|
| 184 |
-
|
| 185 |
-
|
| 186 |
-
|
| 187 |
-
# Combine training
|
| 188 |
-
if len(
|
| 189 |
-
X_train = np.concatenate([
|
| 190 |
-
y_train = np.concatenate([
|
| 191 |
else:
|
| 192 |
-
X_train, y_train =
|
| 193 |
-
|
| 194 |
-
X_test, y_test =
|
| 195 |
-
|
| 196 |
print("LOSO Split:")
|
| 197 |
-
print(f" Test Subject: {
|
| 198 |
-
print(f" Train Subjects: {len(
|
| 199 |
-
|
| 200 |
-
return X_train, y_train, X_test, y_test,
|
| 201 |
-
|
| 202 |
def simulate_real_time_data(self, X: np.ndarray, y: np.ndarray, mode: str = "random") -> Tuple[np.ndarray, int]:
|
| 203 |
"""
|
| 204 |
Simulate real-time EEG data for demo purposes.
|
|
@@ -227,31 +226,31 @@ class EEGDataProcessor:
|
|
| 227 |
|
| 228 |
return X[idx], y[idx]
|
| 229 |
|
| 230 |
-
def simulate_continuous_stream(self, raw_data: np.ndarray, fs: int, window_size: float = 1.5) -> np.ndarray:
|
| 231 |
-
|
| 232 |
-
|
| 233 |
|
| 234 |
-
|
| 235 |
-
|
| 236 |
-
|
| 237 |
-
|
| 238 |
|
| 239 |
-
|
| 240 |
-
|
| 241 |
-
|
| 242 |
-
|
| 243 |
|
| 244 |
-
|
| 245 |
-
|
| 246 |
-
|
| 247 |
-
|
| 248 |
|
| 249 |
-
|
| 250 |
-
|
| 251 |
-
|
| 252 |
|
| 253 |
-
|
| 254 |
-
|
| 255 |
|
| 256 |
-
|
| 257 |
|
|
|
|
| 146 |
|
| 147 |
return combined_raw, fs
|
| 148 |
|
| 149 |
+
def prepare_loso_split(self, file_paths: List[str], test_session_idx: int = 0) -> Tuple:
|
| 150 |
"""
|
| 151 |
+
Prepare Leave-One-Session-Out (LOSO) split for EEG data.
|
| 152 |
|
| 153 |
Args:
|
| 154 |
file_paths: List of .mat file paths (one per subject)
|
|
|
|
| 157 |
Returns:
|
| 158 |
X_train, y_train, X_test, y_test, subject_info
|
| 159 |
"""
|
| 160 |
+
all_sessions_data = []
|
| 161 |
+
session_info = []
|
| 162 |
|
| 163 |
# Load each subject separately
|
| 164 |
for i, file_path in enumerate(file_paths):
|
|
|
|
| 170 |
# Convert to arrays
|
| 171 |
X_subject = epochs.get_data().astype("float32")
|
| 172 |
y_subject = (epochs.events[:, -1] - 1).astype("int64")
|
| 173 |
+
all_sessions_data.append((X_subject, y_subject))
|
| 174 |
+
session_info.append({
|
|
|
|
| 175 |
'file_path': file_path,
|
| 176 |
'subject_id': f"Subject_{i+1}",
|
| 177 |
'n_epochs': len(X_subject),
|
|
|
|
| 179 |
'fs': fs
|
| 180 |
})
|
| 181 |
|
| 182 |
+
# LOSO split: one session for test, others for train
|
| 183 |
+
test_sessions = all_sessions_data[test_session_idx]
|
| 184 |
+
train_sessions = [all_sessions_data[i] for i in range(len(all_sessions_data)) if i != test_session_idx]
|
| 185 |
+
|
| 186 |
+
# Combine training sessions
|
| 187 |
+
if len(train_sessions) > 1:
|
| 188 |
+
X_train = np.concatenate([sess[0] for sess in train_sessions], axis=0)
|
| 189 |
+
y_train = np.concatenate([sess[1] for sess in train_sessions], axis=0)
|
| 190 |
else:
|
| 191 |
+
X_train, y_train = train_sessions[0]
|
| 192 |
+
|
| 193 |
+
X_test, y_test = test_sessions
|
| 194 |
+
|
| 195 |
print("LOSO Split:")
|
| 196 |
+
print(f" Test Subject: {session_info[test_session_idx]['subject_id']} ({len(X_test)} epochs)")
|
| 197 |
+
print(f" Train Subjects: {len(train_sessions)} subjects ({len(X_train)} epochs)")
|
| 198 |
+
|
| 199 |
+
return X_train, y_train, X_test, y_test, session_info
|
| 200 |
+
|
| 201 |
def simulate_real_time_data(self, X: np.ndarray, y: np.ndarray, mode: str = "random") -> Tuple[np.ndarray, int]:
|
| 202 |
"""
|
| 203 |
Simulate real-time EEG data for demo purposes.
|
|
|
|
| 226 |
|
| 227 |
return X[idx], y[idx]
|
| 228 |
|
| 229 |
+
# def simulate_continuous_stream(self, raw_data: np.ndarray, fs: int, window_size: float = 1.5) -> np.ndarray:
|
| 230 |
+
# """
|
| 231 |
+
# Simulate continuous EEG stream by extracting sliding windows from raw data.
|
| 232 |
|
| 233 |
+
# Args:
|
| 234 |
+
# raw_data: Continuous EEG data [n_channels, n_timepoints]
|
| 235 |
+
# fs: Sampling frequency
|
| 236 |
+
# window_size: Window size in seconds
|
| 237 |
|
| 238 |
+
# Returns:
|
| 239 |
+
# Single window of EEG data [n_channels, window_samples]
|
| 240 |
+
# """
|
| 241 |
+
# window_samples = int(window_size * fs) # e.g., 1.5s * 200Hz = 300 samples
|
| 242 |
|
| 243 |
+
# # Ensure we don't go beyond the data
|
| 244 |
+
# max_start = raw_data.shape[1] - window_samples
|
| 245 |
+
# if max_start <= 0:
|
| 246 |
+
# return raw_data # Return full data if too short
|
| 247 |
|
| 248 |
+
# # Random starting point in the continuous stream
|
| 249 |
+
# start_idx = np.random.randint(0, max_start)
|
| 250 |
+
# end_idx = start_idx + window_samples
|
| 251 |
|
| 252 |
+
# # Extract window
|
| 253 |
+
# window = raw_data[:, start_idx:end_idx]
|
| 254 |
|
| 255 |
+
# return window
|
| 256 |
|
sound_library.py
CHANGED
|
@@ -19,28 +19,19 @@ from scipy import signal
|
|
| 19 |
import librosa
|
| 20 |
|
| 21 |
class AudioEffectsProcessor:
|
| 22 |
-
"""Professional audio effects for DJ mode using scipy and librosa."""
|
| 23 |
-
|
| 24 |
@staticmethod
|
| 25 |
-
def
|
| 26 |
-
"""
|
| 27 |
try:
|
| 28 |
-
|
| 29 |
-
|
| 30 |
-
|
| 31 |
-
|
| 32 |
-
|
| 33 |
-
fade_curve = np.linspace(1.0, 0.3, fade_samples)
|
| 34 |
-
data[-fade_samples:] *= fade_curve
|
| 35 |
-
elif fade_type == "in":
|
| 36 |
-
# Fade in: linear increase from 0.3 to 1.0
|
| 37 |
-
fade_curve = np.linspace(0.3, 1.0, fade_samples)
|
| 38 |
-
data[:fade_samples] *= fade_curve
|
| 39 |
-
|
| 40 |
-
return data
|
| 41 |
except Exception as e:
|
| 42 |
-
print(f"
|
| 43 |
return data
|
|
|
|
| 44 |
|
| 45 |
@staticmethod
|
| 46 |
def apply_high_pass_filter(data: np.ndarray, samplerate: int, cutoff: float = 800.0) -> np.ndarray:
|
|
@@ -79,8 +70,8 @@ class AudioEffectsProcessor:
|
|
| 79 |
"""Apply simple reverb effect using delay and feedback."""
|
| 80 |
try:
|
| 81 |
# Simple reverb using multiple delayed copies
|
| 82 |
-
delay_samples = int(0.
|
| 83 |
-
decay = 0.
|
| 84 |
|
| 85 |
# Create reverb buffer
|
| 86 |
reverb_data = np.copy(data)
|
|
@@ -98,6 +89,19 @@ class AudioEffectsProcessor:
|
|
| 98 |
print(f"Reverb effect failed: {e}")
|
| 99 |
return data
|
| 100 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 101 |
@staticmethod
|
| 102 |
def apply_bass_boost(data: np.ndarray, samplerate: int, boost_db: float = 6.0) -> np.ndarray:
|
| 103 |
"""Apply bass boost using low-frequency shelving filter."""
|
|
@@ -127,9 +131,10 @@ class AudioEffectsProcessor:
|
|
| 127 |
print(f"Bass boost failed: {e}")
|
| 128 |
return data
|
| 129 |
|
|
|
|
| 130 |
@staticmethod
|
| 131 |
-
def
|
| 132 |
-
"""Process
|
| 133 |
try:
|
| 134 |
if not audio_file or not os.path.exists(audio_file):
|
| 135 |
print(f"Invalid audio file: {audio_file}")
|
|
@@ -138,36 +143,39 @@ class AudioEffectsProcessor:
|
|
| 138 |
if len(data.shape) > 1:
|
| 139 |
data = np.mean(data, axis=1)
|
| 140 |
processed_data = np.copy(data)
|
| 141 |
-
|
| 142 |
-
#
|
| 143 |
-
|
| 144 |
-
|
| 145 |
-
|
| 146 |
-
|
| 147 |
-
|
| 148 |
-
|
| 149 |
-
|
| 150 |
-
|
| 151 |
-
|
| 152 |
-
|
| 153 |
-
|
| 154 |
-
|
| 155 |
-
|
| 156 |
-
|
| 157 |
-
|
| 158 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 159 |
base_name = os.path.splitext(audio_file)[0]
|
| 160 |
-
|
| 161 |
-
processed_file = f"{base_name}_fx_{effects_suffix}.wav"
|
| 162 |
try:
|
| 163 |
-
|
| 164 |
-
print(f"ποΈ
|
| 165 |
return os.path.abspath(processed_file)
|
| 166 |
except Exception as e:
|
| 167 |
-
print(f"Failed to save processed
|
| 168 |
return os.path.abspath(audio_file)
|
| 169 |
except Exception as e:
|
| 170 |
-
print(f"
|
| 171 |
return os.path.abspath(audio_file) if audio_file else None
|
| 172 |
|
| 173 |
class SoundManager:
|
|
@@ -198,7 +206,7 @@ class SoundManager:
|
|
| 198 |
|
| 199 |
# DJ Effects phase management
|
| 200 |
self.current_phase = "building" # "building" or "dj_effects"
|
| 201 |
-
self.mixed_composition_file = None # Path to current mixed composition
|
| 202 |
self.active_effects = { # Track which effects are currently active
|
| 203 |
"left_hand": False, # Volume fade
|
| 204 |
"right_hand": False, # Filter sweep
|
|
@@ -210,7 +218,7 @@ class SoundManager:
|
|
| 210 |
# All possible movements (neutral is optional for composition)
|
| 211 |
self.all_movements = ["left_hand", "right_hand", "neutral", "left_leg", "tongue", "right_leg"]
|
| 212 |
|
| 213 |
-
# Active movements that contribute to composition (excluding neutral)
|
| 214 |
self.active_movements = ["left_hand", "right_hand", "left_leg", "right_leg"]
|
| 215 |
|
| 216 |
# Current cycle's random movement sequence (shuffled each cycle)
|
|
@@ -417,16 +425,17 @@ class SoundManager:
|
|
| 417 |
|
| 418 |
def transition_to_dj_phase(self):
|
| 419 |
"""Transition from building phase to DJ effects phase."""
|
| 420 |
-
# Only start DJ mode if all
|
| 421 |
unique_sounds = set()
|
| 422 |
for layer in self.composition_layers:
|
| 423 |
if layer.get('sound_file'):
|
| 424 |
unique_sounds.add(layer['sound_file'])
|
| 425 |
print(f"DEBUG: Unique sound files in composition_layers: {unique_sounds}")
|
| 426 |
print(f"DEBUG: Number of unique sounds: {len(unique_sounds)}")
|
| 427 |
-
if len(unique_sounds) >=
|
| 428 |
self.current_phase = "dj_effects"
|
| 429 |
-
self._create_mixed_composition()
|
|
|
|
| 430 |
print("π΅ Composition Complete! Transitioning to DJ Effects Phase...")
|
| 431 |
print("π§ You are now the DJ! Use movements to control effects:")
|
| 432 |
print(" π Left Hand: Volume Fade")
|
|
@@ -439,132 +448,134 @@ class SoundManager:
|
|
| 439 |
print("DEBUG: Not enough unique sounds to transition to DJ mode.")
|
| 440 |
return False
|
| 441 |
|
| 442 |
-
def _create_mixed_composition(self):
|
| 443 |
-
|
| 444 |
-
|
| 445 |
-
|
| 446 |
-
|
| 447 |
-
|
| 448 |
-
|
| 449 |
-
|
| 450 |
-
|
| 451 |
-
|
| 452 |
-
|
| 453 |
-
|
| 454 |
-
|
| 455 |
-
|
| 456 |
-
|
| 457 |
-
|
| 458 |
-
|
| 459 |
-
|
| 460 |
-
|
| 461 |
-
|
| 462 |
-
|
| 463 |
-
|
| 464 |
-
|
| 465 |
-
|
| 466 |
-
|
| 467 |
-
|
| 468 |
-
|
| 469 |
-
|
| 470 |
-
|
| 471 |
-
|
| 472 |
-
|
| 473 |
-
|
| 474 |
-
sample_rate = sr
|
| 475 |
|
| 476 |
-
|
| 477 |
-
|
| 478 |
-
|
| 479 |
|
| 480 |
-
|
| 481 |
-
|
| 482 |
-
|
| 483 |
-
|
| 484 |
-
|
| 485 |
-
|
| 486 |
-
|
| 487 |
-
|
| 488 |
-
|
| 489 |
|
| 490 |
-
|
| 491 |
-
|
| 492 |
-
|
| 493 |
-
|
| 494 |
-
|
| 495 |
-
|
| 496 |
-
|
| 497 |
-
|
| 498 |
-
|
| 499 |
-
|
| 500 |
-
|
| 501 |
|
| 502 |
-
|
| 503 |
-
|
| 504 |
-
|
| 505 |
-
|
| 506 |
-
|
| 507 |
-
|
| 508 |
-
|
| 509 |
|
| 510 |
-
|
| 511 |
-
|
| 512 |
-
|
| 513 |
-
|
| 514 |
-
|
| 515 |
-
|
| 516 |
-
|
| 517 |
-
|
| 518 |
-
|
| 519 |
-
|
| 520 |
-
|
| 521 |
-
|
| 522 |
-
|
| 523 |
|
| 524 |
def toggle_dj_effect(self, movement: str) -> dict:
|
| 525 |
-
"""Toggle a DJ effect for the given movement and process
|
| 526 |
if self.current_phase != "dj_effects":
|
| 527 |
return {"effect_applied": False, "message": "Not in DJ effects phase"}
|
| 528 |
-
|
| 529 |
if movement not in self.active_effects:
|
| 530 |
return {"effect_applied": False, "message": f"Unknown movement: {movement}"}
|
| 531 |
-
|
| 532 |
-
#
|
| 533 |
self.active_effects[movement] = not self.active_effects[movement]
|
| 534 |
effect_status = "ON" if self.active_effects[movement] else "OFF"
|
| 535 |
effect_names = {
|
| 536 |
-
"left_hand": "
|
| 537 |
"right_hand": "High Pass Filter",
|
| 538 |
-
"left_leg": "Reverb Effect",
|
| 539 |
"right_leg": "Low Pass Filter",
|
| 540 |
-
#"tongue": "Bass Boost"
|
| 541 |
}
|
| 542 |
effect_name = effect_names.get(movement, movement)
|
| 543 |
print(f"ποΈ {effect_name}: {effect_status}")
|
| 544 |
-
|
| 545 |
-
|
| 546 |
-
|
| 547 |
-
|
| 548 |
-
|
|
|
|
|
|
|
|
|
|
| 549 |
)
|
| 550 |
else:
|
| 551 |
-
|
| 552 |
-
|
| 553 |
-
# Only process if we successfully created a mixed composition
|
| 554 |
-
if self.mixed_composition_file and os.path.exists(self.mixed_composition_file):
|
| 555 |
-
processed_file = AudioEffectsProcessor.process_with_effects(
|
| 556 |
-
self.mixed_composition_file,
|
| 557 |
-
self.active_effects
|
| 558 |
-
)
|
| 559 |
-
else:
|
| 560 |
-
print("Failed to create mixed composition, using fallback")
|
| 561 |
-
processed_file = self.mixed_composition_file
|
| 562 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 563 |
return {
|
| 564 |
"effect_applied": True,
|
| 565 |
"effect_name": effect_name,
|
| 566 |
"effect_status": effect_status,
|
| 567 |
-
"
|
|
|
|
| 568 |
}
|
| 569 |
|
| 570 |
def get_cycle_success_rate(self) -> float:
|
|
@@ -641,8 +652,8 @@ class SoundManager:
|
|
| 641 |
# Create a unique identifier for this composition
|
| 642 |
import hashlib
|
| 643 |
composition_hash = hashlib.md5(''.join(sorted(audio_files)).encode()).hexdigest()[:8]
|
| 644 |
-
|
| 645 |
-
# FILE SAVING
|
| 646 |
mixed_audio_path = os.path.join(self.sound_dir, f"mixed_composition_{composition_hash}.wav")
|
| 647 |
|
| 648 |
# Since file saving is disabled, use the first available audio file from the list
|
|
@@ -650,7 +661,7 @@ class SoundManager:
|
|
| 650 |
# Use the first audio file as the "mixed" composition
|
| 651 |
first_audio_file = os.path.join(self.sound_dir, audio_files[0])
|
| 652 |
if os.path.exists(first_audio_file):
|
| 653 |
-
print(f"DEBUG: Using first audio file as mixed composition: {os.path.basename(first_audio_file)}
|
| 654 |
# Estimate BPM from the mixed composition audio file
|
| 655 |
try:
|
| 656 |
import librosa
|
|
|
|
| 19 |
import librosa
|
| 20 |
|
| 21 |
class AudioEffectsProcessor:
|
|
|
|
|
|
|
| 22 |
@staticmethod
|
| 23 |
+
def apply_echo(data: np.ndarray, samplerate: int, delay_time: float = 0.3, feedback: float = 0.4) -> np.ndarray:
|
| 24 |
+
"""Echo/delay effect (tempo-sync if delay_time is set to fraction of beat)."""
|
| 25 |
try:
|
| 26 |
+
delay_samples = int(delay_time * samplerate)
|
| 27 |
+
echo_data = np.copy(data)
|
| 28 |
+
for i in range(delay_samples, len(data)):
|
| 29 |
+
echo_data[i] += feedback * echo_data[i - delay_samples]
|
| 30 |
+
return 0.7 * data + 0.3 * echo_data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 31 |
except Exception as e:
|
| 32 |
+
print(f"Echo failed: {e}")
|
| 33 |
return data
|
| 34 |
+
"""Professional audio effects for DJ mode using scipy and librosa."""
|
| 35 |
|
| 36 |
@staticmethod
|
| 37 |
def apply_high_pass_filter(data: np.ndarray, samplerate: int, cutoff: float = 800.0) -> np.ndarray:
|
|
|
|
| 70 |
"""Apply simple reverb effect using delay and feedback."""
|
| 71 |
try:
|
| 72 |
# Simple reverb using multiple delayed copies
|
| 73 |
+
delay_samples = int(0.08 * samplerate) # 80ms delay
|
| 74 |
+
decay = 0.4 * room_size
|
| 75 |
|
| 76 |
# Create reverb buffer
|
| 77 |
reverb_data = np.copy(data)
|
|
|
|
| 89 |
print(f"Reverb effect failed: {e}")
|
| 90 |
return data
|
| 91 |
|
| 92 |
+
@staticmethod
|
| 93 |
+
def apply_echo(data: np.ndarray, samplerate: int, delay_time: float = 0.3, feedback: float = 0.4) -> np.ndarray:
|
| 94 |
+
"""Echo/delay effect (tempo-sync if delay_time is set to fraction of beat)."""
|
| 95 |
+
try:
|
| 96 |
+
delay_samples = int(delay_time * samplerate)
|
| 97 |
+
echo_data = np.copy(data)
|
| 98 |
+
for i in range(delay_samples, len(data)):
|
| 99 |
+
echo_data[i] += feedback * echo_data[i - delay_samples]
|
| 100 |
+
return 0.7 * data + 0.3 * echo_data
|
| 101 |
+
except Exception as e:
|
| 102 |
+
print(f"Echo failed: {e}")
|
| 103 |
+
return data
|
| 104 |
+
|
| 105 |
@staticmethod
|
| 106 |
def apply_bass_boost(data: np.ndarray, samplerate: int, boost_db: float = 6.0) -> np.ndarray:
|
| 107 |
"""Apply bass boost using low-frequency shelving filter."""
|
|
|
|
| 131 |
print(f"Bass boost failed: {e}")
|
| 132 |
return data
|
| 133 |
|
| 134 |
+
# --- DJ MODE WRAPPER ---
|
| 135 |
@staticmethod
|
| 136 |
+
def process_layer_with_effects(audio_file: str, movement: str, active_effects: Dict[str, bool]) -> str:
|
| 137 |
+
"""Process a single layer with its corresponding DJ effect if active."""
|
| 138 |
try:
|
| 139 |
if not audio_file or not os.path.exists(audio_file):
|
| 140 |
print(f"Invalid audio file: {audio_file}")
|
|
|
|
| 143 |
if len(data.shape) > 1:
|
| 144 |
data = np.mean(data, axis=1)
|
| 145 |
processed_data = np.copy(data)
|
| 146 |
+
|
| 147 |
+
# Map movement to effect
|
| 148 |
+
effect_map = {
|
| 149 |
+
"left_hand": AudioEffectsProcessor.apply_echo,
|
| 150 |
+
"right_hand": AudioEffectsProcessor.apply_high_pass_filter,
|
| 151 |
+
"left_leg": AudioEffectsProcessor.apply_reverb,
|
| 152 |
+
"right_leg": AudioEffectsProcessor.apply_low_pass_filter,
|
| 153 |
+
}
|
| 154 |
+
effect_names = {
|
| 155 |
+
"left_hand": "echo",
|
| 156 |
+
"right_hand": "hpf",
|
| 157 |
+
"left_leg": "rev",
|
| 158 |
+
"right_leg": "lpf",
|
| 159 |
+
}
|
| 160 |
+
effect_func = effect_map.get(movement)
|
| 161 |
+
effect_name = effect_names.get(movement, "clean")
|
| 162 |
+
if active_effects.get(movement, False) and effect_func:
|
| 163 |
+
processed_data = effect_func(processed_data, samplerate)
|
| 164 |
+
suffix = f"_fx_{effect_name}"
|
| 165 |
+
else:
|
| 166 |
+
suffix = "_fx_clean"
|
| 167 |
+
|
| 168 |
base_name = os.path.splitext(audio_file)[0]
|
| 169 |
+
processed_file = f"{base_name}{suffix}.wav"
|
|
|
|
| 170 |
try:
|
| 171 |
+
sf.write(processed_file, processed_data, samplerate)
|
| 172 |
+
print(f"ποΈ Layer processed: {os.path.basename(processed_file)}")
|
| 173 |
return os.path.abspath(processed_file)
|
| 174 |
except Exception as e:
|
| 175 |
+
print(f"Failed to save processed layer: {e}")
|
| 176 |
return os.path.abspath(audio_file)
|
| 177 |
except Exception as e:
|
| 178 |
+
print(f"Layer processing failed: {e}")
|
| 179 |
return os.path.abspath(audio_file) if audio_file else None
|
| 180 |
|
| 181 |
class SoundManager:
|
|
|
|
| 206 |
|
| 207 |
# DJ Effects phase management
|
| 208 |
self.current_phase = "building" # "building" or "dj_effects"
|
| 209 |
+
# self.mixed_composition_file = None # Path to current mixed composition
|
| 210 |
self.active_effects = { # Track which effects are currently active
|
| 211 |
"left_hand": False, # Volume fade
|
| 212 |
"right_hand": False, # Filter sweep
|
|
|
|
| 218 |
# All possible movements (neutral is optional for composition)
|
| 219 |
self.all_movements = ["left_hand", "right_hand", "neutral", "left_leg", "tongue", "right_leg"]
|
| 220 |
|
| 221 |
+
# Active movements that contribute to composition (excluding neutral and tongue)
|
| 222 |
self.active_movements = ["left_hand", "right_hand", "left_leg", "right_leg"]
|
| 223 |
|
| 224 |
# Current cycle's random movement sequence (shuffled each cycle)
|
|
|
|
| 425 |
|
| 426 |
def transition_to_dj_phase(self):
|
| 427 |
"""Transition from building phase to DJ effects phase."""
|
| 428 |
+
# Only start DJ mode if all 4 sound layers are present (not just movements completed)
|
| 429 |
unique_sounds = set()
|
| 430 |
for layer in self.composition_layers:
|
| 431 |
if layer.get('sound_file'):
|
| 432 |
unique_sounds.add(layer['sound_file'])
|
| 433 |
print(f"DEBUG: Unique sound files in composition_layers: {unique_sounds}")
|
| 434 |
print(f"DEBUG: Number of unique sounds: {len(unique_sounds)}")
|
| 435 |
+
if len(unique_sounds) >= 4:
|
| 436 |
self.current_phase = "dj_effects"
|
| 437 |
+
# self._create_mixed_composition()
|
| 438 |
+
# just keep current stems running
|
| 439 |
print("π΅ Composition Complete! Transitioning to DJ Effects Phase...")
|
| 440 |
print("π§ You are now the DJ! Use movements to control effects:")
|
| 441 |
print(" π Left Hand: Volume Fade")
|
|
|
|
| 448 |
print("DEBUG: Not enough unique sounds to transition to DJ mode.")
|
| 449 |
return False
|
| 450 |
|
| 451 |
+
# def _create_mixed_composition(self):
|
| 452 |
+
# """Create a mixed audio file from all completed layers."""
|
| 453 |
+
# try:
|
| 454 |
+
# import hashlib
|
| 455 |
+
# movement_hash = hashlib.md5(str(sorted(self.movements_completed)).encode()).hexdigest()[:8]
|
| 456 |
+
# self.mixed_composition_file = os.path.abspath(f"mixed_composition_{movement_hash}.wav")
|
| 457 |
+
|
| 458 |
+
# # Try to use the first available completed movement's audio file
|
| 459 |
+
# for movement in self.movements_completed:
|
| 460 |
+
# if movement in self.current_sound_mapping and self.current_sound_mapping[movement] is not None:
|
| 461 |
+
# sound_file = os.path.join(self.sound_dir, self.current_sound_mapping[movement])
|
| 462 |
+
# if os.path.exists(sound_file):
|
| 463 |
+
# self.mixed_composition_file = os.path.abspath(sound_file)
|
| 464 |
+
# print(f"π Using existing audio as mixed composition: {self.mixed_composition_file}")
|
| 465 |
+
# return
|
| 466 |
+
|
| 467 |
+
# # If file already exists, use it
|
| 468 |
+
# if os.path.exists(self.mixed_composition_file):
|
| 469 |
+
# print(f"π Using existing mixed composition: {self.mixed_composition_file}")
|
| 470 |
+
# return
|
| 471 |
+
|
| 472 |
+
# # Create actual mixed composition by layering completed sounds
|
| 473 |
+
# mixed_data = None
|
| 474 |
+
# sample_rate = 44100 # Default sample rate
|
| 475 |
+
|
| 476 |
+
# for movement in self.movements_completed:
|
| 477 |
+
# if movement in self.current_sound_mapping and self.current_sound_mapping[movement] is not None:
|
| 478 |
+
# sound_file = os.path.join(self.sound_dir, self.current_sound_mapping[movement])
|
| 479 |
+
# if os.path.exists(sound_file):
|
| 480 |
+
# try:
|
| 481 |
+
# data, sr = sf.read(sound_file)
|
| 482 |
+
# sample_rate = sr
|
|
|
|
| 483 |
|
| 484 |
+
# # Convert stereo to mono
|
| 485 |
+
# if len(data.shape) > 1:
|
| 486 |
+
# data = np.mean(data, axis=1)
|
| 487 |
|
| 488 |
+
# # Initialize or add to mixed data
|
| 489 |
+
# if mixed_data is None:
|
| 490 |
+
# mixed_data = data * 0.8 # Reduce volume to prevent clipping
|
| 491 |
+
# else:
|
| 492 |
+
# # Ensure same length by padding shorter audio
|
| 493 |
+
# if len(data) > len(mixed_data):
|
| 494 |
+
# mixed_data = np.pad(mixed_data, (0, len(data) - len(mixed_data)))
|
| 495 |
+
# elif len(mixed_data) > len(data):
|
| 496 |
+
# data = np.pad(data, (0, len(mixed_data) - len(data)))
|
| 497 |
|
| 498 |
+
# # Mix the audio (layer them)
|
| 499 |
+
# mixed_data += data * 0.8
|
| 500 |
+
# except Exception as e:
|
| 501 |
+
# print(f"Error mixing {sound_file}: {e}")
|
| 502 |
+
|
| 503 |
+
# # Save mixed composition or create silent fallback
|
| 504 |
+
# if mixed_data is not None:
|
| 505 |
+
# # Normalize to prevent clipping
|
| 506 |
+
# max_val = np.max(np.abs(mixed_data))
|
| 507 |
+
# if max_val > 0.95:
|
| 508 |
+
# mixed_data = mixed_data * 0.95 / max_val
|
| 509 |
|
| 510 |
+
# sf.write(self.mixed_composition_file, mixed_data, sample_rate)
|
| 511 |
+
# print(f"π Mixed composition created: {self.mixed_composition_file} (FILE SAVING ENABLED)")
|
| 512 |
+
# else:
|
| 513 |
+
# # Create silent fallback file
|
| 514 |
+
# silent_data = np.zeros(int(sample_rate * 2)) # 2 seconds of silence
|
| 515 |
+
# sf.write(self.mixed_composition_file, silent_data, sample_rate)
|
| 516 |
+
# print(f"π Silent fallback composition created: {self.mixed_composition_file}")
|
| 517 |
|
| 518 |
+
# except Exception as e:
|
| 519 |
+
# print(f"Error creating mixed composition: {e}")
|
| 520 |
+
# # Create minimal fallback file with actual content
|
| 521 |
+
# self.mixed_composition_file = os.path.abspath("mixed_composition_fallback.wav")
|
| 522 |
+
# try:
|
| 523 |
+
# # Create a short silent audio file as fallback
|
| 524 |
+
# sample_rate = 44100
|
| 525 |
+
# silent_data = np.zeros(int(sample_rate * 2)) # 2 seconds of silence
|
| 526 |
+
# sf.write(self.mixed_composition_file, silent_data, sample_rate)
|
| 527 |
+
# print(f"π Silent fallback composition created: {self.mixed_composition_file}")
|
| 528 |
+
# except Exception as fallback_error:
|
| 529 |
+
# print(f"Failed to create fallback file: {fallback_error}")
|
| 530 |
+
# self.mixed_composition_file = None
|
| 531 |
|
| 532 |
def toggle_dj_effect(self, movement: str) -> dict:
|
| 533 |
+
"""Toggle a DJ effect for the given movement and process the corresponding layer."""
|
| 534 |
if self.current_phase != "dj_effects":
|
| 535 |
return {"effect_applied": False, "message": "Not in DJ effects phase"}
|
| 536 |
+
|
| 537 |
if movement not in self.active_effects:
|
| 538 |
return {"effect_applied": False, "message": f"Unknown movement: {movement}"}
|
| 539 |
+
|
| 540 |
+
# Toggle effect
|
| 541 |
self.active_effects[movement] = not self.active_effects[movement]
|
| 542 |
effect_status = "ON" if self.active_effects[movement] else "OFF"
|
| 543 |
effect_names = {
|
| 544 |
+
"left_hand": "Echo",
|
| 545 |
"right_hand": "High Pass Filter",
|
| 546 |
+
"left_leg": "Reverb Effect",
|
| 547 |
"right_leg": "Low Pass Filter",
|
|
|
|
| 548 |
}
|
| 549 |
effect_name = effect_names.get(movement, movement)
|
| 550 |
print(f"ποΈ {effect_name}: {effect_status}")
|
| 551 |
+
|
| 552 |
+
# Find the audio file for this movement
|
| 553 |
+
sound_file = self.current_sound_mapping.get(movement)
|
| 554 |
+
audio_path = os.path.join(self.sound_dir, sound_file) if sound_file else None
|
| 555 |
+
processed_file = None
|
| 556 |
+
if audio_path and os.path.exists(audio_path):
|
| 557 |
+
processed_file = AudioEffectsProcessor.process_layer_with_effects(
|
| 558 |
+
audio_path, movement, self.active_effects
|
| 559 |
)
|
| 560 |
else:
|
| 561 |
+
print(f"No audio file found for movement: {movement}")
|
| 562 |
+
processed_file = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 563 |
|
| 564 |
+
# For DJ phase, always play all base layers (with effects if toggled)
|
| 565 |
+
all_layers = {}
|
| 566 |
+
for m in self.active_movements:
|
| 567 |
+
sf_name = self.current_sound_mapping.get(m)
|
| 568 |
+
apath = os.path.join(self.sound_dir, sf_name) if sf_name else None
|
| 569 |
+
if apath and os.path.exists(apath):
|
| 570 |
+
all_layers[m] = AudioEffectsProcessor.process_layer_with_effects(
|
| 571 |
+
apath, m, self.active_effects
|
| 572 |
+
)
|
| 573 |
return {
|
| 574 |
"effect_applied": True,
|
| 575 |
"effect_name": effect_name,
|
| 576 |
"effect_status": effect_status,
|
| 577 |
+
"processed_layer": processed_file,
|
| 578 |
+
"all_layers": all_layers
|
| 579 |
}
|
| 580 |
|
| 581 |
def get_cycle_success_rate(self) -> float:
|
|
|
|
| 652 |
# Create a unique identifier for this composition
|
| 653 |
import hashlib
|
| 654 |
composition_hash = hashlib.md5(''.join(sorted(audio_files)).encode()).hexdigest()[:8]
|
| 655 |
+
|
| 656 |
+
# FILE SAVING ENABLED: Return first available audio file instead of creating mixed composition
|
| 657 |
mixed_audio_path = os.path.join(self.sound_dir, f"mixed_composition_{composition_hash}.wav")
|
| 658 |
|
| 659 |
# Since file saving is disabled, use the first available audio file from the list
|
|
|
|
| 661 |
# Use the first audio file as the "mixed" composition
|
| 662 |
first_audio_file = os.path.join(self.sound_dir, audio_files[0])
|
| 663 |
if os.path.exists(first_audio_file):
|
| 664 |
+
print(f"DEBUG: Using first audio file as mixed composition: {os.path.basename(first_audio_file)}")
|
| 665 |
# Estimate BPM from the mixed composition audio file
|
| 666 |
try:
|
| 667 |
import librosa
|