Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| """pod_to_sum_v3.ipynb | |
| Automatically generated by Colaboratory. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1rbZ98r1Z_IM0Z3VDuNQObxpuZf5KUgmL | |
| ### Initialization | |
| """ | |
| import os | |
# Directory that receives the downloaded audio files.
save_dir = os.path.join('./', 'docs')
# exist_ok=True replaces the exists()/mkdir() pair, which could fail if the
# directory appeared between the check and the creation.
os.makedirs(save_dir, exist_ok=True)

# Hugging Face model identifiers used for transcription and summarization.
transcription_model = "openai/whisper-base"
llm_model = "gmurro/bart-large-finetuned-filtered-spotify-podcast-summ"
| import pandas as pd | |
| import numpy as np | |
| import pytube | |
| from pytube import YouTube | |
| import transformers | |
| from transformers import pipeline | |
| import torch | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| """### Define how to get transcript of the YT video""" | |
def get_transcript(url):
    """Download the audio of a YouTube video and return its transcript.

    The first audio-only mp4 stream is saved as ``audio.mp4`` inside
    ``save_dir``, loaded at 16 kHz and passed to the speech-recognition
    pipeline built from ``transcription_model``.

    Args:
        url: Link to the YouTube video; it is converted with ``str()``.

    Returns:
        The ``'text'`` entry of the speech-recognition pipeline output.

    Raises:
        ValueError: If the video has no audio-only mp4 stream.
    """
    import librosa

    yt_video = YouTube(str(url))
    # first() yields None when the filter matches nothing; fail with a clear
    # message instead of an AttributeError on the download call below.
    yt_audio = yt_video.streams.filter(only_audio=True, file_extension='mp4').first()
    if yt_audio is None:
        raise ValueError(f"No audio-only mp4 stream available for {url!r}")
    out_file = yt_audio.download(filename="audio.mp4", output_path=save_dir)

    # Load the audio as an array resampled to 16 kHz.
    speech_array, _ = librosa.load(out_file, sr=16000)

    asr = pipeline("automatic-speech-recognition", model=transcription_model, device=device)
    try:
        audio_text = asr(
            speech_array,
            max_new_tokens=256,
            generate_kwargs={"task": "transcribe"},
            chunk_length_s=30,
            batch_size=8,
        )
    finally:
        # Release the model and cached GPU memory even when transcription fails.
        del asr
        torch.cuda.empty_cache()
    return audio_text['text']
| """### Define functions to generate summary""" | |
def clean_sent(sent_list):
    """Return ``sent_list`` with runs of identical consecutive items collapsed.

    Only adjacent duplicates are removed; an item that reappears later, after
    a different item, is kept. An empty input yields an empty list.

    Args:
        sent_list: Sequence of sentences (any items comparable with ``!=``).

    Returns:
        A new list containing the first item of every run of equal neighbours.
    """
    deduped = []
    for sentence in sent_list:
        # Keep the sentence unless it repeats the one kept just before it.
        if not deduped or sentence != deduped[-1]:
            deduped.append(sentence)
    return deduped
