from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from transformers import LongformerTokenizer, pipeline
from PIL import Image
import pytesseract
import cv2
import re
import torch
import matplotlib

matplotlib.use("Agg")  # non-interactive backend so figures render in a headless server process
import matplotlib.pyplot as plt
import math
import io
import base64
from typing import Dict, List, Any, Optional
import numpy as np
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
device = 0 if torch.cuda.is_available() else -1

model_id = "allenai/longformer-base-4096"
tok = LongformerTokenizer.from_pretrained(model_id)

emo_head = pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    return_all_scores=True,
    device=device,
)

translator = pipeline(
    "translation",
    model="Helsinki-NLP/opus-mt-mul-en",
    device=device,
)

flan = pipeline("text2text-generation", model="google/flan-t5-base", device=device)

time_regex = re.compile(r"(\d{1,2}[:]\d{2}\s*(AM|PM|am|pm)?)|(\d{1,2}[/]\d{1,2}[/]\d{2,4})")
negative_keys = {"anger", "sadness", "fear", "disgust"}
positive_keys = {"joy", "surprise"}
def mask_names(names: List[str]) -> Dict[str, str]:
    return {n: f"User_{i+1}" for i, n in enumerate(names)}


def extract_time(line: str) -> str:
    m = time_regex.search(line)
    return m.group() if m else ""
def ocr_image(image: Image.Image) -> str:
    img = image.convert("RGB")
    try:
        # Prefer multi-language OCR (English, Hindi, Telugu); fall back to the default language pack if unavailable.
        return pytesseract.image_to_string(img, lang="eng+hin+tel")
    except Exception:
        return pytesseract.image_to_string(img)
def ocr_video_bytes(video_bytes: bytes) -> str:
    # Write the upload to a temp file so OpenCV can decode it, then OCR every 25th frame.
    temp_path = "/tmp/temp_video.mp4"
    with open(temp_path, "wb") as f:
        f.write(video_bytes)
    cap = cv2.VideoCapture(temp_path)
    texts = []
    idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if idx % 25 == 0:
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(rgb)
            try:
                t = pytesseract.image_to_string(img, lang="eng+hin+tel")
            except Exception:
                t = pytesseract.image_to_string(img)
            if t.strip():
                texts.append(t)
        idx += 1
    cap.release()
    return "\n".join(texts)
def split_by_speaker(text: str, privacy: bool) -> Dict[str, str]:
    # Group chat lines of the form "Name: message" by speaker; anonymize names when privacy is on.
    speakers: Dict[str, List[str]] = {}
    for raw in text.splitlines():
        if ":" in raw:
            name, msg = raw.split(":", 1)
            name, msg = name.strip(), msg.strip()
            if msg:
                speakers.setdefault(name, []).append(msg)
    if not speakers:
        speakers["User"] = [text]
    if privacy:
        mapping = mask_names(list(speakers.keys()))
        return {mapping[k]: " ".join(v) for k, v in speakers.items()}
    return {k: " ".join(v) for k, v in speakers.items()}
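# Illustrative example (not executed at import time): for a chat such as
#   "Asha: the deadline is killing me\nRavi: take a break"
# split_by_speaker(..., privacy=False) returns
#   {"Asha": "the deadline is killing me", "Ravi": "take a break"},
# and with privacy=True the keys become "User_1" and "User_2" instead.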
def chunk_text(text: str, max_tokens: int = 2048) -> List[str]:
    # Accumulate words until the Longformer tokenizer reports max_tokens, then start a new chunk.
    words = text.split()
    chunks: List[str] = []
    temp: List[str] = []
    for w in words:
        temp.append(w)
        enc = tok(" ".join(temp), truncation=True, max_length=max_tokens)
        if len(enc["input_ids"]) >= max_tokens:
            temp.pop()
            chunks.append(" ".join(temp))
            temp = [w]
    if temp:
        chunks.append(" ".join(temp))
    return chunks
def translate_to_english(text: str) -> str:
    if not text or not text.strip():
        return text
    # Cheap language check: if the text is mostly ASCII, assume it is already English and skip translation.
    ascii_chars = sum(1 for ch in text if ord(ch) < 128)
    ascii_ratio = ascii_chars / max(1, len(text))
    if ascii_ratio > 0.9:
        return text
    try:
        out = translator(text, max_length=512)
        if isinstance(out, list) and out:
            return out[0]["translation_text"]
    except Exception:
        return text
    return text
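# Illustrative example of the ASCII heuristic above: "see you at 5" is entirely ASCII,
# so it is returned unchanged, while a mostly-Devanagari line such as "कल मिलते हैं"
# falls well below the 0.9 ratio and is routed through the translator.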
def emotion_scores(text: str) -> Dict[str, float]:
    # truncation=True guards against chunks longer than the emotion model's input limit.
    res = emo_head(text, truncation=True)[0]
    return {x["label"]: float(x["score"]) for x in res}


def emotions_over_chunks(chunks: List[str]) -> Dict[str, float]:
    # Average emotion scores across all chunks, translating each chunk to English first.
    if not chunks:
        return {}
    sums: Dict[str, float] = {}
    count = 0
    for c in chunks:
        translated = translate_to_english(c)
        e = emotion_scores(translated)
        for k, v in e.items():
            sums[k] = sums.get(k, 0.0) + v
        count += 1
    return {k: v / count for k, v in sums.items()} if count else {}
def compute_risk(emotions: Dict[str, float]) -> float:
    # Risk = 0.7 * total negative emotion + 0.3 * strongest single negative emotion, clipped to [0, 1].
    neg = sum(emotions.get(k, 0.0) for k in negative_keys)
    strongest_neg = max((emotions.get(k, 0.0) for k in negative_keys), default=0.0)
    risk = 0.7 * neg + 0.3 * strongest_neg
    return max(0.0, min(1.0, risk))
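# Worked example: with {"sadness": 0.5, "anger": 0.2, "fear": 0.1, "disgust": 0.0}
# the negative total is 0.8 and the strongest negative is 0.5,
# so risk = 0.7 * 0.8 + 0.3 * 0.5 = 0.71.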
def dominant_emotions(emotions: Dict[str, float], top_n: int = 2, threshold: float = 0.2):
    if not emotions:
        return []
    sorted_items = sorted(emotions.items(), key=lambda x: x[1], reverse=True)
    dom = [k for k, v in sorted_items if v >= threshold]
    if not dom:
        dom = [sorted_items[0][0]]
    return dom[:top_n]
def summarize_person(name: str, text: str, risk: float, emotions: Dict[str, float]) -> str:
    emo_str_for_prompt = ", ".join(f"{k}: {round(v, 3)}" for k, v in emotions.items())
    prompt = (
        "You are a clinical psychologist describing one person from a chat. "
        "Write a short summary in THIRD PERSON about this person only. "
        "Explain briefly: what they mainly talked about, what they seem to feel, "
        "and how they are coping with work or life. "
        "IMPORTANT: Do NOT copy or quote any sentences from the chat. "
        "Do NOT use lines like 'Name:' or repeat the exact wording. "
        "Write 4 to 6 ORIGINAL sentences in your own words.\n"
        f"Person name: {name}\n"
        f"Risk score (0-1): {round(risk, 3)}\n"
        f"Emotion scores: {emo_str_for_prompt}\n"
        f"Conversation from this person:\n{text[:2500]}"
    )
    out = flan(prompt, max_length=220, do_sample=False)[0]["generated_text"].strip()
    return out
def hybrid_suggestions(name: str, summary: str, risk: float, emotions: Dict[str, float]) -> str:
    emo_str_for_prompt = ", ".join(f"{k}: {round(v, 3)}" for k, v in emotions.items())
    neg_sum = sum(emotions.get(k, 0.0) for k in negative_keys)
    pos_sum = sum(emotions.get(k, 0.0) for k in positive_keys)
    prompt = (
        "You are a therapist AND a practical workplace coach giving advice directly to this person. "
        "Use the summary below only as background. "
        "You MUST NOT repeat sentences or phrases from the summary. "
        "Do NOT retell what happened in the chat. "
        "Instead, give 4 to 6 sentences of specific, realistic suggestions that mix emotional support "
        "and workplace strategies. Include both coping ideas (breathing, journaling, breaks, talking to someone) "
        "AND practical tips (communication, planning, boundaries, routines). "
        "Keep the tone gentle and hopeful.\n"
        f"Person name: {name}\n"
        f"Risk score (0-1): {round(risk, 3)}\n"
        f"Total negative emotion: {round(neg_sum, 3)}\n"
        f"Total positive emotion: {round(pos_sum, 3)}\n"
        f"Emotion scores: {emo_str_for_prompt}\n"
        f"Summary of this person:\n{summary}"
    )
    out = flan(prompt, max_length=230, do_sample=False)[0]["generated_text"].strip()
    return out
def build_two_line_overall_summary(results: List[Dict[str, Any]], group_emo: Dict[str, float]) -> str:
    if not results:
        return "No conversation detected."
    names = [r["name"] for r in results]
    if len(names) == 1:
        name_part = names[0]
    else:
        name_part = ", ".join(names[:-1]) + " and " + names[-1]
    avg_risk = sum(r["risk"] for r in results) / len(results)
    if avg_risk > 0.7:
        risk_text = "are experiencing intense emotional strain related to this conversation."
    elif avg_risk > 0.45:
        risk_text = "are dealing with noticeable stress and emotional discomfort."
    else:
        risk_text = "show mostly manageable emotions with some moments of stress."
    if group_emo:
        top_emos = sorted(group_emo.items(), key=lambda x: x[1], reverse=True)[:3]
        emo_part = ", ".join(k for k, _ in top_emos)
        emo_text = f"The most prominent emotions in the group are {emo_part}."
    else:
        emo_text = "The emotional tone of the conversation is relatively neutral."
    return f"{name_part} {risk_text} {emo_text}"
def plot_to_base64(fig) -> str:
    # Render a matplotlib figure to a base64-encoded PNG and close it to free memory.
    buf = io.BytesIO()
    fig.savefig(buf, format="png", bbox_inches="tight")
    buf.seek(0)
    img_base64 = base64.b64encode(buf.read()).decode("utf-8")
    plt.close(fig)
    return img_base64
@app.post("/analyze")  # route path assumed; no decorator survives in the source
async def analyze(
    text_input: Optional[str] = Form(None),
    privacy: str = Form("OFF"),
    images: List[UploadFile] = File(None),
    videos: List[UploadFile] = File(None),
):
    # Collect text from the form field, OCR'd images, and OCR'd video frames.
    collected: List[str] = []
    if text_input and text_input.strip():
        collected.append(text_input)
    if images:
        for img_file in images:
            img_bytes = await img_file.read()
            img = Image.open(io.BytesIO(img_bytes))
            t = ocr_image(img)
            if t.strip():
                collected.append(t)
    if videos:
        for vid_file in videos:
            vid_bytes = await vid_file.read()
            t = ocr_video_bytes(vid_bytes)
            if t.strip():
                collected.append(t)
    if not collected:
        return {"error": "No readable text found."}
    combined = "\n".join(collected)
    speakers = split_by_speaker(combined, privacy == "ON")

    # Per-speaker analysis: chunk, score emotions, compute risk, then generate summary and suggestions.
    results: List[Dict[str, Any]] = []
    for name, txt in speakers.items():
        chunks = chunk_text(txt)
        emos = emotions_over_chunks(chunks)
        risk = compute_risk(emos)
        summary = summarize_person(name, txt, risk, emos)
        feedback = hybrid_suggestions(name, summary, risk, emos)
        results.append(
            {
                "name": name,
                "risk": risk,
                "emotions": emos,
                "summary": summary,
                "feedback": feedback,
            }
        )
    # Plot 1: per-person risk levels and the averaged group emotion profile.
    fig1, ax = plt.subplots(1, 2, figsize=(11, 4))
    names = [x["name"] for x in results]
    scores = [x["risk"] for x in results]
    ax[0].bar(names, scores, color="#B03A2E")
    ax[0].set_ylim(0, 1)
    ax[0].set_title("Risk Levels")
    group_emo: Dict[str, float] = {}
    for r in results:
        for k, v in r["emotions"].items():
            group_emo[k] = group_emo.get(k, 0.0) + v
    group_emo = {k: v / len(results) for k, v in group_emo.items()}
    ax[1].bar(list(group_emo.keys()), list(group_emo.values()), color="#2E86C1")
    ax[1].set_ylim(0, 1)
    ax[1].set_title("Group Emotion")
    plt.tight_layout()
    plot1_b64 = plot_to_base64(fig1)

    # Plot 2: one emotion bar chart per person, laid out in a grid of up to 3 columns.
    n = len(results)
    cols = min(3, n)
    rows = math.ceil(n / cols)
    fig2, ax2 = plt.subplots(rows, cols, figsize=(5 * cols, 3 * rows))
    axlist = [ax2] if n == 1 else ax2.flatten()
    for i, r in enumerate(results):
        axp = axlist[i]
        axp.bar(list(r["emotions"].keys()), list(r["emotions"].values()), color="#17A589")
        axp.set_ylim(0, 1)
        axp.set_title(r["name"])
        axp.tick_params(axis="x", rotation=45)
    for j in range(len(axlist) - n):
        axlist[n + j].axis("off")
    fig2.tight_layout()
    plot2_b64 = plot_to_base64(fig2)

    overall_summary = build_two_line_overall_summary(results, group_emo)
    return {
        "overall_summary": overall_summary,
        "results": results,
        "group_emotions": group_emo,
        "plot1": plot1_b64,
        "plot2": plot2_b64,
    }
@app.get("/")  # route path assumed
async def root():
    return {"message": "Mental Health Chat Analyzer API"}
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
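# Example client call (a sketch only, assuming the /analyze route above and a server on localhost:8000;
# the "requests" library is not a dependency of this app):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:8000/analyze",
#       data={"text_input": "Asha: the deadline is killing me", "privacy": "ON"},
#   )
#   print(resp.json()["overall_summary"])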