# -*- coding: utf-8 -*-
#@title scripts
import time
import numpy as np
import pandas as pd
import torch
import faiss
from sklearn.preprocessing import normalize
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from sentence_transformers import SentenceTransformer, util
from pythainlp import Tokenizer
import pickle
import evaluate
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
import gradio as gr

print(torch.cuda.is_available())
# Available fine-tuned QA models (keys into MODEL_DICT below)
model_choices = [
    "mdeberta",
    "wangchanberta-hyp",  # Best model
]
predict_method = [
    "faiss",
    "faissWithModel",
    "cosineWithModel",
    "semanticSearchWithModel",
]

DEFAULT_MODEL = 'wangchanberta-hyp'
DEFAULT_SENTENCE_EMBEDDING_MODEL = 'intfloat/multilingual-e5-base'

MODEL_DICT = {
    'wangchanberta': 'Chananchida/wangchanberta-th-wiki-qa_ref-params',
    'wangchanberta-hyp': 'Chananchida/wangchanberta-th-wiki-qa_hyp-params',
    'mdeberta': 'Chananchida/mdeberta-v3-th-wiki-qa_ref-params',
    'mdeberta-hyp': 'Chananchida/mdeberta-v3-th-wiki-qa_hyp-params',
}

DATA_PATH = 'models/dataset.xlsx'
EMBEDDINGS_PATH = 'models/embeddings.pkl'

class ChatbotModel:
    def __init__(self, model=DEFAULT_MODEL):
        self._chatbot = Chatbot()
        self._chatbot.load_data()
        self._chatbot.load_model(model)
        self._chatbot.load_embedding_model(DEFAULT_SENTENCE_EMBEDDING_MODEL)
        self._chatbot.set_vectors()
        self._chatbot.set_index()

    def chat(self, question):
        # Delegates to Chatbot.answer_question (not shown in this snippet)
        return self._chatbot.answer_question(question)

    def eval(self, model, predict_method):
        # Delegates to Chatbot.eval (not shown in this snippet)
        return self._chatbot.eval(model_name=model, predict_method=predict_method)
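
# Minimal usage sketch (hypothetical session; chat() relies on
# Chatbot.answer_question, which is not shown in this snippet):
#
#   bot = ChatbotModel(model='wangchanberta-hyp')
#   print(bot.chat('หลิน ไห่เฟิง มีชื่อเรียกอีกชื่อว่าอะไร'))
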
class Chatbot:
    def __init__(self):
        # Initialize variables
        self.df = None
        self.test_df = None
        self.model = None
        self.model_name = None
        self.tokenizer = None
        self.embedding_model = None
        self.vectors = None
        self.index = None
        self.k = 1  # top k most similar

    def load_data(self, path: str = DATA_PATH):
        self.df = pd.read_excel(path, sheet_name='Default')
        self.df['Context'] = pd.read_excel(path, sheet_name='mdeberta')['Context']
        # print('Load data done')

    def load_model(self, model_name: str = DEFAULT_MODEL):
        self.model = AutoModelForQuestionAnswering.from_pretrained(MODEL_DICT[model_name])
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_DICT[model_name])
        self.model_name = model_name
        # print('Load model done')

    def load_embedding_model(self, model_name: str = DEFAULT_SENTENCE_EMBEDDING_MODEL):
        # Use the GPU for sentence embeddings when one is available
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.embedding_model = SentenceTransformer(model_name, device=device)
        # print('Load sentence embedding model done')

    def set_vectors(self):
        self.vectors = self.prepare_sentences_vector(self.load_embeddings(EMBEDDINGS_PATH))

    def set_index(self):
        if torch.cuda.is_available():  # Check if GPU is available
            res = faiss.StandardGpuResources()
            self.index = faiss.IndexFlatL2(self.vectors.shape[1])
            gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, self.index)
            gpu_index_flat.add(self.vectors)
            self.index = gpu_index_flat
        else:  # If GPU is not available, use a CPU-based Faiss index
            self.index = faiss.IndexFlatL2(self.vectors.shape[1])
            self.index.add(self.vectors)
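
    # The stored vectors are L2-normalized (see prepare_sentences_vector), so
    # squared L2 distance and cosine similarity give the same ranking:
    # ||u - v||^2 = 2 * (1 - cos(u, v)) for unit vectors. An inner-product
    # index is therefore an equivalent alternative; a minimal sketch:
    #
    #   index = faiss.IndexFlatIP(self.vectors.shape[1])
    #   index.add(self.vectors)  # search() then returns cosine similarities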

    def get_embeddings(self, text_list):
        return self.embedding_model.encode(text_list)

    def prepare_sentences_vector(self, encoded_list):
        encoded_list = [i.reshape(1, -1) for i in encoded_list]
        encoded_list = np.vstack(encoded_list).astype('float32')
        encoded_list = normalize(encoded_list)
        return encoded_list

    def store_embeddings(self, embeddings, file_path: str = EMBEDDINGS_PATH):
        with open(file_path, "wb") as fOut:
            pickle.dump({'sentences': self.df['Question'], 'embeddings': embeddings}, fOut, protocol=pickle.HIGHEST_PROTOCOL)
        print('Store embeddings done')

    def load_embeddings(self, file_path):
        with open(file_path, "rb") as fIn:
            stored_data = pickle.load(fIn)
        stored_sentences = stored_data['sentences']
        stored_embeddings = stored_data['embeddings']
        print('Load (questions) embeddings done')
        return stored_embeddings
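
    # If models/embeddings.pkl is missing, it can be rebuilt from the dataset
    # (a sketch; assumes load_data and load_embedding_model have already run):
    #
    #   embeddings = self.get_embeddings(self.df['Question'].tolist())
    #   self.store_embeddings(embeddings)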

    def model_pipeline(self, question, similar_context):
        inputs = self.tokenizer(question, similar_context, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Decode the most likely answer span from the start/end logits
        answer_start_index = outputs.start_logits.argmax()
        answer_end_index = outputs.end_logits.argmax()
        predict_answer_tokens = inputs.input_ids[0, answer_start_index: answer_end_index + 1]
        Answer = self.tokenizer.decode(predict_answer_tokens)
        return Answer

    def faiss_search(self, question_vector):
        distances, indices = self.index.search(question_vector, self.k)
        similar_questions = [self.df['Question'][indices[0][i]] for i in range(self.k)]
        similar_contexts = [self.df['Context'][indices[0][i]] for i in range(self.k)]
        return similar_questions, similar_contexts, distances, indices
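
    # Shape sketch (with k = 1): index.search takes a float32 array of shape
    # (n_queries, dim) and returns (distances, indices), each of shape
    # (n_queries, k), so indices[0][0] is the nearest row in self.df:
    #
    #   distances, indices = self.index.search(question_vector, 1)
    #   best_row = int(indices[0][0])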

    def predict_faiss(self, message):
        # Retrieval only: return the stored answer of the most similar question
        message = message.strip()
        question_vector = self.get_embeddings([message])
        question_vector = self.prepare_sentences_vector(question_vector)
        similar_questions, similar_contexts, distances, indices = self.faiss_search(question_vector)
        Answers = [self.df['Answer'][i] for i in indices[0]]
        Answer = Answers[0]
        return Answer

    # Function to predict using BERT embedding
    def predict_bert_embedding(self, message):
        # Retrieve the most similar question/context, then extract the answer with the QA model
        message = message.strip()
        question_vector = self.get_embeddings([message])
        question_vector = self.prepare_sentences_vector(question_vector)
        similar_questions, similar_contexts, distances, indices = self.faiss_search(question_vector)
        Answer = self.model_pipeline(similar_questions[0], similar_contexts[0])
        return Answer

    def predict_semantic_search(self, message):
        # Embed every context, then pick the best match by semantic search
        corpus_embeddings = self.prepare_sentences_vector(self.get_embeddings(self.df['Context'].tolist()))
        message = message.strip()
        query_embedding = self.embedding_model.encode(message, convert_to_tensor=True)
        query_embedding = query_embedding.cpu()  # match the CPU corpus embeddings
        hits = util.semantic_search(query_embedding, corpus_embeddings, top_k=1)
        hit = hits[0][0]
        context = self.df['Context'][hit['corpus_id']]
        Answer = self.model_pipeline(message, context)
        return Answer
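
    # Re-embedding every context on each call is the slow path; a minimal
    # caching sketch (context_embeddings is a hypothetical attribute, not in
    # the original):
    #
    #   if getattr(self, 'context_embeddings', None) is None:
    #       self.context_embeddings = self.prepare_sentences_vector(
    #           self.get_embeddings(self.df['Context'].tolist()))
    #   hits = util.semantic_search(query_embedding, self.context_embeddings, top_k=1)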

    def predict_without_faiss(self, message):
        # Brute-force scan over all stored question vectors
        MostSimilarContext = ""
        min_distance = float('inf')
        message = message.strip(' \t\n')
        question_vector = self.get_embeddings([message])
        question_vector = self.prepare_sentences_vector(question_vector)
        for j, _question_vector in enumerate(self.vectors):
            distance = euclidean_distances(question_vector, _question_vector.reshape(1, -1))[0][0]
            if distance < min_distance:
                min_distance = distance
                MostSimilarContext = self.df['Context'][j]
                similar_question = self.df['Question'][j]
                if distance <= 0.02469331026:  # early-exit threshold for a near-exact match
                    break
        predict_answer = self.model_pipeline(message, MostSimilarContext)
        Answer = predict_answer.strip().replace("<unk>", "@")
        return Answer
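
    # For unit vectors, ||u - v||^2 = 2 * (1 - cos(u, v)), so an L2 distance d
    # corresponds to a cosine similarity of 1 - d**2 / 2; the early-exit
    # threshold above (d ~ 0.0247) only fires for near-identical questions
    # (cosine similarity ~ 0.9997).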

bot = ChatbotModel()

# Gradio
EXAMPLES = ["หลิน ไห่เฟิง มีชื่อเรียกอีกชื่อว่าอะไร", "ใครเป็นผู้ตั้งสภาเศรษฐกิจโลกขึ้นในปี พ.ศ. 2514 โดยทุกปีจะมีการประชุมที่ประเทศสวิตเซอร์แลนด์", "โปรดิวเซอร์ของอัลบั้มตลอดกาล ของวงคีรีบูนคือใคร", "สกุลเดิมของหม่อมครูนุ่ม นวรัตน ณ อยุธยา คืออะไร"]
# Alternative chat-style UIs:
# demoFaiss = gr.ChatInterface(fn=bot._chatbot.predict_faiss, examples=EXAMPLES)
# demoBert = gr.ChatInterface(fn=bot._chatbot.predict_bert_embedding, examples=EXAMPLES)
# demoSemantic = gr.ChatInterface(fn=bot._chatbot.predict_semantic_search, examples=EXAMPLES)
# demoWithoutFaiss = gr.ChatInterface(fn=bot._chatbot.predict_without_faiss, examples=EXAMPLES)
demoFaiss = gr.Interface(fn=bot._chatbot.predict_faiss, inputs="text", outputs="text", title="TH wiki (just Faiss)")
demoBert = gr.Interface(fn=bot._chatbot.predict_bert_embedding, inputs="text", outputs="text", title="TH wiki (Faiss & Model)")
demoSemantic = gr.Interface(fn=bot._chatbot.predict_semantic_search, inputs="text", outputs="text", title="TH wiki (Semantic Search & Model)")
demoWithoutFaiss = gr.Interface(fn=bot._chatbot.predict_without_faiss, inputs="text", outputs="text", title="TH wiki (just Model)")
demo = gr.TabbedInterface([demoFaiss, demoWithoutFaiss, demoBert, demoSemantic], ["Faiss", "Model", "Faiss & Model", "Semantic Search & Model"])
demo.launch()
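
# launch() with no arguments is what a Hugging Face Space expects; for local
# testing, launch() also accepts options such as share=True (temporary public
# link) or server_name="0.0.0.0" (listen on all interfaces), e.g.:
#
#   demo.launch(share=True)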