|
|
import streamlit as st |
|
|
import os |
|
|
import yt_dlp |
|
|
import subprocess |
|
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
|
import re |
|
|
import torch |
|
|
from PIL import Image |
|
|
from sentence_transformers import SentenceTransformer, util |
|
|
import numpy as np |
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
import google.generativeai as genai |
|
|
from llama_index.core import SimpleDirectoryReader |
|
|
from llama_index.multi_modal_llms.gemini import GeminiMultiModal |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_youtube_video_id(url):
    """Extract the 11-character video ID from a YouTube URL.

    Recognised forms include ``youtube.com/watch?v=ID``, ``youtu.be/ID``,
    ``youtube.com/embed/ID``, ``youtube.com/v/ID``, ``youtube.com/shorts/ID``
    and ``youtube.com/live/ID``, with an optional scheme and an optional
    ``www.``, ``m.`` or ``music.`` host prefix.

    Args:
        url: The URL text to inspect. Surrounding whitespace is ignored.

    Returns:
        The video ID as a string, or ``None`` when ``url`` is not a string or
        does not look like a YouTube video URL.
    """
    if not isinstance(url, str):
        return None

    pattern = (
        r'(?:https?:\/\/)?(?:(?:www|m|music)\.)?'
        r'(?:youtube\.com\/'
        r'(?:[^\/\n\s]+\/\S+\/|(?:v|e(?:mbed)?|shorts|live)\/|.*[?&]v=)'
        r'|youtu\.be\/)'
        # Video IDs only use URL-safe base64 characters; restricting the class
        # keeps query-string fragments such as "abc?t=12345" from being
        # mistaken for an ID.
        r'([A-Za-z0-9_-]{11})'
    )

    # re.match anchors at the start, so pasted input with leading whitespace
    # must be stripped first.
    match = re.match(pattern, url.strip())
    return match.group(1) if match else None