| import nltk | |
# Sentence-tokenizer data used by nltk.tokenize.sent_tokenize. Older NLTK
# releases read the 'punkt' resource, newer ones read 'punkt_tab'; fetching
# both keeps the tokenizer working regardless of the installed version.
nltk.download('punkt')
nltk.download('punkt_tab')
def get_chunks(audio_text, sent_overlap, max_token, tokenizer):
    """Split text into sentence-aligned, overlapping chunks within a token budget.

    The text is split into sentences, adjacent duplicate sentences are
    dropped, and sentences are packed greedily into chunks whose summed token
    counts do not exceed ``max_token``. Each new chunk starts ``sent_overlap``
    sentences before the end of the previous one.

    Args:
        audio_text: Text to split.
        sent_overlap: Number of sentences shared between consecutive chunks.
        max_token: Maximum number of tokens per chunk, measured with
            ``tokenizer.tokenize``.
        tokenizer: Object exposing ``tokenize(str)`` that returns a sized
            collection of tokens.

    Returns:
        A list of chunk strings (sentences joined by single spaces); empty
        when the text contains no sentences. A single sentence that alone
        exceeds ``max_token`` is emitted as its own chunk and is therefore the
        only kind of chunk that can exceed the budget.
    """
    sentences = nltk.tokenize.sent_tokenize(audio_text)
    if not sentences:
        return []
    sentences = clean_sent(sentences)

    # Tokenize every sentence once; overlapping sentences would otherwise be
    # re-tokenized for each chunk they appear in.
    sentence_sizes = [len(tokenizer.tokenize(sentence)) for sentence in sentences]
    total = len(sentences)

    chunks = []
    first_sentence = 0
    while first_sentence < total:
        last_sentence = first_sentence
        chunk_size = 0
        while (last_sentence < total
               and chunk_size + sentence_sizes[last_sentence] <= max_token):
            chunk_size += sentence_sizes[last_sentence]
            last_sentence += 1

        if last_sentence == first_sentence:
            # The sentence alone is over budget: take it by itself so the
            # window still moves forward instead of producing empty chunks.
            last_sentence += 1

        chunks.append(" ".join(sentences[first_sentence:last_sentence]))
        if last_sentence >= total:
            break

        # Step back by the overlap, but always advance by at least one
        # sentence; otherwise a chunk with no more sentences than the overlap
        # would restart at (or before) the same position forever.
        first_sentence = max(last_sentence - sent_overlap, first_sentence + 1)
    return chunks
| """### Define how to get summary of the transcript""" | |
def get_summary(audio_text):
    """Summarize a transcript, using a map-reduce pass for long inputs.

    While the text has more tokens than the tokenizer's ``model_max_length``,
    it is split into overlapping chunks, each chunk is summarized, and the
    chunk summaries are joined into a shorter text (the "map" step). The
    resulting text is then summarized once more to produce the final summary
    (the "reduce" step).

    Args:
        audio_text: Transcript to summarize.

    Returns:
        A ``(final_summary, long_summary)`` tuple of strings. ``long_summary``
        is the joined chunk summaries of the last map step, or a fixed notice
        when no map step was needed. For a blank transcript ``final_summary``
        is an empty string.
    """
    import re
    from transformers import AutoTokenizer
    from transformers import pipeline

    too_short_notice = "The video is too short to produce a descriptive summary"

    # Collapse immediately repeated words ("the the" -> "the"), ignoring case.
    audio_text = re.sub(r'\b(\w+) \1\b', r'\1', audio_text, flags=re.IGNORECASE)
    if not audio_text.strip():
        # Nothing to summarize; skip loading the model altogether.
        return "", too_short_notice

    tokenizer = AutoTokenizer.from_pretrained(llm_model)
    # Run on the same device as the transcription pipeline.
    summarizer = pipeline("summarization", model=llm_model, device=device)
    model_max_tokens = tokenizer.model_max_length  # largest input the tokenizer reports

    def get_map_summary(chunk_text, summarizer):
        """Summarize ``chunk_text`` chunk by chunk and join the partial summaries."""
        # Keep two tokens spare - presumably for the special tokens the
        # tokenizer adds before and after the text.
        max_token = model_max_tokens - 2
        sent_overlap = 3  # sentences shared between two consecutive chunks
        sent_chunks = get_chunks(audio_text=chunk_text, sent_overlap=sent_overlap,
                                 max_token=max_token, tokenizer=tokenizer)
        # truncation=True protects against a chunk that is still over the limit.
        chunk_summary_list = summarizer(sent_chunks, min_length=50, max_length=200,
                                        batch_size=8, truncation=True)
        return " ".join(c['summary_text'] for c in chunk_summary_list)

    map_text = audio_text
    long_summary = ""
    text_tokens = len(tokenizer.tokenize(audio_text))
    try:
        # Map step: repeat until the text fits into the model.
        while text_tokens > model_max_tokens:
            map_text = get_map_summary(chunk_text=map_text, summarizer=summarizer)
            long_summary = map_text
            previous_tokens = text_tokens
            text_tokens = len(tokenizer.tokenize(map_text))
            if text_tokens >= previous_tokens:
                # The pass did not shrink the text; stop instead of looping
                # forever and let the final call truncate the input.
                break

        # Reduce step: target roughly 30% of the input length, but never let
        # max_length drop below min_length (which happens for short texts).
        final_min_length = 35
        max_token = max(round(text_tokens * 0.3), final_min_length)
        final_summary = summarizer(map_text, min_length=final_min_length,
                                   max_length=max_token, truncation=True)
        final_summary = final_summary[0]["summary_text"]
    finally:
        # Release the model and cached GPU memory even when summarization fails.
        del tokenizer, summarizer
        torch.cuda.empty_cache()

    if long_summary == "":
        long_summary = too_short_notice
    return final_summary, long_summary
