|
|
import streamlit as st |
|
|
import os |
|
|
import yt_dlp |
|
|
from moviepy.editor import VideoFileClip |
|
|
import speech_recognition as sr |
|
|
import torch |
|
|
from PIL import Image |
|
|
from sentence_transformers import SentenceTransformer, util |
|
|
from collections import defaultdict |
|
|
import numpy as np |
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
import google.generativeai as genai |
|
|
from llama_index.core import SimpleDirectoryReader |
|
|
from llama_index.multi_modal_llms.gemini import GeminiMultiModal |
|
|
|
|
|
|
|
|
# The Gemini API key is read from Streamlit's secrets store and registered
# with the google.generativeai client as soon as the script is loaded.
GOOGLE_API_TOKEN = st.secrets["GOOGLE_API_TOKEN"]
genai.configure(api_key=GOOGLE_API_TOKEN)

# Working locations: the folder that receives extracted frames and the
# transcript, the extracted audio track, and the yt-dlp style output
# template whose directory part is where the downloaded video is stored.
output_folder = "/tmp/mixed_data/"
output_audio_path = "/tmp/mixed_data/output_audio.wav"
output_video_path = '/tmp/video_data/%(title)s.%(ext)s'

# Both working directories must exist before anything is written to them.
for _working_dir in (output_folder, os.path.dirname(output_video_path)):
    os.makedirs(_working_dir, exist_ok=True)
|
|
|
|
|
def download_video(video_url, output_video_path, input_vid):
    """Download a single video with yt-dlp and summarise its metadata.

    Args:
        video_url: URL of the video to download.
        output_video_path: Path (or yt-dlp output template) whose directory
            part is used as the download directory; its file-name part is
            ignored.
        input_vid: Base name (without extension) given to the saved file.
            The extension is filled in by yt-dlp.

    Returns:
        A ``(info, metadata)`` tuple. ``info`` is the dictionary returned by
        ``YoutubeDL.extract_info``; ``metadata`` is a dict with the keys
        "Author", "Title" and "Views" taken from ``info`` (empty string / 0
        when a field is missing).
    """
    ydl_opts = {
        'format': 'best',
        'outtmpl': os.path.join(os.path.dirname(output_video_path), f"{input_vid}.%(ext)s"),
        # The output name is fixed, so a file left behind by an earlier
        # download would otherwise be treated as "already downloaded" and
        # silently reused for a different URL.
        'overwrites': True,
        # A watch URL that also carries a playlist id should fetch only the
        # requested video, not every entry of the playlist.
        'noplaylist': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)

    metadata = {
        "Author": info.get('uploader', ''),
        "Title": info.get('title', ''),
        "Views": info.get('view_count', 0)
    }
    return info, metadata
