Kevin King committed
Commit e83cd54 · 1 Parent(s): 764dc1d

REFAC: Update requirements and enhance Streamlit app for multimodal emotion analysis

Files changed (2)
  1. requirements.txt +3 -2
  2. src/streamlit_app.py +85 -67
requirements.txt CHANGED
@@ -17,10 +17,11 @@ tf-keras==2.16.0
 torch==2.7.0
 torchaudio==2.7.0
 
-# Pin data/audio libraries for stability
+# Pin data/audio libraries for stability and new features
 pandas==2.2.2
 numpy==1.26.4
 soundfile==0.12.1
 librosa==0.10.1
 scipy==1.13.0
-Pillow==10.3.0
+Pillow==10.3.0
+scikit-learn==1.4.2
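Note: the new scikit-learn pin backs the cross-modal consistency check added to src/streamlit_app.py below, which compares unified emotion-score vectors with sklearn.metrics.pairwise.cosine_similarity. A minimal sketch of that comparison, with invented score vectors purely for illustration:

```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Invented, already-normalized scores over the unified order
# ['angry', 'happy', 'sad', 'neutral'] used by the app below.
face_vec = np.array([[0.05, 0.80, 0.05, 0.10]])  # face model leans "happy"
text_vec = np.array([[0.10, 0.70, 0.10, 0.10]])  # text model also leans "happy"

# cosine_similarity expects 2-D arrays and returns a matrix; take [0][0].
similarity = cosine_similarity(face_vec, text_vec)[0][0]
print(f"face/text cosine similarity: {similarity:.2f}")
```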
src/streamlit_app.py CHANGED
@@ -11,48 +11,33 @@ import tempfile
 import cv2
 from moviepy.editor import VideoFileClip
 import time
-import shutil
+import pandas as pd
+from sklearn.metrics.pairwise import cosine_similarity
 
-# --- Create a cross-platform, writable cache directory for all libraries ---
+# --- Create a cross-platform, writable cache directory ---
 CACHE_DIR = os.path.join(tempfile.gettempdir(), "affectlink_cache")
 os.makedirs(CACHE_DIR, exist_ok=True)
 os.environ['DEEPFACE_HOME'] = CACHE_DIR
 os.environ['HF_HOME'] = CACHE_DIR
 
-# Define paths for the pre-included model weights
-MODEL_NAME = "facial_expression_model_weights.h5"
-SOURCE_PATH = os.path.join("src", "weights", MODEL_NAME)
-DEST_DIR = os.path.join(CACHE_DIR, ".deepface", "weights")
-DEST_PATH = os.path.join(DEST_DIR, MODEL_NAME)
-
-# Create the destination directory if it doesn't exist and copy the model
-if not os.path.exists(DEST_PATH):
-    print(f"Model not found in cache. Copying from {SOURCE_PATH} to {DEST_PATH}...")
-    os.makedirs(DEST_DIR, exist_ok=True)
-    try:
-        shutil.copy(SOURCE_PATH, DEST_PATH)
-        print("Model copied successfully.")
-    except FileNotFoundError:
-        print(f"Warning: Local model file not found at {SOURCE_PATH}. App will attempt to download it.")
-    except Exception as e:
-        print(f"Error copying model file: {e}")
-
 # --- Page Configuration ---
 st.set_page_config(page_title="AffectLink Demo", page_icon="😊", layout="wide")
 st.title("AffectLink: Post-Hoc Emotion Analysis")
-st.write("Upload a short video clip (under 30 seconds) to analyze facial expressions, speech-to-text, and the emotional tone of the audio.")
+st.write("Upload a short video clip (under 30 seconds) to see a multimodal emotion analysis.")
 
 # --- Logger Configuration ---
-
+# [Logger setup remains the same]
 logging.basicConfig(level=logging.INFO)
 logging.getLogger('deepface').setLevel(logging.ERROR)
 logging.getLogger('huggingface_hub').setLevel(logging.WARNING)
 logging.getLogger('moviepy').setLevel(logging.ERROR)
 
+
 # --- Emotion Mappings ---
-UNIFIED_EMOTIONS = ['neutral', 'happy', 'sad', 'angry']
+UNIFIED_EMOTIONS = ['angry', 'happy', 'sad', 'neutral']  # Defined order for vectors
 TEXT_TO_UNIFIED = {'neutral': 'neutral', 'joy': 'happy', 'sadness': 'sad', 'anger': 'angry'}
 SER_TO_UNIFIED = {'neu': 'neutral', 'hap': 'happy', 'sad': 'sad', 'ang': 'angry'}
+FACIAL_TO_UNIFIED = {'neutral': 'neutral', 'happy': 'happy', 'sad': 'sad', 'angry': 'angry'}
 AUDIO_SAMPLE_RATE = 16000
 
 # --- Model Loading ---
@@ -68,6 +53,29 @@ def load_models():
 
 whisper_model, text_classifier, ser_model, ser_feature_extractor = load_models()
 
+# --- Helper Functions for Analysis ---
+def create_unified_vector(scores_dict, mapping_dict):
+    """Creates a normalized vector from a dictionary of scores based on a mapping."""
+    vector = np.zeros(len(UNIFIED_EMOTIONS))
+    for label, score in scores_dict.items():
+        unified_label = mapping_dict.get(label)
+        if unified_label and unified_label in UNIFIED_EMOTIONS:
+            idx = UNIFIED_EMOTIONS.index(unified_label)
+            vector[idx] += score
+
+    # Normalize the vector
+    norm = np.linalg.norm(vector)
+    if norm > 0:
+        vector /= norm
+    return vector
+
+def get_consistency_level(cosine_sim):
+    """Convert cosine similarity to a qualitative label."""
+    if cosine_sim >= 0.8: return "High"
+    if cosine_sim >= 0.6: return "Medium"
+    if cosine_sim >= 0.3: return "Low"
+    return "Very Low"
+
 # --- UI and Processing Logic ---
 uploaded_file = st.file_uploader("Choose a video file...", type=["mp4", "mov", "avi", "mkv"])
 
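For reference, a small usage sketch of the two helpers introduced in this hunk. The score dictionary below is invented for illustration, and the function bodies are condensed mirrors of the versions in the diff:

```python
import numpy as np

UNIFIED_EMOTIONS = ['angry', 'happy', 'sad', 'neutral']
SER_TO_UNIFIED = {'neu': 'neutral', 'hap': 'happy', 'sad': 'sad', 'ang': 'angry'}

def create_unified_vector(scores_dict, mapping_dict):
    """Map raw labels onto the fixed unified order, then L2-normalize."""
    vector = np.zeros(len(UNIFIED_EMOTIONS))
    for label, score in scores_dict.items():
        unified_label = mapping_dict.get(label)
        if unified_label in UNIFIED_EMOTIONS:
            vector[UNIFIED_EMOTIONS.index(unified_label)] += score
    norm = np.linalg.norm(vector)
    return vector / norm if norm > 0 else vector

def get_consistency_level(cosine_sim):
    """Bucket a cosine similarity into a qualitative label."""
    if cosine_sim >= 0.8: return "High"
    if cosine_sim >= 0.6: return "Medium"
    if cosine_sim >= 0.3: return "Low"
    return "Very Low"

# Invented SER softmax output, keyed by the model's raw labels.
ser_scores = {'neu': 0.1, 'hap': 0.7, 'sad': 0.1, 'ang': 0.1}
print(create_unified_vector(ser_scores, SER_TO_UNIFIED))  # unit-length vector in the unified order
print(get_consistency_level(0.85))                        # "High"
```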
@@ -81,9 +89,11 @@ if uploaded_file is not None:
         st.video(temp_video_path)
 
         if st.button("Analyze Video"):
-            facial_analysis_results = []
+            # Dictionaries to hold all results
+            fer_timeline = {}
             audio_analysis_results = {}
 
+            # --- Video Processing ---
            with st.spinner("Analyzing video for facial expressions... (1 frame per second)"):
                 cap = None
                 try:
@@ -97,16 +107,15 @@ if uploaded_file is not None:
                         timestamp = frame_count / fps
                         analysis = DeepFace.analyze(frame, actions=['emotion'], enforce_detection=False, silent=True)
                         if isinstance(analysis, list) and len(analysis) > 0:
-                            facial_analysis_results.append((timestamp, analysis[0]['dominant_emotion'].capitalize()))
+                            # Store the full emotion dictionary for the plot
+                            fer_timeline[timestamp] = analysis[0]['emotion']
                         frame_count += 1
-                except Exception as e:
-                    st.error(f"An error occurred during facial analysis: {e}")
                 finally:
                     if cap: cap.release()
 
+            # --- Audio Processing ---
             with st.spinner("Extracting and analyzing audio..."):
                 video_clip = None
-                temp_audio_path = None
                 try:
                     video_clip = VideoFileClip(temp_video_path)
                     if video_clip.audio:
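The fer_timeline dictionary filled in the hunk above maps each sampled timestamp to DeepFace's full per-emotion score dictionary; the post-analysis step in the next hunk averages those per-frame scores column-wise with pandas. A small sketch of that shape and averaging, using invented scores (DeepFace reports seven emotions, only four are shown here):

```python
import pandas as pd

# Invented two-frame timeline: timestamp -> DeepFace-style emotion scores.
fer_timeline = {
    0.0: {'angry': 1.0, 'happy': 90.0, 'sad': 2.0, 'neutral': 7.0},
    1.0: {'angry': 3.0, 'happy': 70.0, 'sad': 7.0, 'neutral': 20.0},
}

# Same reshaping as the diff: transpose so timestamps become rows and emotions columns.
df = pd.DataFrame(fer_timeline).T
fer_avg_scores = df.mean().to_dict()
print(fer_avg_scores)  # {'angry': 2.0, 'happy': 80.0, 'sad': 4.5, 'neutral': 13.5}
```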
@@ -114,65 +123,74 @@ if uploaded_file is not None:
                             video_clip.audio.write_audiofile(taudio.name, fps=AUDIO_SAMPLE_RATE, logger=None)
                             temp_audio_path = taudio.name
 
+                        # Run all audio models
                         result = whisper_model.transcribe(temp_audio_path, fp16=False)
-                        transcribed_text = result['text'] if result['text'].strip() else "No speech detected."
-                        audio_analysis_results['Transcription'] = transcribed_text
+                        transcribed_text = result['text'].strip()
+                        audio_analysis_results['Transcription'] = transcribed_text if transcribed_text else "No speech detected."
 
-                        if "No speech detected" not in transcribed_text:
+                        if transcribed_text:
                             text_emotions = text_classifier(transcribed_text)[0]
-                            unified_text_scores = {e: 0.0 for e in UNIFIED_EMOTIONS}
-                            for emo in text_emotions:
-                                unified_emo = TEXT_TO_UNIFIED.get(emo['label'])
-                                if unified_emo: unified_text_scores[unified_emo] += emo['score']
-                            audio_analysis_results['Text Emotion'] = max(unified_text_scores, key=unified_text_scores.get).capitalize()
+                            audio_analysis_results['Text Emotion Scores'] = {emo['label']: emo['score'] for emo in text_emotions}
 
                             audio_array, _ = sf.read(temp_audio_path, dtype='float32')
-
-                            if audio_array.ndim == 2:
-                                audio_array = audio_array.mean(axis=1)
-
-                            min_length = 1024
-                            if len(audio_array) < min_length:
-                                padding = np.zeros(min_length - len(audio_array), dtype=np.float32)
-                                audio_array = np.concatenate([audio_array, padding])
+                            if audio_array.ndim == 2: audio_array = audio_array.mean(axis=1)
+                            if len(audio_array) < 1024: audio_array = np.pad(audio_array, (0, 1024 - len(audio_array)))
 
                             inputs = ser_feature_extractor(audio_array, sampling_rate=AUDIO_SAMPLE_RATE, return_tensors="pt", padding=True)
                             with torch.no_grad():
                                 logits = ser_model(**inputs).logits
                             scores = torch.nn.functional.softmax(logits, dim=1).squeeze()
-                            unified_ser_scores = {e: 0.0 for e in UNIFIED_EMOTIONS}
-                            for i, score in enumerate(scores):
-                                raw_emo = ser_model.config.id2label[i]
-                                unified_emo = SER_TO_UNIFIED.get(raw_emo)
-                                if unified_emo: unified_ser_scores[unified_emo] += score.item()
-                            audio_analysis_results['Speech Emotion'] = max(unified_ser_scores, key=unified_ser_scores.get).capitalize()
+                            ser_scores = {ser_model.config.id2label[i]: score.item() for i, score in enumerate(scores)}
+                            audio_analysis_results['Speech Emotion Scores'] = ser_scores
                     else:
-                        audio_analysis_results['Transcription'] = "No audio track found in the video."
-
-                except Exception as e:
-                    st.error(f"An error occurred during audio analysis: {e}")
+                        audio_analysis_results['Transcription'] = "No audio track found."
                 finally:
                     if video_clip: video_clip.close()
-                    if temp_audio_path and os.path.exists(temp_audio_path): os.unlink(temp_audio_path)
-
+                    if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path): os.unlink(temp_audio_path)
+
+            # --- Post-Analysis and Visualization ---
             st.header("Analysis Results")
-            col1, col2 = st.columns(2)
+
+            # Prepare data for display
+            fer_avg_scores = pd.DataFrame(fer_timeline).T.mean().to_dict() if fer_timeline else {}
+            ser_scores = audio_analysis_results.get('Speech Emotion Scores', {})
+            text_scores = audio_analysis_results.get('Text Emotion Scores', {})
+
+            # Create vectors for cosine similarity
+            fer_vector = create_unified_vector(fer_avg_scores, FACIAL_TO_UNIFIED)
+            ser_vector = create_unified_vector(ser_scores, SER_TO_UNIFIED)
+            text_vector = create_unified_vector(text_scores, TEXT_TO_UNIFIED)
+
+            # Calculate similarities
+            sim_face_text = cosine_similarity([fer_vector], [text_vector])[0][0]
+            sim_face_speech = cosine_similarity([fer_vector], [ser_vector])[0][0]
+            sim_speech_text = cosine_similarity([ser_vector], [text_vector])[0][0]
+            avg_similarity = np.mean([sim_face_text, sim_face_speech, sim_speech_text])
+
+            # Display metrics
+            col1, col2 = st.columns([1, 2])
             with col1:
-                st.subheader("Audio Analysis")
+                st.subheader("Multimodal Summary")
                 st.write(f"**Transcription:** \"{audio_analysis_results.get('Transcription', 'N/A')}\"")
-                st.metric("Emotion from Text", audio_analysis_results.get('Text Emotion', 'N/A'))
-                st.metric("Emotion from Speech", audio_analysis_results.get('Speech Emotion', 'N/A'))
+                st.metric("Dominant Facial Emotion", max(fer_avg_scores, key=fer_avg_scores.get).capitalize() if fer_avg_scores else "N/A")
+                st.metric("Dominant Text Emotion", max(text_scores, key=lambda k: TEXT_TO_UNIFIED.get(k) is not None and text_scores.get(k) or -1).capitalize() if text_scores else "N/A")
+                st.metric("Dominant Speech Emotion", max(ser_scores, key=lambda k: SER_TO_UNIFIED.get(k) is not None and ser_scores.get(k) or -1).capitalize() if ser_scores else "N/A")
+                st.metric("Emotion Consistency", get_consistency_level(avg_similarity), f"{avg_similarity:.2f} Avg. Cosine Similarity")
+
             with col2:
-                st.subheader("Facial Expression Timeline")
-                if facial_analysis_results:
-                    for timestamp, emotion in facial_analysis_results:
-                        st.write(f"**Time {int(timestamp // 60):02d}:{int(timestamp % 60):02d}:** {emotion}")
+                st.subheader("Facial Emotion Over Time")
+                if fer_timeline:
+                    # Convert timeline to a DataFrame suitable for st.line_chart
+                    df = pd.DataFrame(fer_timeline).T
+                    # Filter for only the unified emotions we care about
+                    df_filtered = df[list(FACIAL_TO_UNIFIED.keys())].rename(columns=FACIAL_TO_UNIFIED)
+                    st.line_chart(df_filtered)
                 else:
-                    st.write("No faces detected or video processing failed.")
-
+                    st.write("No faces detected to plot.")
+
     finally:
         if temp_video_path and os.path.exists(temp_video_path):
-            time.sleep(1)
+            time.sleep(1)
             try:
                 os.unlink(temp_video_path)
             except Exception:
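One readability note on the new Dominant Text/Speech Emotion metrics: the max(..., key=lambda k: MAPPING.get(k) is not None and scores.get(k) or -1) idiom picks the highest-scoring raw label among those that map into the unified set, pushing unmapped labels down to -1. A hypothetical helper, not part of this commit, that expresses the same intent more explicitly:

```python
def dominant_mapped_emotion(scores, mapping):
    """Keep only labels that map into the unified set, then take the argmax (hypothetical helper)."""
    mapped = {label: score for label, score in scores.items() if mapping.get(label) is not None}
    return max(mapped, key=mapped.get).capitalize() if mapped else "N/A"

# Example with the text-classifier labels used in the diff (scores invented):
TEXT_TO_UNIFIED = {'neutral': 'neutral', 'joy': 'happy', 'sadness': 'sad', 'anger': 'angry'}
text_scores = {'joy': 0.62, 'anger': 0.20, 'sadness': 0.10, 'surprise': 0.08}
print(dominant_mapped_emotion(text_scores, TEXT_TO_UNIFIED))  # "Joy"
```

The lambda version additionally treats a mapped label whose score is exactly 0.0 as if it were unmapped, which is unlikely to matter in practice.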