| """### Defining Gradio App""" | |
| import gradio as gr | |
| import pytube | |
| from pytube import YouTube | |
def get_youtube_title(url):
    """Return the ``title`` attribute of the YouTube object built from ``url``."""
    return YouTube(str(url)).title
def get_video(url):
    """Return the HTML iframe snippet that embeds the video referenced by ``url``.

    The video id is extracted from the link with ``pytube.extract.video_id``
    and substituted into the embed URL of the iframe.
    """
    iframe_template = '<iframe width="100%" height="315" src="https://www.youtube.com/embed/{}" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>'
    return iframe_template.format(pytube.extract.video_id(url))
def summarize_youtube_video(url):
    """Transcribe the video at ``url`` and summarize the transcript.

    Progress is logged with ``print``: the URL, the first 500 characters of
    the transcript, and both summaries.

    Returns:
        A ``(transcript, short_summary, long_summary)`` tuple.
    """
    print("URL:", url)

    transcript = get_transcript(url)
    print("Transcript:", transcript[:500])

    gist, detailed = get_summary(transcript)
    for label, value in (("Short Summary:", gist), ("Long Summary:", detailed)):
        print(label, value)

    return transcript, gist, detailed
# Placeholder player shown before any video has been submitted.
html = '<iframe width="100%" height="315" src="https://www.youtube.com/embed/" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>'

# Defining the structure of the UI
with gr.Blocks() as demo:
    with gr.Row():
        gr.Markdown("# Summarize a Long YouTube Video")

    # Input row: the link box next to the submit button.
    with gr.Row():
        with gr.Column(scale=4):
            url = gr.Textbox(label="Enter YouTube video link here:", placeholder="Place for youtube link..")
        with gr.Column(scale=1):
            sum_btn = gr.Button("Summarize!")

    gr.Markdown("# Results")
    title = gr.Textbox(label="Video Title", placeholder="title...")

    # Result row: embedded player on the left, the two summaries on the right.
    with gr.Row():
        with gr.Column(scale=4):
            # No `scale` here: the player is the only child of its column, and
            # the argument is not part of gr.HTML's constructor in every
            # Gradio release.
            video = gr.HTML(html)
        with gr.Column():
            with gr.Row():
                short_summary = gr.Textbox(label="Gist", placeholder="short summary...", scale=1)
            with gr.Row():
                long_summary = gr.Textbox(label="Summary", placeholder="long summary...", scale=2)

    with gr.Row():
        with gr.Group():
            text = gr.Textbox(label="Full Transcript", placeholder="transcript...", show_label=True)

    with gr.Accordion("Credits and Notes", open=False):
        gr.Markdown("""
1. Transcript is generated by openai/whisper-base model by downloading YouTube video.\n
2. Summary is generated by gmurro/bart-large-finetuned-filtered-spotify-podcast-summ.\n
3. The app is possible because of Hugging Face Transformers.\n
""")

    # Defining the functions to call on clicking the button. The title and
    # the player bypass the queue; only the summarization job is queued.
    sum_btn.click(fn=get_youtube_title, inputs=url, outputs=title, api_name="get_youtube_title", queue=False)
    sum_btn.click(fn=summarize_youtube_video, inputs=url, outputs=[text, short_summary, long_summary], api_name="summarize_youtube_video", queue=True)
    sum_btn.click(fn=get_video, inputs=url, outputs=video, api_name="get_youtube_video", queue=False)

demo.queue()
demo.launch(share=False)