|
|
|
|
|
|
|
|
def video_to_images(video_url, output_folder):
    """Download a video with yt-dlp and dump one frame every five seconds.

    The video is saved as ``video.<ext>`` inside ``output_folder`` and frames
    are written next to it as ``frame_0001.png``, ``frame_0002.png``, ...
    (ffmpeg ``fps=0.2`` filter).

    Any ``video.*`` file and ``frame_*.png`` file already present in
    ``output_folder`` is deleted first. The output names are fixed, so
    leftovers from an earlier URL would otherwise be reused or mixed in with
    the new frames.

    Args:
        video_url: URL of the video to download.
        output_folder: Directory for the video and frames; created if missing.

    Returns:
        ``"Frames extracted successfully."`` on success, otherwise a message
        starting with ``"Error:"``. Error messages never contain the word
        "successfully", so callers may test for either marker.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Clear artefacts of a previous run (see docstring for why).
    for name in os.listdir(output_folder):
        stale_video = name.startswith('video.')
        stale_frame = name.startswith('frame_') and name.endswith('.png')
        path = os.path.join(output_folder, name)
        if (stale_video or stale_frame) and os.path.isfile(path):
            os.remove(path)

    ydl_opts = {
        'outtmpl': os.path.join(output_folder, 'video.%(ext)s'),
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'noplaylist': True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])
    except yt_dlp.utils.DownloadError as exc:
        return f"Error: Video download failed: {exc}"

    video_filepath = os.path.join(output_folder, 'video.mp4')
    if not os.path.exists(video_filepath):
        return "Error: Video file could not be downloaded."

    frame_output_pattern = os.path.join(output_folder, 'frame_%04d.png')
    ffmpeg_command = [
        # -y: never stop to ask about overwriting an existing output file.
        'ffmpeg', '-y', '-i', video_filepath, '-vf', 'fps=0.2',
        frame_output_pattern,
    ]
    try:
        completed = subprocess.run(ffmpeg_command)
    except FileNotFoundError:
        return "Error: ffmpeg executable was not found."

    if completed.returncode != 0:
        return f"Error: ffmpeg exited with status {completed.returncode}."

    return "Frames extracted successfully."
|
|
|
|
|
|
|
|
def extract_youtube_transcript(video_url):
    """Fetch the transcript of a YouTube video as one space-joined string.

    Args:
        video_url: URL of the YouTube video.

    Returns:
        The transcript text on success. On any failure (unrecognised URL,
        transcript unavailable, network problem, ...) a message starting with
        ``"Error:"`` is returned instead of raising, because callers detect
        failure with ``startswith("Error")``.
    """
    video_id = get_youtube_video_id(video_url)
    if not video_id:
        # Must carry the "Error" prefix, otherwise callers would treat this
        # message as transcript text.
        return "Error: Invalid YouTube video URL."

    # Deliberately broad: this is the boundary with the UI, which expects an
    # error string rather than an exception.
    try:
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
        else:
            # Newer youtube-transcript-api releases replace the static
            # get_transcript() with an instance-level fetch().
            transcript = YouTubeTranscriptApi().fetch(video_id).to_raw_data()
        return ' '.join(entry['text'] for entry in transcript)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
def find_top_3_similar_images(query_text, image_directory):
    """Return the file names of the three images most similar to a text query.

    Every ``.jpg``/``.png`` file directly inside ``image_directory`` is
    embedded with the ``clip-ViT-B-32`` SentenceTransformer model and ranked
    by cosine similarity against the embedding of ``query_text``.

    Args:
        query_text: Free-text description to match images against.
        image_directory: Directory containing the candidate images.

    Returns:
        Up to three file names (not full paths), best match first. An empty
        list when the directory does not exist or holds no matching images.
    """
    if not os.path.isdir(image_directory):
        return []

    # Sorted so that equal scores resolve the same way on every run,
    # independent of the order os.listdir happens to return.
    image_names = sorted(
        name for name in os.listdir(image_directory)
        if name.endswith((".jpg", ".png"))
    )
    if not image_names:
        # Nothing to rank; skip the expensive model load.
        return []

    # NOTE: a former second positional argument,
    # `'clean_up_tokenization_spaces' == False`, was just the constant False
    # and configured nothing, so it is omitted.
    model = SentenceTransformer('clip-ViT-B-32')
    query_feature = model.encode([query_text]).tolist()[0]

    similarities = []
    for name in image_names:
        # Context manager releases the file handle once the image is encoded.
        with Image.open(os.path.join(image_directory, name)) as image:
            image_feature = model.encode(image).tolist()
        score = util.cos_sim(query_feature, image_feature).item()
        similarities.append((name, score))

    similarities.sort(key=lambda item: item[1], reverse=True)
    return [name for name, _ in similarities[:3]]
|
|
|
|
|
|
|
|
def get_top_chunks(text, user_query, top_n=6):
    """Return the transcript chunks most similar to the user's query.

    ``text`` is cut into consecutive 100-character chunks, each chunk and the
    query are embedded with the ``all-MiniLM-L6-v2`` SentenceTransformer
    model, and chunks are ranked by cosine similarity to the query.

    Args:
        text: The text to search, e.g. a video transcript.
        user_query: The question or phrase to match against.
        top_n: Maximum number of chunks to return.

    Returns:
        Up to ``top_n`` chunks, most similar first. An empty list when
        ``text`` is empty or ``top_n`` is not positive.
    """
    # Guard clauses: an empty chunk list would make cosine_similarity fail,
    # and top_n == 0 would turn the slice below into "[-0:]", i.e. everything.
    if not text or top_n <= 0:
        return []

    chunk_size = 100
    chunks = [
        text[start:start + chunk_size]
        for start in range(0, len(text), chunk_size)
    ]

    # NOTE: a former second positional argument,
    # `'clean_up_tokenization_spaces' == False`, was just the constant False
    # and configured nothing, so it is omitted.
    model = SentenceTransformer("all-MiniLM-L6-v2")
    chunk_embeddings = model.encode(chunks)
    query_embedding = model.encode([user_query])

    similarities = cosine_similarity(query_embedding, chunk_embeddings).flatten()
    # argsort is ascending: take the last top_n indices, then reverse so the
    # best match comes first.
    top_indices = np.argsort(similarities)[-top_n:][::-1]
    return [chunks[i] for i in top_indices]
|
|
|
|
|
|
|
|
def get_llm_answer(query, context, images):
    """Ask Gemini to answer a query from transcript context and video frames.

    Args:
        query: The user's question.
        context: Retrieved transcript text to ground the answer.
        images: List of image file paths to attach to the request. May be
            empty, in which case the request is sent with no images.

    Returns:
        The text of the model's response.

    The API key is read from the ``GOOGLE_API_KEY`` environment variable so
    that it does not have to be committed to source. If the variable is not
    set, the ``"YOUR_GOOGLE_API_TOKEN"`` placeholder is used and must be
    replaced for requests to succeed.
    """
    google_api_token = os.environ.get("GOOGLE_API_KEY", "YOUR_GOOGLE_API_TOKEN")
    genai.configure(api_key=google_api_token)

    gemini_mm_llm = GeminiMultiModal(
        model_name="models/gemini-1.5-flash",
        api_key=google_api_token,
        temperature=0.7,
        max_output_tokens=1500,
    )

    qa_tmpl_str = (
        "\n"
        "Based on the provided information, including relevant images and "
        "retrieved context from the video,\n"
        "accurately and precisely answer the query without any additional "
        "prior knowledge.\n"
        "\n"
        "---------------------\n"
        "Context: {context_str}\n"
        "---------------------\n"
        "Images: {image_list}\n"
        "---------------------\n"
        "Query: {query_str}\n"
        "Answer:\n"
    )

    # Only build the reader when there is something to read, so an empty
    # file list is never handed to SimpleDirectoryReader.
    image_documents = (
        SimpleDirectoryReader(input_files=images).load_data() if images else []
    )

    response = gemini_mm_llm.complete(
        prompt=qa_tmpl_str.format(
            query_str=query,
            context_str=context,
            image_list=", ".join(images),
        ),
        image_documents=image_documents,
    )

    return response.text
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Streamlit page: take a YouTube URL and a query, then offer three actions:
# show the best-matching frames, show the best-matching transcript chunks, or
# ask the LLM for an answer grounded in both.
# ---------------------------------------------------------------------------


def _transcript_failed(transcript):
    """Tell whether extract_youtube_transcript returned an error message."""
    # The invalid-URL message is checked explicitly because it may not carry
    # the "Error" prefix.
    return (
        transcript.startswith("Error")
        or transcript == "Invalid YouTube video URL."
    )


def _has_frames(folder):
    """Tell whether `folder` exists and contains at least one image file."""
    return os.path.isdir(folder) and any(
        name.endswith((".jpg", ".png")) for name in os.listdir(folder)
    )


st.title("YouTube Video Analysis")

url = st.text_input("Enter YouTube URL")
query = st.text_input("Enter your query")

if url and query:
    # Working directory shared by every action on this page.
    output_folder = "video_data"

    if st.button("Extract Matched Images"):
        with st.spinner("Processing..."):
            result = video_to_images(url, output_folder)
            # Test for the error prefix, not for the word "successfully":
            # a failure message may contain that word too.
            if result.startswith("Error"):
                st.error(result)
            else:
                st.write(result)
                top_images = find_top_3_similar_images(query, output_folder)
                st.write("Top 3 matched images:")
                for img in top_images:
                    st.image(os.path.join(output_folder, img))

    if st.button("Extract Matched Text Chunks"):
        with st.spinner("Processing..."):
            transcript = extract_youtube_transcript(url)
            if _transcript_failed(transcript):
                st.error(transcript)
            else:
                top_chunks = get_top_chunks(transcript, query)
                st.write("Top matched text chunks:")
                for chunk in top_chunks:
                    st.write(chunk)
                    st.write("---")

    if st.button("Get Precise Answer"):
        with st.spinner("Processing..."):
            transcript = extract_youtube_transcript(url)
            if _transcript_failed(transcript):
                st.error(transcript)
            else:
                # Frames are only produced by the image-extraction step;
                # run it here when nothing has been extracted yet.
                extraction = ""
                if not _has_frames(output_folder):
                    extraction = video_to_images(url, output_folder)

                if extraction.startswith("Error"):
                    st.error(extraction)
                elif not _has_frames(output_folder):
                    st.error("Error: No video frames are available.")
                else:
                    top_chunks = get_top_chunks(transcript, query)
                    top_images = find_top_3_similar_images(query, output_folder)
                    image_paths = [
                        os.path.join(output_folder, img) for img in top_images
                    ]

                    answer = get_llm_answer(
                        query, "\n".join(top_chunks), image_paths
                    )
                    st.write("LLM Answer:")
                    st.write(answer)
|
|
return sections[best_match] |