import streamlit as st
from keybert import KeyBERT
import nltk
from nltk.corpus import stopwords
from transformers import AutoTokenizer
import os, re, json
import openai
import spacy
import en_core_web_sm
from sklearn.cluster import KMeans, AgglomerativeClustering
import numpy as np
from sentence_transformers import SentenceTransformer

MODEL = 'all-MiniLM-L6-v2'
nltk.download('stopwords')

def load_autotoken():
    autotok = AutoTokenizer.from_pretrained('facebook/bart-large-mnli')
    return autotok

def load_keyword_model():
    sentence_model = load_model()
    kw_model = KeyBERT(model=sentence_model)
    return kw_model

def load_model():
    embedder = SentenceTransformer(MODEL)
    return embedder

def load_nlp():
    nlp = en_core_web_sm.load()
    return nlp

def create_nest_sentences(document: str, token_max_length=1023):
    nested = []
    sent = []
    length = 0
    tokenizer = load_autotoken()
    for sentence in re.split(r'(?<=[^A-Z].[.?]) +(?=[A-Z])', document.replace("\n", '.')):
        # Hugging Face fast tokenizer: indexing the encoding with [0] gives the
        # token sequence for this single sentence.
        tokens_in_sentence = tokenizer(str(sentence), truncation=False, padding=False)[0]
        length += len(tokens_in_sentence)
        if length < token_max_length:
            sent.append(sentence)
        else:
            nested.append(sent)
            sent = [sentence]
            # Start the new chunk's count from this sentence's tokens (not zero),
            # so the first sentence of each chunk is counted as well.
            length = len(tokens_in_sentence)
    if sent:
        nested.append(sent)
    return nested

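# --- Illustrative usage sketch (not part of the original app) ---
# create_nest_sentences groups sentences into chunks that stay under the
# facebook/bart-large-mnli tokenizer's input limit; the sample text is hypothetical.
def _example_chunking():
    long_doc = "Streamlit makes data apps easy. KeyBERT extracts keyphrases from text. " * 100
    chunks = create_nest_sentences(long_doc, token_max_length=1023)
    for i, chunk in enumerate(chunks):
        print(f"chunk {i}: {len(chunk)} sentences")
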
def preprocess(text) -> str:
    stop_words = set(stopwords.words("english"))
    text = text.lower()
    # text = ''.join([c for c in text if c not in ('!', '.', ',', '?', ':', ';', '"', "'", '-', '(', ')')])
    words = text.split()
    words = [w for w in words if w not in stop_words]
    return " ".join(words)

def generate_keywords(kw_model, document: str) -> list:
    atomic_extractions = kw_model.extract_keywords(
        document, keyphrase_ngram_range=(1, 1), stop_words=None,
        use_maxsum=True, nr_candidates=20, top_n=10)
    complex_extractions = kw_model.extract_keywords(
        document, keyphrase_ngram_range=(1, 2), stop_words=None,
        use_maxsum=True, nr_candidates=20, top_n=10)
    final_topics = []
    for extraction in atomic_extractions:
        final_topics.append(extraction[0])
    for extraction in complex_extractions:
        final_topics.append(extraction[0])
    return final_topics

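# --- Illustrative usage sketch (not part of the original app) ---
# KeyBERT's extract_keywords returns (phrase, score) tuples; generate_keywords keeps
# only the phrases, concatenating ten unigram and ten uni/bigram extractions.
def _example_keywords():
    kw_model = load_keyword_model()
    sample = ("Streamlit lets you build interactive data apps in pure Python, while KeyBERT "
              "uses sentence embeddings to pull out the most representative keyphrases from a document.")
    topics = generate_keywords(kw_model, sample)
    print(topics)  # flat list of up to 20 keyphrases
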
def cluster_based_on_topics(nlp, embedder, text1: str, text2: str, num_clusters=3):
    # Preprocess and tokenize the texts
    doc1 = nlp(preprocess(text1))
    doc2 = nlp(preprocess(text2))

    # Extract sentences from the texts
    sentences1 = [sent.text for sent in doc1.sents]
    sentences2 = [sent.text for sent in doc2.sents]
    all_sentences = sentences1 + sentences2

    # Generate sentence embeddings for each sentence
    sentence_embeddings1 = embedder.encode(sentences1)
    sentence_embeddings2 = embedder.encode(sentences2)
    all_embeddings = np.concatenate((sentence_embeddings1, sentence_embeddings2), axis=0)
    # Normalize the embeddings to unit length
    # all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1, keepdims=True)

    # Cluster the sentence embeddings with KMeans
    clustering_model = KMeans(n_clusters=num_clusters)
    clustering_model.fit(all_embeddings)
    cluster_assignment = clustering_model.labels_

    # Group sentences from both texts by their assigned cluster id
    clustered_sentences = {}
    for sentence_id, cluster_id in enumerate(cluster_assignment):
        if cluster_id not in clustered_sentences:
            clustered_sentences[cluster_id] = []
        clustered_sentences[cluster_id].append(all_sentences[sentence_id])

    return clustered_sentences

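# --- Illustrative usage sketch (not part of the original app) ---
# cluster_based_on_topics returns a dict mapping each KMeans cluster id to the sentences
# (from both texts, after lowercasing and stopword removal) assigned to that cluster.
def _example_clustering():
    nlp = load_nlp()
    embedder = load_model()
    text_a = "Cats are independent pets. They sleep most of the day."
    text_b = "Dogs need daily walks. They are loyal companions."
    clusters = cluster_based_on_topics(nlp, embedder, text_a, text_b, num_clusters=3)
    for cluster_id, sents in clusters.items():
        print(cluster_id, sents)
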
def generate_insights(topics: dict, name1: str, name2: str, text1: str, text2: str, clusters) -> list:
    openai.api_key = os.getenv("OPENAI_API_KEY")
    with open("insights.prompt", "r") as f:
        PROMPT = f.read()
    # print(topics)
    PROMPT = PROMPT.replace("{{name1}}", name1)
    PROMPT = PROMPT.replace("{{name2}}", name2)
    PROMPT = PROMPT.replace("{{topic1}}", ",".join(topics['insight1'][0]))
    PROMPT = PROMPT.replace("{{topic2}}", ",".join(topics['insight2'][0]))
    PROMPT = PROMPT.replace("{{complex1}}", ",".join(topics['insight1'][1]))
    PROMPT = PROMPT.replace("{{complex2}}", ",".join(topics['insight2'][1]))

    final_insights = []
    for cluster_id, sentences in clusters.items():
        # print(cluster_id, " ", sentences)
        final_sentences = "\n".join(sentences)[:4000]
        final_prompt = PROMPT.replace("{{sentences}}", final_sentences)
        # with open(f"prompter/insights_{cluster_id}.prompt", "w") as f:
        #     f.write(final_prompt)

        # Generate insights for each cluster
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=final_prompt,
            max_tokens=200,
            temperature=0.7,
            top_p=1,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        text = response['choices'][0]['text']
        jsonify = json.loads(text)
        final_insights.append(jsonify)
    return final_insights
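
# --- Illustrative wiring sketch (assumption; the Space's actual UI code is not shown above) ---
# One way the helpers could be combined in a Streamlit page. Widget labels, the topics dict
# layout, and the split at index 10 (mirroring how generate_keywords concatenates its unigram
# and bigram extractions) are assumptions, not the original interface. Running the insights
# step requires OPENAI_API_KEY and the insights.prompt template to be present.
def _example_streamlit_ui():
    st.title("Compare two texts")
    name1 = st.text_input("Name of text 1", value="Text A")
    name2 = st.text_input("Name of text 2", value="Text B")
    text1 = st.text_area("Paste text 1")
    text2 = st.text_area("Paste text 2")
    if st.button("Analyze") and text1 and text2:
        kw_model = load_keyword_model()
        nlp = load_nlp()
        embedder = load_model()
        keywords1 = generate_keywords(kw_model, text1)
        keywords2 = generate_keywords(kw_model, text2)
        topics = {"insight1": (keywords1[:10], keywords1[10:]),
                  "insight2": (keywords2[:10], keywords2[10:])}
        clusters = cluster_based_on_topics(nlp, embedder, text1, text2)
        insights = generate_insights(topics, name1, name2, text1, text2, clusters)
        st.json(insights)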