|
|
|
|
|
def video_to_images(video_path, output_folder, fps=0.2):
    """Extract still frames from a video into ``output_folder``.

    Frames are written as ``frame0000.png``, ``frame0001.png``, ... Frames
    with that naming pattern left over from a previous run are removed
    first, so the folder only ever holds frames of the current video.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory that receives the PNG frames; created if it
            does not exist.
        fps: Number of frames to save per second of video. The default of
            0.2 keeps one frame every five seconds.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Only delete files this function itself produces (frame<digits>.png);
    # anything else in the folder (audio, transcript) is left alone.
    for existing in os.listdir(output_folder):
        stem, extension = os.path.splitext(existing)
        if extension == ".png" and stem.startswith("frame") and stem[5:].isdigit():
            os.remove(os.path.join(output_folder, existing))

    clip = VideoFileClip(video_path)
    try:
        clip.write_images_sequence(
            os.path.join(output_folder, "frame%04d.png"), fps=fps
        )
    finally:
        # Release the reader held by the clip even if writing frames fails.
        clip.close()
|
|
|
|
|
def video_to_audio(video_path, output_audio_path):
    """Write the audio track of a video to ``output_audio_path``.

    Args:
        video_path: Path of the video file to read.
        output_audio_path: Destination path of the audio file.

    Raises:
        ValueError: If the video has no audio track.
    """
    clip = VideoFileClip(video_path)
    try:
        # Without this check a silent video fails with an opaque
        # AttributeError on None.
        if clip.audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")
        clip.audio.write_audiofile(output_audio_path)
    finally:
        # Release the reader held by the clip even if the export fails.
        clip.close()
|
|
|
|
|
def audio_to_text(audio_path):
    """Transcribe an audio file with the Whisper recognizer.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The recognised text, or an empty string when the recognizer raises
        ``sr.UnknownValueError`` (an error is then shown in the Streamlit
        page).
    """
    speech_engine = sr.Recognizer()

    # Read the whole file into memory as one recording.
    with sr.AudioFile(audio_path) as audio_source:
        recording = speech_engine.record(audio_source)

    try:
        return speech_engine.recognize_whisper(recording)
    except sr.UnknownValueError:
        st.error("Speech recognition could not understand the audio.")
        return ""
|
|
|
|
|
@st.cache_data(max_entries=1)
def _encode_image_files(image_directory, directory_signature):
    """Encode the listed images with CLIP; cached per directory snapshot.

    Args:
        image_directory: Directory that contains the image files.
        directory_signature: Tuple of ``(filename, mtime_ns, size)`` entries
            describing the images to encode. It is part of the cache key, so
            a changed directory is re-encoded rather than served from cache.

    Returns:
        Dict mapping each file name to its embedding as a list of floats.
    """
    model = SentenceTransformer('clip-ViT-B-32')
    image_features = {}
    for filename, _mtime_ns, _size in directory_signature:
        image_path = os.path.join(image_directory, filename)
        # Close each file handle as soon as its embedding is computed.
        with Image.open(image_path) as image:
            image_features[filename] = model.encode(image).tolist()
    return image_features


def load_image_features(image_directory):
    """Return CLIP embeddings for every ``.jpg``/``.png`` in a directory.

    Results are cached, but the cache key includes each file's name,
    modification time and size. Caching on the directory path alone would
    keep returning the features of whatever the directory held on the first
    call (possibly nothing), even after new frames were written.

    Args:
        image_directory: Directory to scan for images.

    Returns:
        Dict mapping each image file name to its embedding as a list of
        floats; empty when the directory holds no matching files.
    """
    directory_signature = []
    # Sorted so the snapshot (and the resulting dict order) is deterministic.
    for filename in sorted(os.listdir(image_directory)):
        if filename.endswith((".jpg", ".png")):
            file_stat = os.stat(os.path.join(image_directory, filename))
            directory_signature.append(
                (filename, file_stat.st_mtime_ns, file_stat.st_size)
            )
    return _encode_image_files(image_directory, tuple(directory_signature))
|
|
|
|
|
def find_top_3_similar_images(query_text, image_features):
    """Pick the three images whose embeddings best match a text query.

    Args:
        query_text: Text to compare against the images.
        image_features: Dict mapping image file names to their embeddings.

    Returns:
        Up to three file names, ordered from most to least similar by the
        cosine similarity between the query embedding and image embedding.
    """
    clip_model = SentenceTransformer('clip-ViT-B-32')
    query_vector = clip_model.encode([query_text]).tolist()[0]

    # Score every image against the query.
    scored = [
        (name, util.cos_sim(query_vector, vector).item())
        for name, vector in image_features.items()
    ]

    # Highest similarity first; keep only the file names of the best three.
    ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
    return [name for name, _score in ranked[:3]]
|
|
|
|
|
def read_and_chunk(file_path, chunk_size=100):
    """Read a text file and cut its contents into fixed-size pieces.

    Args:
        file_path: Path of the text file to read.
        chunk_size: Number of characters per piece; the last piece may be
            shorter.

    Returns:
        List of consecutive, non-overlapping substrings of the file's text
        (empty list for an empty file).
    """
    with open(file_path, 'r') as handle:
        contents = handle.read()

    pieces = []
    for start in range(0, len(contents), chunk_size):
        pieces.append(contents[start:start + chunk_size])
    return pieces
|
|
|
|
|
def embed_content(content, model_name="all-MiniLM-L6-v2"):
    """Embed text with a SentenceTransformer model.

    Args:
        content: Text (or list of texts) handed to the model's ``encode``.
        model_name: Name of the SentenceTransformer model to load.

    Returns:
        The embeddings as a tensor (``convert_to_tensor=True``).
    """
    encoder = SentenceTransformer(model_name)
    return encoder.encode(content, convert_to_tensor=True)
|
|
|
|
|
def get_top_chunks(file_path, user_query, top_n=6):
    """Return the text chunks of a file most similar to a query.

    The file is split with ``read_and_chunk``, chunks and query are embedded
    with ``embed_content``, and chunks are ranked by cosine similarity.

    Args:
        file_path: Path of the text file to search.
        user_query: Query text.
        top_n: Maximum number of chunks to return.

    Returns:
        Up to ``top_n`` chunks, most similar first. Empty list when the file
        has no text or ``top_n`` is not positive.
    """
    chunks = read_and_chunk(file_path)

    # An empty transcript produces no chunks, and embedding/comparing an
    # empty batch fails. A non-positive top_n must also short-circuit:
    # the slice [-0:] below would otherwise select every chunk.
    if not chunks or top_n <= 0:
        return []

    chunk_embeddings = embed_content(chunks).cpu().numpy()
    query_embedding = embed_content([user_query]).cpu().numpy()

    # One row (the query) against all chunks -> flatten to a 1-D score list.
    similarities = cosine_similarity(query_embedding, chunk_embeddings).flatten()

    # argsort is ascending: take the last top_n and reverse for best-first.
    top_indices = np.argsort(similarities)[-top_n:][::-1]
    return [chunks[i] for i in top_indices]
|
|
|
|
|
|
|
|
# Streamlit page: download and pre-process a video, then answer questions
# about it from its transcript and the most relevant frames.
st.title("Video Analysis and Q&A")

video_url = st.text_input("Enter YouTube video URL")

if video_url:
    if st.button("Process Video"):
        with st.spinner("Processing video..."):
            info, metadata = download_video(video_url, output_video_path, "INPUT_VIDEO")
            st.json(metadata)

            # download_video saves the file as INPUT_VIDEO.<ext>, where <ext>
            # is filled in by yt-dlp for the selected format, so take the
            # extension from the returned info instead of assuming ".mp4".
            video_extension = info.get("ext") or "mp4"
            filepath = os.path.join(
                os.path.dirname(output_video_path), f"INPUT_VIDEO.{video_extension}"
            )
            video_to_images(filepath, output_folder)
            video_to_audio(filepath, output_audio_path)

            text_data = audio_to_text(output_audio_path)
            with open(os.path.join(output_folder, "output_text.txt"), "w") as file:
                file.write(text_data)

        st.success("Video processed successfully!")

    image_features = load_image_features(output_folder)

    query = st.text_input("Enter your question about the video")

    if query and st.button("Get Answer"):
        transcript_path = os.path.join(output_folder, "output_text.txt")

        # Without a transcript file or any encoded frames there is nothing
        # to retrieve from; the calls below would fail on the missing file
        # or on an empty image list.
        if not image_features or not os.path.exists(transcript_path):
            st.warning("Please process the video before asking a question.")
        else:
            with st.spinner("Analyzing..."):
                top_chunks = get_top_chunks(transcript_path, query)
                top_3_similar_images = find_top_3_similar_images(query, image_features)

                img = [os.path.join(output_folder, filename) for filename in top_3_similar_images]
                image_documents = SimpleDirectoryReader(input_files=img).load_data()

                gemini_mm_llm = GeminiMultiModal(
                    model_name="models/gemini-1.5-flash",
                    api_key=GOOGLE_API_TOKEN,
                    temperature=0.7,
                    max_output_tokens=1500,
                )

                context_str = "\n".join(top_chunks)

                qa_tmpl_str = """
                Based on the provided information, including relevant images and retrieved context from the video,
                accurately and precisely answer the query without any additional prior knowledge.

                ---------------------
                Context: {context_str}
                ---------------------
                Images: {image_list}
                ---------------------
                Query: {query_str}
                Answer:
                """

                response = gemini_mm_llm.complete(
                    prompt=qa_tmpl_str.format(
                        query_str=query,
                        context_str=context_str,
                        image_list=", ".join(img)
                    ),
                    image_documents=image_documents,
                )

                st.write("Answer:", response.text)

                st.subheader("Relevant Images")
                # img already holds at most three paths.
                for image_path in img:
                    st.image(image_path)