import os
# we need to compile an OpenBLAS version for CPU,
# or get it from https://jllllll.github.io/llama-cpp-python-cuBLAS-wheels/
os.system('CMAKE_ARGS="-DLLAMA_BLAS=ON -DLLAMA_BLAS_VENDOR=OpenBLAS" pip install llama-cpp-python==0.2.11')

import re, requests, json
import gradio as gr
import random
import torch
from itertools import chain
import asyncio
from llama_cpp import Llama
import datetime
from transformers import (
    StoppingCriteriaList,
    MaxLengthCriteria,
)

# Created by
# https://huggingface.co/gorkemgoknar

# Coqui V1 api renders voice; you can also use XTTS
#COQUI_URL="https://app.coqui.ai/api/v2/samples"
COQUI_URL="https://app.coqui.ai/api/v2/samples/xtts"
COQUI_URL_EN="https://app.coqui.ai/api/v2/samples/xtts/render/"

### Warning: each sample will consume your credits
COQUI_TOKEN=os.environ.get("COQUI_TOKEN")
PER_RUN_MAX_VOICE=int(os.environ.get("PER_RUN_MAX_VOICE"))
PER_RUN_COUNTER=0
RUN_START_HOUR=datetime.datetime.now().hour

MAX_NEW_TOKENS = 30
GPU_LAYERS = 0
STOP_LIST=["###","##"]
LLAMA_VERBOSE=False
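# Environment variables this Space expects (a summary of the os.environ.get
# calls in this file, not an official list):
#   COQUI_TOKEN        - Coqui.ai API bearer token
#   PER_RUN_MAX_VOICE  - max voiced samples per hour across all users
#   VOICE_LIMIT        - per-session voiced-sample limit (read below, in the UI block)
#   VOICE_ID_<NAME>    - one cloned-voice id per character (e.g. VOICE_ID_KIRK)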
TITLE = "<html> <head> <style> h1 {text-align: center;} </style> </head> <body> <h1> Movie Chatbot - Auto-Chatbot Powered by Coqui.ai XTTS 🐸</h1> </body> </html>"
INTRODUCTION_TEXT = "Choose your characters and enter an initial text to see and hear (🐸) them talk. \
Voice has a per-user and hourly limit; copy this Space and use your own Coqui.ai token and voice_ids for your own usage. \
Additional hint: try French, Italian, German or Spanish initial texts."

#stopping_criteria = StoppingCriteriaList([MaxLengthCriteria(max_length=64)])

from huggingface_hub import hf_hub_download
hf_hub_download(repo_id="gorkemgoknar/llama2-7f-moviechatbot-ggml-q4", local_dir=".", filename="llama2-7f-fp16-gguf-q4.bin")
model_path="./llama2-7f-fp16-gguf-q4.bin"

import langid

llm = Llama(model_path=model_path, n_gpu_layers=GPU_LAYERS, n_ctx=256, n_batch=256, verbose=LLAMA_VERBOSE)
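# Quick sanity check (commented out; uncomment to verify the quantized model
# loads and completes a short prompt — the prompt format matches build_question below):
#   print(llm("### Context: movie### Human: Hello### Kirk:", max_tokens=8, stop=["#"]))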
# to use with ctransformers instead:
#llm = AutoModelForCausalLM.from_pretrained("gorkemgoknar/llama2-7f-moviechatbot-ggml-q4",
#                                           model_type='llama',
#                                           gpu_layers=GPU_LAYERS,
#                                           max_new_tokens=MAX_NEW_TOKENS,
#                                           stop=STOP_LIST)

##########################################
# You can use the coqui.ai API to generate audio.
# First you need to create a cloned voice for each character.
voices = {}
voices["Gerald"]=os.environ.get("VOICE_ID_GERALD")
voices["Vader"]=os.environ.get("VOICE_ID_VADER")
voices["Batman"]=os.environ.get("VOICE_ID_BATMAN")
voices["Gandalf"]=os.environ.get("VOICE_ID_GANDALF")
voices["Morpheus"]=os.environ.get("VOICE_ID_MORPHEUS")
voices["Neo"]=os.environ.get("VOICE_ID_NEO")
voices["Ig-11"]=os.environ.get("VOICE_ID_IG11")
voices["Tony Stark"]=os.environ.get("VOICE_ID_TONY")
voices["Kirk"]=os.environ.get("VOICE_ID_KIRK")
voices["Spock"]=os.environ.get("VOICE_ID_SPOCK")
voices["Don"]=os.environ.get("VOICE_ID_DON")
voices["Morgan"]=os.environ.get("VOICE_ID_MORGAN")
voices["Yoda"]=os.environ.get("VOICE_ID_YODA")
voices["Ian"]=os.environ.get("VOICE_ID_IAN")
voices["Thanos"]=os.environ.get("VOICE_ID_THANOS")

def get_audio_url(text, character):
    url = COQUI_URL
    text_language = langid.classify(text)[0]
    supported_languages = ["en","de","fr","es","it","pt","pl"]
    if text_language not in supported_languages:
        text_language = "en"
    if text_language == "en":
        # use the main English model for English; it is better on English-only text
        url = COQUI_URL_EN
    # voice id of "Baldur Sanjin" from the built-in coqui.ai speakers;
    # more via https://docs.coqui.ai/reference/speakers_retrieve
    payload = {
        "voice_id": voices[character],  # voice id, in a form like (this is a dummy) "a399c204-7040-4f1d-bb92-5223fa1aeceb"
        "text": f"{text}",
        "emotion": "Neutral",  # you can set Angry, Surprise etc. on the V1 api; XTTS understands it automatically
        "speed": 1,
        "language": text_language
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {COQUI_TOKEN}"
    }
    response = requests.post(url, json=payload, headers=headers)
    res = json.loads(response.text)
    print("Character:", character, "text:", text)
    print("Audio response", res)
    return res["audio_url"]
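# Example usage (commented out — each call consumes Coqui credits and needs a
# valid COQUI_TOKEN plus a cloned voice id in `voices`):
#   url = get_audio_url("Live long and prosper.", "Spock")
#   # `url` points to the rendered sample; the gr.Audio components below can play it directly.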
def get_response_cpp(prompt):
    output = llm(prompt, max_tokens=32, stop=["#","sierpeda"], echo=True)
    #print(output)
    response_text = output["choices"][0]["text"]
    return response_text

def build_question(character, question, context=None, answer=None, history=None, use_history=False, modify_history=True, human_character=None, add_answer_to_history=True):
    # THIS MODEL (gorkemgoknar/llama2-7f-moviechatbot-ggml-q4) is specifically fine-tuned on
    # ### Context: {context}### History: {history}### {human_character}: {question}### {character}: {answer}
    # where History contains all previous lines spoken by the characters, in order.
    # Context is arbitrary; it gives the characters something to start talking about.
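    # For example, with context="movie", history="Hi#Hello", human_character="Kirk",
    # question="Report!" and character="Spock", the prompt sent to the model is:
    #   ### Context: movie### History: Hi#Hello### Kirk: Report!### Spock: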
    if context is None:
        context = "movie"
    #if human_character is None:
    #    human_character=""
    #else:
    #    human_character="#"+"I am " + human_character +"#"
    if use_history:
        if history is None:
            # no history yet: seed it with the last answer, if any
            history = "" if answer is None else answer
        elif modify_history and answer is not None and add_answer_to_history:
            # append the last answer to the running history
            history = history + "#" + answer
        if human_character is None:
            prompt = f"### Context: {context}### History: {history}### Human: {question}### {character}:"
        else:
            prompt = f"### Context: {context}### History: {history}### {human_character}: {question}### {character}:"
    else:
        if human_character is None:
            prompt = f"### Context: {context}### Human: {question}### {character}:"
        else:
            prompt = f"### Context: {context}### {human_character}: {question}### {character}:"
    return prompt, history
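# Example (commented out) of building a first-turn prompt:
#   prompt, history = build_question("Spock", "Report!", context="movie",
#                                    human_character="Kirk", use_history=True)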
def get_answer_from_response(text, character):
    # on HF it returns the same text plus a continuation;
    # on llama_cpp it is the same full text (echo=True)
    response = text.split(f"### {character}:")[1]
    # on cpp it continues
    # response= text
    # get only the first line of the response
    response = response.split("###")[0]
    response = response.split("#")[0]
    # weirdly, llama2 7f appends some German or Polish at the end... need to crop it
    response = response.split("Unterscheidung")[0]  # weird German separators on output
    response = response.split("Hinweis")[0]         # weird German separators on output
    response = response.split("sierp ")[0]          # weird, sierp
    response = response.split("sierpni ")[0]        # weird, sierp
    response = response.split("sierpien")[0]        # weird, sierp
    response = response.split("kwiet")[0]           # weird, kwiet
    response = response.split("\n")[0]              # cut at end of line
    response = re.split(r"sierp.+\d+", response)[0]   # comes as "sierpina 2018 something something"
    response = re.split(r"styczen.+\d+", response)[0] # comes as "styczen 2018 something something"
    response = re.split(r"kwierk.+\d+", response)[0]  # comes as "kwierk 2018 something something"
    response = response.split(":")[0]
    if response.startswith('"'):
        response = response[1:]
    if response == "" or response == "...":
        response = "Hmm."
    return response
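# Example: for text "### Context: movie### Kirk: Report!### Spock: Fascinating.### Kirk:"
# and character "Spock", this extracts " Fascinating." (callers strip the whitespace):
# everything after the character tag, cropped at the next separator.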
def run_chatter(num_repeat=2, character="kirk", human_character="Mr. Sulu", context="Captain Kirk from U.S.S. Enterprise",
                initial_question="There is a ship approaching captain!",
                withaudio=False,
                history=None,
                add_answer_to_history=True,
                answer=None,
                debug_print=False,
                use_cpu=False):
    question = initial_question
    dialogue = ""
    if debug_print:
        print("**** START Dialogue ****")
        print("Input History:", history)
    audio_urls = []
    for i in range(num_repeat):
        if question is not None:
            question = question.strip()
        if answer is not None:
            answer = answer.strip()
        prompt, history = build_question(character, question, context=context, history=history, answer=answer, human_character=human_character, use_history=True, add_answer_to_history=add_answer_to_history)
        print("PROMPT:", prompt)
        response = get_response_cpp(prompt)
        print("RESPONSE:", response)
        answer = get_answer_from_response(response, character).strip()
        if withaudio:
            # get_audio_url needs both text and character (the character argument was missing here)
            answer_audio_url = get_audio_url(answer, character)
            audio_urls.append(answer_audio_url)
        if debug_print:
            print("\nAct:", i+1)
        dialogue = dialogue + f"{human_character}: {question}" + "\n"
        if debug_print:
            print(f"{human_character}:", question)
            print(f"{character}:", answer)
        dialogue = dialogue + f"{character}: {answer}" + "\n"
        if question is not None:
            question = question.strip()
        if answer is not None:
            answer = answer.strip()
        prompt, history = build_question(human_character, answer, context=context, history=history, answer=question, human_character=character, use_history=True, add_answer_to_history=add_answer_to_history)
        print("PROMPT:", prompt)
        response = get_response_cpp(prompt)
        print("RESPONSE:", response)
        resp_answer = get_answer_from_response(response, human_character)
        if withaudio:
            # not used in practice; audio is generated on the main add_text path instead
            response_audio_url = get_audio_url(resp_answer, human_character)
            audio_urls.append(response_audio_url)
        if debug_print:
            print(f"{human_character}:", resp_answer)
        question = resp_answer
    if debug_print:
        print("Final History:", history)
        print("**** END Dialogue ****")
    if withaudio:
        return dialogue, question, answer, history, audio_urls
    else:
        return dialogue, question, answer, history
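# Example (commented out) — one text-only round between two characters:
#   dialogue, q, a, hist = run_chatter(num_repeat=1, character="Spock",
#                                      human_character="Kirk", context="talks friendly",
#                                      initial_question="Report!", withaudio=False)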
######################
# GRADIO PART
######################

# to close on Jupyter remote
#if("interface" in vars()):
#    print("Closing existing interface")
#    interface.close()

css="""
.chatbox {display:flex;flex-direction:column}
.user_msg, .resp_msg {padding:4px;margin-bottom:4px;border-radius:4px;width:80%}
.user_msg {background-color:cornflowerblue;color:white;align-self:start}
.resp_msg {background-color:lightgray;align-self:self-end}
.audio {background-color:cornflowerblue;color:white;align-self:start;height:5em}
"""

def get_per_run_voice_counter(increase=False):
    hour_now = datetime.datetime.now().hour
    global PER_RUN_COUNTER
    global RUN_START_HOUR
    print("Per run check: Hour now:", hour_now, " RUN_START_HOUR:", RUN_START_HOUR, " PER_RUN_COUNTER", PER_RUN_COUNTER)
    if hour_now != RUN_START_HOUR:
        # reset hourly voice calls (!= rather than > so the reset also fires
        # when the hour wraps past midnight)
        print("resetting per run voice calls")
        PER_RUN_COUNTER = 0
        RUN_START_HOUR = hour_now
    elif increase:
        PER_RUN_COUNTER = PER_RUN_COUNTER + 1
        print("per run voice calls:", PER_RUN_COUNTER)
    print("Per run check: Hour now:", hour_now, " RUN_START_HOUR:", RUN_START_HOUR, " PER_RUN_COUNTER", PER_RUN_COUNTER)
    return PER_RUN_COUNTER
async def add_text(WITH_AUDIO, char1, char2, runs, context, initial_question, history, VOICE_COUNTER):
    print(f"{char1} talks to {char2}")
    history = None
    last_question = None
    # todo: build a context from the dropdown
    returned_history = ""
    unnamed_question = "This weird guy did not input anything.. so, tell me a joke!"
    if initial_question is None:
        initial_question = unnamed_question
    if initial_question == "":
        initial_question = unnamed_question
    for i in range(int(runs)):
        print("char1:", char1, " :", initial_question)
        returned_history += char1 + " : " + initial_question + "\n"
        dialogue, last_question, last_answer, history = run_chatter(num_repeat=1,
                                                                    character=char2,
                                                                    human_character=char1,
                                                                    context=context,
                                                                    initial_question=initial_question,
                                                                    withaudio=False,
                                                                    history=history,
                                                                    answer=last_question,
                                                                    debug_print=False,
                                                                    add_answer_to_history=False
                                                                    )
        print("char2:", char2, " :", last_answer)
        returned_history += char2 + " : " + last_answer + "\n"
        # add the last question and answer to history
        history = history + "#" + initial_question + "#" + last_answer

        print("WITH_AUDIO", WITH_AUDIO)
        if int(WITH_AUDIO):
            use_voice = True
        else:
            use_voice = False
        print("Voice Counter:", VOICE_COUNTER)
        if initial_question == "..." and last_answer == "...":
            use_voice = False
        global PER_RUN_MAX_VOICE
        if use_voice:
            can_use_voice = get_per_run_voice_counter() < PER_RUN_MAX_VOICE
            if not can_use_voice:
                print("Voice limit reached for this hour, try again in an hour")
                gr.Warning("Hourly overall voice limit reached, try again in an hour... running without voice.")
                use_voice = False
        if use_voice and (VOICE_COUNTER > VOICE_LIMIT):
            print("You have reached the voice limit, try with voice later.. running without voice")
            gr.Warning("You have reached the voice limit.. running without voice")
            use_voice = False
        try:
            if use_voice:
                char1_audio_url = get_audio_url(initial_question, char1)
                VOICE_COUNTER += 1
                get_per_run_voice_counter(increase=True)
                char2_audio_url = get_audio_url(last_answer, char2)
                VOICE_COUNTER += 1
                get_per_run_voice_counter(increase=True)
        except Exception:
            gr.Warning("Something went wrong with getting audio.. ")
            use_voice = False
        print("Voice Counter:", VOICE_COUNTER)

        # build the 8 audio slots in a loop (max 8 audios)
        if use_voice:
            audios = [gr.Audio.update() for _ in range(8)]
        else:
            audios = [gr.Audio.update(visible=False) for _ in range(8)]
        if use_voice:
            audios[i*2] = gr.Audio.update(char1_audio_url, visible=True, label=str(i*2) + "_" + char1)
            audios[i*2 + 1] = gr.Audio.update(char2_audio_url, visible=True, label=str(i*2 + 1) + "_" + char2)
        audios = tuple(audios)

        # this needs to be last before yield
        initial_question = last_question
        yield gr.update(value=initial_question, interactive=True), returned_history, *audios, VOICE_COUNTER
    history = None
# Some selected characters are included for the demo (there are more; get a copy and try it,
# just do not expect much from this fast-finetuned model).
CHARACTER_1_CHOICES = ["Gandalf","Gerald", "Morpheus", "Neo","Kirk","Spock","Vader","Yoda","Ig-11","Tony Stark","Batman","Thanos"]
CHARACTER_2_CHOICES = ["Gandalf","Gerald", "Morpheus", "Neo","Kirk","Spock","Vader","Yoda","Ig-11","Tony Stark","Batman","Thanos"]
CONTEXT_CHOICES = ["talks friendly",
                   "insults",
                   "diss in rap",
                   "on a cruise ship going to Mars from Earth",
                   "blames on something",
                   "tries to save the world",
                   "talks aggressively",
                   "argues over if a movie is good",
                   "sword insult fighting",
                   "inside a dark cavern"]
EXAMPLE_INITIALS=["I challenge you to battle of words!",
                  "how much would a woodchuck chuck if a woodchuck could chuck wood?",
                  "The world is changing.",
                  "What do you think about AI?",
                  "I went to the supermarket yesterday.",
                  "Who are you?",
                  "I am richer than you!",
                  "Wie geht es dir?",
                  "O que você fez ontem?",
                  "Il fait trop chaud aujourd'hui."]
VOICE_CHOICES=["With Coqui.ai Voice",
               "No voice"]
RUN_COUNT = [2,3,4]

title = "Metayazar - Movie Chatbot Llama Finetuned, Voice powered by Coqui.ai"
description = "Auto-chat your favorite movie characters. Voice via Coqui.ai"
article = "<p style='text-align: center'><a href='https://www.linkedin.com/pulse/ai-goes-job-interview-g%C3%B6rkem-g%C3%B6knar/' target='_blank'>AI Goes to Job Interview</a> | <a href='https://www.metayazar.com/' target='_blank'>Metayazar AI Writer</a> | <a href='https://www.linkedin.com/in/goknar/' target='_blank'>Görkem Göknar</a></p>"
def change_run_count(run_count):
    print("update run count:", run_count)
    visible_audios = [False, False, False, False, False, False, False, False]
    run_count = int(run_count)
    for i in range(run_count*2 - 1):
        if i >= len(visible_audios):
            break
        visible_audios[i] = False  # kept hidden here; slots become visible once add_text fills them
    return_list = []
    # max 8 audio slots
    for i in range(8):
        return_list.append(gr.Audio.update(visible=visible_audios[i]))
    return return_list

def switch_voice(with_voice, WITH_AUDIO, VOICE_COUNTER):
    print("update use voice:", with_voice)
    if (VOICE_COUNTER > VOICE_LIMIT) or (PER_RUN_COUNTER > PER_RUN_MAX_VOICE):
        gr.Warning("Unfortunately the voice limit is reached; try again another time, or use without voice")
        WITH_AUDIO = 0
    else:
        if with_voice == VOICE_CHOICES[0]:
            WITH_AUDIO = 1
        else:
            WITH_AUDIO = 0
    return with_voice, WITH_AUDIO
with gr.Blocks(css=css) as interface:
    VOICE_COUNTER = gr.State(value=0)
    WITH_AUDIO = gr.State(value=1)
    VOICE_LIMIT = int(os.environ.get("VOICE_LIMIT"))
    with gr.Row():
        gr.HTML(TITLE, elem_id="banner")
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
    with gr.Row():
        drop_char1 = gr.components.Dropdown(CHARACTER_1_CHOICES, label="Character 1", value=CHARACTER_1_CHOICES[0])
        drop_char2 = gr.components.Dropdown(CHARACTER_2_CHOICES, label="Character 2", value=CHARACTER_2_CHOICES[1])
        run_count = gr.components.Dropdown(RUN_COUNT, label="Line count per character", value=RUN_COUNT[0])
    with gr.Row():
        context_choice = gr.components.Dropdown(CONTEXT_CHOICES, label="Context", value=CONTEXT_CHOICES[0])
    with gr.Row():
        with_voice = gr.components.Dropdown(VOICE_CHOICES, label="Voice via Coqui.ai (demo)", value=VOICE_CHOICES[0])
    with gr.Row():
        txt = gr.Textbox(
            show_label=False,
            placeholder="Enter initial text and press enter, or pick an example below",
            value=EXAMPLE_INITIALS[0], elem_classes="user_msg"
        )
        submit_btn = gr.Button(value="Submit")
    examples = gr.Examples(examples=EXAMPLE_INITIALS,
                           inputs=[txt])
    with gr.Row():
        with gr.Column():
            history = gr.Textbox(lines=25,
                                 show_label=True,
                                 label="History",
                                 placeholder="History",
                                 ).style(height=50)
        with gr.Column():
            audio1 = gr.Audio(elem_id="audio1", elem_classes="audio", autoplay=False, visible=False)
            audio2 = gr.Audio(elem_id="audio2", elem_classes="audio", autoplay=False, visible=False)
            audio3 = gr.Audio(elem_id="audio3", elem_classes="audio", autoplay=False, visible=False)
            audio4 = gr.Audio(elem_id="audio4", elem_classes="audio", autoplay=False, visible=False)
            audio5 = gr.Audio(elem_id="audio5", elem_classes="audio", autoplay=False, visible=False)
            audio6 = gr.Audio(elem_id="audio6", elem_classes="audio", autoplay=False, visible=False)
            audio7 = gr.Audio(elem_id="audio7", elem_classes="audio", autoplay=False, visible=False)
            audio8 = gr.Audio(elem_id="audio8", elem_classes="audio", autoplay=False, visible=False)
    with_voice.change(switch_voice, [with_voice, WITH_AUDIO, VOICE_COUNTER], [with_voice, WITH_AUDIO])
    run_count.change(change_run_count, [run_count], [audio1, audio2, audio3, audio4, audio5, audio6, audio7, audio8])
    submit_btn.click(add_text, [WITH_AUDIO, drop_char1, drop_char2, run_count, context_choice, txt, history, VOICE_COUNTER], [txt, history, audio1, audio2, audio3, audio4, audio5, audio6, audio7, audio8, VOICE_COUNTER], api_name="chat")

interface.queue().launch()