""" VitalSync AI - Intelligent Triage Assistant Bridging the gap between symptoms and care. Developed by Kunal Shaw https://github.com/KUNALSHAWW """ from datasets import load_dataset from IPython.display import clear_output import pandas as pd import re from dotenv import load_dotenv import os from ibm_watson_machine_learning.foundation_models.utils.enums import ModelTypes from ibm_watson_machine_learning.metanames import GenTextParamsMetaNames as GenParams from ibm_watson_machine_learning.foundation_models.utils.enums import DecodingMethods from langchain.llms import WatsonxLLM from langchain.embeddings import SentenceTransformerEmbeddings from langchain.embeddings.base import Embeddings from langchain.vectorstores.milvus import Milvus from langchain.embeddings import HuggingFaceEmbeddings from dotenv import load_dotenv import os from pymilvus import Collection, utility from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility from towhee import pipe, ops import numpy as np from langchain_core.retrievers import BaseRetriever from langchain_core.callbacks import CallbackManagerForRetrieverRun from langchain_core.documents import Document from pymilvus import Collection, utility from towhee import pipe, ops import numpy as np from towhee.datacollection import DataCollection from typing import List from langchain.chains import RetrievalQA from langchain.prompts import PromptTemplate from langchain.schema.runnable import RunnablePassthrough from langchain_core.retrievers import BaseRetriever from langchain_core.callbacks import CallbackManagerForRetrieverRun from fpdf import FPDF import time from datetime import datetime print_full_prompt = False # ═══════════════════════════════════════════════════════════════════════════════ # VITALSYNC AI - CONFIGURATION # ═══════════════════════════════════════════════════════════════════════════════ VITALSYNC_CONFIG = { "name": "VitalSync AI", "version": "1.0.0", "tagline": "Bridging the gap between symptoms and care", "author": "Kunal Shaw", "github": "https://github.com/KUNALSHAWW" } # ═══════════════════════════════════════════════════════════════════════════════ # SAFETY TRIAGE LAYER - Emergency Detection System # ═══════════════════════════════════════════════════════════════════════════════ EMERGENCY_KEYWORDS = [ "suicide", "kill myself", "want to die", "end my life", "heart attack", "chest pain", "crushing chest", "can't breathe", "cannot breathe", "difficulty breathing", "choking", "unconscious", "passed out", "fainted", "stroke", "face drooping", "arm weakness", "speech difficulty", "severe bleeding", "heavy bleeding", "overdose", "poisoning", "seizure", "convulsions" ] EMERGENCY_RESPONSE = """ ⚠️ **CRITICAL HEALTH ALERT** ⚠️ Based on what you've described, this may be a **medical emergency**. **🚨 PLEASE TAKE IMMEDIATE ACTION:** 1. **Call Emergency Services NOW:** - 🇺🇸 USA: **911** - 🇮🇳 India: **112** or **102** - 🇬🇧 UK: **999** - 🇪🇺 Europe: **112** 2. **Do not wait** for AI assistance in emergencies 3. **Stay calm** and follow dispatcher instructions 4. If someone is with you, **ask them to help** --- *VitalSync AI cannot provide emergency medical care. Your safety is the priority.* **This conversation has been flagged for safety. Please seek immediate professional help.** """ def check_emergency_triage(message: str) -> bool: """ Safety Triage Layer: Detects emergency medical situations. Returns True if an emergency keyword is detected. """ message_lower = message.lower() for keyword in EMERGENCY_KEYWORDS: if keyword in message_lower: return True return False # ═══════════════════════════════════════════════════════════════════════════════ # PDF REPORT GENERATION - Consultation Export Feature # ═══════════════════════════════════════════════════════════════════════════════ class ConsultationReportPDF(FPDF): """Custom PDF class for VitalSync consultation reports.""" def header(self): self.set_font('Arial', 'B', 16) self.set_text_color(0, 128, 128) # Teal color self.cell(0, 10, 'VitalSync AI - Consultation Report', 0, 1, 'C') self.set_font('Arial', 'I', 10) self.set_text_color(128, 128, 128) self.cell(0, 5, 'Intelligent Triage Assistant', 0, 1, 'C') self.ln(5) self.set_draw_color(0, 128, 128) self.line(10, self.get_y(), 200, self.get_y()) self.ln(10) def footer(self): self.set_y(-30) self.set_draw_color(0, 128, 128) self.line(10, self.get_y(), 200, self.get_y()) self.ln(5) self.set_font('Arial', 'I', 8) self.set_text_color(128, 128, 128) self.multi_cell(0, 4, 'DISCLAIMER: This report is generated by VitalSync AI for informational purposes only. ' 'It does not constitute medical advice, diagnosis, or treatment. Always consult a qualified ' 'healthcare professional for medical concerns.', 0, 'C') self.cell(0, 4, f'Page {self.page_no()}', 0, 0, 'C') def generate_consultation_report(chat_history) -> str: """ Generates a PDF report from the chat history. Returns the filename of the generated PDF. """ if not chat_history or len(chat_history) == 0: return None pdf = ConsultationReportPDF() pdf.add_page() # Report metadata pdf.set_font('Arial', 'B', 12) pdf.set_text_color(0, 0, 0) pdf.cell(0, 8, f'Report Generated: {datetime.now().strftime("%B %d, %Y at %I:%M %p")}', 0, 1) pdf.cell(0, 8, f'Session ID: VS-{int(time.time())}', 0, 1) pdf.ln(10) # Conversation transcript pdf.set_font('Arial', 'B', 14) pdf.set_text_color(0, 128, 128) pdf.cell(0, 10, 'Consultation Transcript', 0, 1) pdf.ln(5) for i, (user_msg, bot_msg) in enumerate(chat_history, 1): # Patient message pdf.set_font('Arial', 'B', 11) pdf.set_text_color(70, 130, 180) # Steel blue pdf.cell(0, 8, f'Patient (Message {i}):', 0, 1) pdf.set_font('Arial', '', 10) pdf.set_text_color(0, 0, 0) safe_user_msg = user_msg.encode('latin-1', 'replace').decode('latin-1') pdf.multi_cell(0, 6, safe_user_msg) pdf.ln(3) # AI Response pdf.set_font('Arial', 'B', 11) pdf.set_text_color(0, 128, 128) # Teal pdf.cell(0, 8, f'VitalSync AI Response:', 0, 1) pdf.set_font('Arial', '', 10) pdf.set_text_color(0, 0, 0) safe_bot_msg = bot_msg.encode('latin-1', 'replace').decode('latin-1') safe_bot_msg = re.sub(r'\*\*(.+?)\*\*', r'\1', safe_bot_msg) safe_bot_msg = re.sub(r'\*(.+?)\*', r'\1', safe_bot_msg) pdf.multi_cell(0, 6, safe_bot_msg) pdf.ln(8) filename = f"vitalsync_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf" pdf.output(filename) return filename # ═══════════════════════════════════════════════════════════════════════════════ # DATA & MODEL SETUP (Original Logic - Preserved) # ═══════════════════════════════════════════════════════════════════════════════ ## Step 1 Dataset Retrieving dataset = load_dataset("ruslanmv/ai-medical-chatbot") clear_output() train_data = dataset["train"] #For this demo let us choose the first 1000 dialogues df = pd.DataFrame(train_data[:1000]) #df = df[["Patient", "Doctor"]].rename(columns={"Patient": "question", "Doctor": "answer"}) df = df[["Description", "Doctor"]].rename(columns={"Description": "question", "Doctor": "answer"}) # Add the 'ID' column as the first column df.insert(0, 'id', df.index) # Reset the index and drop the previous index column df = df.reset_index(drop=True) # Clean the 'question' and 'answer' columns df['question'] = df['question'].apply(lambda x: re.sub(r'\s+', ' ', x.strip())) df['answer'] = df['answer'].apply(lambda x: re.sub(r'\s+', ' ', x.strip())) df['question'] = df['question'].str.replace('^Q.', '', regex=True) # Assuming your DataFrame is named df max_length = 500 # Due to our enbeeding model does not allow long strings df['question'] = df['question'].str.slice(0, max_length) #To use the dataset to get answers, let's first define the dictionary: #- `id_answer`: a dictionary of id and corresponding answer id_answer = df.set_index('id')['answer'].to_dict() load_dotenv() ## Step 2 Milvus connection COLLECTION_NAME='qa_medical' load_dotenv() # Configuration for Milvus/Zilliz milvus_uri = os.environ.get("MILVUS_URI") milvus_token = os.environ.get("MILVUS_TOKEN") host_milvus = os.environ.get("REMOTE_SERVER", '127.0.0.1') # Connect to Zilliz Cloud (if URI/Token provided) or Self-Hosted Milvus if milvus_uri and milvus_token: print(f"Connecting to Zilliz Cloud: {milvus_uri}") connections.connect(alias="default", uri=milvus_uri, token=milvus_token) else: print(f"Connecting to Milvus Host: {host_milvus}") connections.connect(host=host_milvus, port='19530') # Check if collection exists, if not create and populate it try: # Zilliz Cloud sometimes raises an exception instead of returning False has_col = utility.has_collection(COLLECTION_NAME) except Exception as e: print(f"Note: has_collection check failed ({str(e)}). Assuming collection does not exist.") has_col = False if not has_col: print(f"Collection {COLLECTION_NAME} not found. Creating and populating...") # Use MilvusClient for Zilliz Serverless (recommended approach) from pymilvus import MilvusClient if milvus_uri and milvus_token: client = MilvusClient(uri=milvus_uri, token=milvus_token) else: client = MilvusClient(uri=f"http://{host_milvus}:19530") # Create collection with MilvusClient (simpler API for serverless) client.create_collection( collection_name=COLLECTION_NAME, dimension=768, # DPR embedding dimension metric_type="IP", auto_id=False, id_type="int" ) print(f"Collection {COLLECTION_NAME} created successfully.") # 2. Generate Embeddings print("Generating embeddings for initial data...") embedding_pipe = ( pipe.input('question') .map('question', 'vec', lambda x: x[:500]) .map('vec', 'vec', ops.text_embedding.dpr(model_name='facebook/dpr-ctx_encoder-single-nq-base')) .map('vec', 'vec', lambda x: x / np.linalg.norm(x, axis=0)) .output('vec') ) # Process and collect data data_to_insert = [] print("Processing embeddings (this may take a few minutes)...") for idx, q in enumerate(df['question']): res = embedding_pipe(q) vec = res.get()[0] data_to_insert.append({ "id": int(df['id'].iloc[idx]), "vector": vec.tolist() }) if (idx + 1) % 100 == 0: print(f"Processed {idx + 1}/{len(df)} embeddings...") # 3. Insert Data print("Inserting data into Zilliz...") client.insert(collection_name=COLLECTION_NAME, data=data_to_insert) print("Collection setup complete.") # Close the client connection client.close() # Reconnect with standard connection for the rest of the app connections.disconnect("default") if milvus_uri and milvus_token: connections.connect(alias="default", uri=milvus_uri, token=milvus_token) else: connections.connect(host=host_milvus, port='19530') collection = Collection(COLLECTION_NAME) collection.load() utility.load_state(COLLECTION_NAME) utility.loading_progress(COLLECTION_NAME) max_input_length = 500 # Maximum length allowed by the model # Initialize MilvusClient for search (compatible with Zilliz Serverless) from pymilvus import MilvusClient as SearchClient if milvus_uri and milvus_token: search_client = SearchClient(uri=milvus_uri, token=milvus_token) else: search_client = SearchClient(uri=f"http://{host_milvus}:19530") # Initialize embedding pipeline (without Milvus search - we'll do that separately) embedding_pipe = ( pipe.input('question') .map('question', 'vec', lambda x: x[:max_input_length]) .map('vec', 'vec', ops.text_embedding.dpr(model_name='facebook/dpr-ctx_encoder-single-nq-base')) .map('vec', 'vec', lambda x: x / np.linalg.norm(x, axis=0)) .output('vec') ) def search_similar_questions(question: str) -> list: """Search for similar questions using MilvusClient directly (Zilliz Serverless compatible).""" # Get embedding for the question result = embedding_pipe(question) query_vector = result.get()[0].tolist() # Search using MilvusClient search_results = search_client.search( collection_name=COLLECTION_NAME, data=[query_vector], limit=1, output_fields=["id"] ) # Extract answers from results answers = [] for hits in search_results: for hit in hits: doc_id = hit['id'] if doc_id in id_answer: answers.append(id_answer[doc_id]) return answers # Step 3 - Custom LLM from openai import OpenAI # Get model name from environment or use Groq's llama model (mixtral was deprecated) LLM_MODEL = os.environ.get("LLM_MODEL", "llama-3.1-8b-instant") def generate_stream(prompt, model=None): # Use environment variables for flexibility (OpenAI, Groq, or Custom HF Endpoint) base_url = os.environ.get("LLM_BASE_URL", "https://api.groq.com/openai/v1") api_key = os.environ.get("LLM_API_KEY") if not api_key: print("ERROR: LLM_API_KEY not set!") return None if model is None: model = LLM_MODEL print(f"Using LLM: {model} at {base_url}") client = OpenAI(base_url=base_url, api_key=api_key) response = client.chat.completions.create( model=model, messages=[ { "role": "system", "content": """You are VitalSync AI, an expert medical health assistant developed by Kunal Shaw. Your role is to provide detailed, accurate, and helpful health information. Guidelines: - Provide comprehensive and medically accurate responses - Structure your answers clearly with relevant details - Include possible causes, symptoms, and general recommendations when appropriate - Always recommend consulting a healthcare professional for proper diagnosis and treatment - Be empathetic and supportive in your tone - If you're uncertain about something, acknowledge it honestly - Never provide specific medication dosages or treatment plans - always defer to medical professionals""" }, { "role": "user", "content": prompt, } ], stream=True, temperature=0.7, max_tokens=1024, ) return response # Zephyr formatter def format_prompt_zephyr(message, history, system_message): prompt = ( "<|system|>\n" + system_message + "" ) for user_prompt, bot_response in history: prompt += f"<|user|>\n{user_prompt}" prompt += f"<|assistant|>\n{bot_response}" if message=="": message="Hello" prompt += f"<|user|>\n{message}" prompt += f"<|assistant|>" #print(prompt) return prompt # Step 4 Langchain Definitions class CustomRetrieverLang(BaseRetriever): def get_relevant_documents( self, query: str, *, run_manager: CallbackManagerForRetrieverRun ) -> List[Document]: # Perform the encoding and retrieval for a specific question answers = search_similar_questions(query) answer_string = ' '.join(answers) if answers else "No relevant information found." return [Document(page_content=answer_string)] # Ensure correct VectorStoreRetriever usage retriever = CustomRetrieverLang() def full_prompt( question, history="" ): context=[] # Get the retrieved context docs = retriever.get_relevant_documents(question) print("Retrieved context:") for doc in docs: context.append(doc.page_content) context=" ".join(context) #print(context) default_system_message = f""" You are VitalSync AI, an expert health assistant. Please follow these guidelines: 1. **Provide Detailed Responses**: Give comprehensive answers that address the user's health concerns thoroughly. 2. **Be Medically Accurate**: Base your responses on the provided medical context and established medical knowledge. 3. **Structure Your Answer**: - Start with a direct answer to the question - Explain relevant causes or factors - Provide helpful recommendations or next steps - Mention when professional medical consultation is advised 4. **Be Empathetic**: Show understanding and compassion for health concerns. 5. **Safety First**: Always recommend consulting a healthcare provider for proper diagnosis and treatment. 6. **Use the Context**: Reference the following medical context to provide accurate information: Medical Context: {context} Remember: You are here to help users understand their health concerns better, not to replace professional medical advice. """ system_message = os.environ.get("SYSTEM_MESSAGE", default_system_message) formatted_prompt = format_prompt_zephyr(question, history, system_message=system_message) print(formatted_prompt) return formatted_prompt def custom_llm( question, history="", temperature=0.8, max_tokens=256, top_p=0.95, stop=None, ): formatted_prompt = full_prompt(question, history) try: print("LLM Input:", formatted_prompt) output = "" stream = generate_stream(formatted_prompt) # Check if stream is None before iterating if stream is None: print("No response generated.") return for response in stream: character = response.choices[0].delta.content # Handle empty character and stop reason if character is not None: print(character, end="", flush=True) output += character elif response.choices[0].finish_reason == "stop": print("Generation stopped.") break # or return output depending on your needs else: pass if "<|user|>" in character: # end of context print("----end of context----") return #print(output) #yield output except Exception as e: error_msg = str(e) print(f"LLM ERROR: {error_msg}") if "Too Many Requests" in error_msg or "rate_limit" in error_msg.lower(): output = "I'm receiving too many requests right now. Please try again in a moment." elif "authentication" in error_msg.lower() or "api_key" in error_msg.lower() or "401" in error_msg: output = "There's an authentication issue with the AI service. Please check the API configuration." elif "model" in error_msg.lower() and "not found" in error_msg.lower(): output = f"The AI model is not available. Error: {error_msg}" else: output = f"I encountered an error while processing your request. Technical details: {error_msg[:200]}" return output from langchain.llms import BaseLLM from langchain_core.language_models.llms import LLMResult class MyCustomLLM(BaseLLM): def _generate( self, prompt: str, *, temperature: float = 0.7, max_tokens: int = 256, top_p: float = 0.95, stop: list[str] = None, **kwargs, ) -> LLMResult: # Change return type to LLMResult response_text = custom_llm( question=prompt, temperature=temperature, max_tokens=max_tokens, top_p=top_p, stop=stop, ) # Convert the response text to LLMResult format response = LLMResult(generations=[[{'text': response_text}]]) return response def _llm_type(self) -> str: return "VitalSync LLM" # Create a Langchain with your custom LLM rag_chain = MyCustomLLM() # Invoke the chain with your question question = "I have started to get lots of acne on my face, particularly on my forehead what can I do" print(rag_chain.invoke(question)) # ═══════════════════════════════════════════════════════════════════════════════ # VITALSYNC CHAT FUNCTIONS # ═══════════════════════════════════════════════════════════════════════════════ import gradio as gr def vitalsync_chat(message, history): """ Main chat function with integrated Safety Triage Layer. """ history = history or [] if isinstance(history, str): history = [] # SAFETY TRIAGE CHECK - Intercept emergencies before AI processing if check_emergency_triage(message): return EMERGENCY_RESPONSE # Normal AI processing response = rag_chain.invoke(message) return response def chat(message, history): history = history or [] if isinstance(history, str): history = [] # Reset history to empty list if it's a string response = vitalsync_chat(message, history) history.append((message, response)) return history, response def chat_v1(message, history): response = vitalsync_chat(message, history) return (response) collection.load() # ═══════════════════════════════════════════════════════════════════════════════ # GRADIO INTERFACE - VitalSync AI Dashboard # ═══════════════════════════════════════════════════════════════════════════════ # Function to read CSS from file (improved readability) def read_css_from_file(filename): with open(filename, "r") as f: return f.read() # Read CSS from file css = read_css_from_file("style.css") # VitalSync Welcome Message welcome_message = '''