import os

import numpy as np
import streamlit as st
import yt_dlp
import speech_recognition as sr
import google.generativeai as genai
from moviepy.editor import VideoFileClip
from PIL import Image
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
from llama_index.core import SimpleDirectoryReader
from llama_index.multi_modal_llms.gemini import GeminiMultiModal

# Set up the Gemini API
GOOGLE_API_TOKEN = st.secrets["GOOGLE_API_TOKEN"]
genai.configure(api_key=GOOGLE_API_TOKEN)

# Set up global variables
output_folder = "/tmp/mixed_data/"
output_audio_path = "/tmp/mixed_data/output_audio.wav"
output_video_path = '/tmp/video_data/%(title)s.%(ext)s'

# Ensure directories exist
os.makedirs(output_folder, exist_ok=True)
os.makedirs(os.path.dirname(output_video_path), exist_ok=True)
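

# Download the YouTube video with yt-dlp and return the full info dict plus
# a small metadata summary (uploader, title, view count).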
def download_video(video_url, output_video_path, input_vid):
    ydl_opts = {
        # Prefer an mp4 container so the hard-coded .mp4 path used later matches
        'format': 'best[ext=mp4]/best',
        'outtmpl': os.path.join(os.path.dirname(output_video_path), f"{input_vid}.%(ext)s"),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
        metadata = {
            "Author": info.get('uploader', ''),
            "Title": info.get('title', ''),
            "Views": info.get('view_count', 0),
        }
    return info, metadata
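

# Sample the video at one frame every five seconds (fps=0.2) and write the
# frames out as PNGs for CLIP-based retrieval.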
def video_to_images(video_path, output_folder):
    clip = VideoFileClip(video_path)
    clip.write_images_sequence(
        os.path.join(output_folder, "frame%04d.png"), fps=0.2
    )
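

# Extract the audio track to a WAV file for transcription.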
def video_to_audio(video_path, output_audio_path):
    clip = VideoFileClip(video_path)
    clip.audio.write_audiofile(output_audio_path)
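

# Transcribe the WAV file with SpeechRecognition's local Whisper backend
# (this assumes the openai-whisper package is installed alongside
# SpeechRecognition).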
def audio_to_text(audio_path):
    recognizer = sr.Recognizer()
    with sr.AudioFile(audio_path) as source:
        audio_data = recognizer.record(source)
    try:
        text = recognizer.recognize_whisper(audio_data)
    except sr.UnknownValueError:
        st.error("Speech recognition could not understand the audio.")
        text = ""
    return text
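

# Embed every extracted frame with CLIP ViT-B/32. Note that st.cache_data
# keys on the directory path, so reprocessing a different video in the same
# session may return stale features.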
@st.cache_data
def load_image_features(image_directory):
    model = SentenceTransformer('clip-ViT-B-32')
    image_features = {}
    for filename in os.listdir(image_directory):
        if filename.endswith((".jpg", ".png")):
            image_path = os.path.join(image_directory, filename)
            image = Image.open(image_path)
            image_features[filename] = model.encode(image).tolist()
    return image_features
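

# Rank all frame embeddings against the text query in CLIP space and return
# the three best-matching filenames.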
def find_top_3_similar_images(query_text, image_features):
    model = SentenceTransformer('clip-ViT-B-32')
    query_feature = model.encode([query_text]).tolist()[0]
    similarities = []
    for filename, feature in image_features.items():
        similarity = util.cos_sim(query_feature, feature).item()
        similarities.append((filename, similarity))
    similarities.sort(key=lambda x: x[1], reverse=True)
    top_3_images = [x[0] for x in similarities[:3]]
    return top_3_images
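

# Naive fixed-width chunking of the transcript text (chunk_size characters
# per chunk, no overlap).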
def read_and_chunk(file_path, chunk_size=100):
    with open(file_path, 'r') as file:
        text = file.read()
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
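

# Encode a list of strings with a small sentence-transformer model.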
def embed_content(content, model_name="all-MiniLM-L6-v2"):
    model = SentenceTransformer(model_name)
    embeddings = model.encode(content, convert_to_tensor=True)
    return embeddings
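

# Retrieval step: embed the transcript chunks and the query, then return the
# top_n chunks ranked by cosine similarity.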
def get_top_chunks(file_path, user_query, top_n=6):
    chunks = read_and_chunk(file_path)
    chunk_embeddings = embed_content(chunks).cpu().numpy()
    query_embedding = embed_content([user_query]).cpu().numpy()
    similarities = cosine_similarity(query_embedding, chunk_embeddings).flatten()
    top_indices = np.argsort(similarities)[-top_n:][::-1]
    top_chunks = [chunks[i] for i in top_indices]
    return top_chunks


# Streamlit UI
st.title("Video Analysis and Q&A")

video_url = st.text_input("Enter YouTube video URL")

if video_url:
    if st.button("Process Video"):
        with st.spinner("Processing video..."):
            info, metadata = download_video(video_url, output_video_path, "INPUT_VIDEO")
            st.json(metadata)
            filepath = "/tmp/video_data/INPUT_VIDEO.mp4"
            video_to_images(filepath, output_folder)
            video_to_audio(filepath, output_audio_path)
            text_data = audio_to_text(output_audio_path)
            with open(os.path.join(output_folder, "output_text.txt"), "w") as file:
                file.write(text_data)
        st.success("Video processed successfully!")
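
    # --- Question answering over the processed video ---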
    # Load image features
    image_features = load_image_features(output_folder)

    # Query input
    query = st.text_input("Enter your question about the video")
    if query and st.button("Get Answer"):
        with st.spinner("Analyzing..."):
            top_chunks = get_top_chunks(os.path.join(output_folder, "output_text.txt"), query)
            top_3_similar_images = find_top_3_similar_images(query, image_features)
            img = [os.path.join(output_folder, filename) for filename in top_3_similar_images]
            image_documents = SimpleDirectoryReader(input_files=img).load_data()
            gemini_mm_llm = GeminiMultiModal(
                model_name="models/gemini-1.5-flash",
                api_key=GOOGLE_API_TOKEN,
                temperature=0.7,
                max_output_tokens=1500,
            )
            context_str = "\n".join(top_chunks)
            qa_tmpl_str = """
Based on the provided information, including relevant images and retrieved context from the video,
accurately and precisely answer the query without any additional prior knowledge.
---------------------
Context: {context_str}
---------------------
Images: {image_list}
---------------------
Query: {query_str}
Answer:
"""
            response = gemini_mm_llm.complete(
                prompt=qa_tmpl_str.format(
                    query_str=query,
                    context_str=context_str,
                    image_list=", ".join(img),
                ),
                image_documents=image_documents,
            )
        st.write("Answer:", response.text)

        # Display relevant images
        st.subheader("Relevant Images")
        for image_path in img[:3]:  # Display up to 3 images
            st.image(image_path)