import re
from io import BytesIO
from typing import List, Dict, Any, Union, Text, Tuple, Iterable

import docx2txt
import openai
import streamlit as st
from pypdf import PdfReader

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.docstore.document import Document
from langchain.vectorstores import FAISS, VectorStore
from langchain.memory import ConversationBufferWindowMemory

from .prompts import STUFF_PROMPT

class PDFFile:
    """A PDF file class for typing purposes."""
    @staticmethod
    def is_pdf(file: Any) -> bool:
        return file.name.endswith(".pdf")


class DocxFile:
    """A Docx file class for typing purposes."""
    @staticmethod
    def is_docx(file: Any) -> bool:
        return file.name.endswith(".docx")


class TxtFile:
    """A Txt file class for typing purposes."""
    @staticmethod
    def is_txt(file: Any) -> bool:
        return file.name.endswith(".txt")


class CodeFile:
    """A scripting-file class for typing purposes."""
    @staticmethod
    def is_code(file: Any) -> bool:
        # check the full dotted extension; the original index-based split could never match the dotted entries
        return file.name.endswith((".py", ".json", ".html", ".css", ".md"))
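
# Usage sketch (illustrative only, not executed): these helpers are meant to be called
# on a Streamlit UploadedFile-like object exposing a `.name` attribute, e.g.
#
#   uploaded = st.file_uploader("Upload a document", type=["pdf", "docx", "txt"])
#   if uploaded is not None and PDFFile.is_pdf(uploaded):
#       ...  # route the upload to the PDF parser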

class HashDocument(Document):
    """A document that hashes on its page content and metadata."""
    def __hash__(self):
        # cast metadata values to str so that non-string values (e.g. page numbers) do not break hashing
        content = self.page_content + "".join(str(self.metadata[k]) for k in self.metadata)
        return hash(content)

def check_openai_api_key(api_key: str) -> bool:
    """Checks the given OpenAI API key and returns True if it is valid, False otherwise.
    Checking is performed by validating the key format and then issuing a minimal test completion."""
    if not (api_key.startswith("sk-") and len(api_key) == 51):
        st.error("Invalid OpenAI API key! Please provide a valid key.")
        return False
    # setting the openai api key to the given value
    openai.api_key = api_key
    try:
        _ = openai.Completion.create(
            engine="davinci",
            prompt="This is a call test to test out the API Key.",
            max_tokens=5
        )
    except openai.error.AuthenticationError:
        return False
    return True
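
# Usage sketch (illustrative only, not executed): typically called once from the app's
# sidebar before the key is stored in session state, e.g.
#
#   key = st.text_input("OpenAI API key", type="password")
#   if key and check_openai_api_key(key):
#       st.session_state["OPENAI_API_KEY"] = key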

def parse_docx(file: BytesIO) -> str:
    """Extracts the text from a .docx file."""
    text = docx2txt.process(file)
    # Remove multiple newlines
    text = re.sub(r"\n\s*\n", "\n\n", text)
    return text

def parse_pdf(file: BytesIO) -> List[str]:
    """Extracts the text from a PDF file, returning one cleaned-up string per page."""
    pdf = PdfReader(file)
    output = []
    for page in pdf.pages:
        text = page.extract_text()
        # Merge hyphenated words
        text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)
        # Fix newlines in the middle of sentences
        text = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ", text.strip())
        # Remove multiple newlines
        text = re.sub(r"\n\s*\n", "\n\n", text)
        output.append(text)
    return output

def parse_txt(file: BytesIO) -> str:
    """Extracts the text from a plain-text file."""
    text = file.read().decode("utf-8")
    # Remove multiple newlines
    text = re.sub(r"\n\s*\n", "\n\n", text)
    return text
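
# Usage sketch (illustrative only, not executed): the parser is picked based on the
# file-type helpers above; `uploaded` is assumed to be a Streamlit UploadedFile, whose
# bytes can be wrapped in a BytesIO via `.getvalue()`:
#
#   if PDFFile.is_pdf(uploaded):
#       pages = parse_pdf(BytesIO(uploaded.getvalue()))
#   elif DocxFile.is_docx(uploaded):
#       pages = parse_docx(BytesIO(uploaded.getvalue()))
#   elif TxtFile.is_txt(uploaded):
#       pages = parse_txt(BytesIO(uploaded.getvalue()))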

def get_text_splitter(
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        separators: Iterable[Text] = ("\n\n", "\n", ".", "!", "?", ",", " ", "")) -> RecursiveCharacterTextSplitter:
    """Returns a text splitter instance with the given parameters."""
    # text splitter to split the text into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,  # a limited chunk size ensures smaller chunks and more precise answers
        separators=list(separators),  # a list of separators to split the text on
        chunk_overlap=chunk_overlap,  # minimal overlap to capture semantic overlap across chunks
    )
    return text_splitter
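
# Usage sketch (illustrative only, not executed):
#
#   splitter = get_text_splitter(chunk_size=250, chunk_overlap=25)
#   chunks = splitter.split_text(long_text)  # a list of strings of roughly 250 characters each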

def text_to_docs(pages: Union[Text, Tuple[Text]], **kwargs) -> List[HashDocument]:
    """
    Converts a string or tuple of page contents to a list of HashDocuments
    (for efficient caching) with metadata.
    """
    # sanity check on the input provided
    if not isinstance(pages, (str, tuple)):
        raise ValueError(f"Text must be either a string or a tuple of strings. Got: {type(pages)}")
    elif isinstance(pages, str):
        # Take a single string as one page - make it a tuple so that it is hashable
        pages = (pages, )
    # map each page into a document instance
    page_docs = [HashDocument(page_content=page) for page in pages]
    # Add page numbers and the file name as metadata
    for i, doc in enumerate(page_docs):
        doc.metadata["page"] = i + 1
        doc.metadata["file_name"] = kwargs.get("file_name", "")
    # Split pages into chunks of several sizes
    doc_chunks = []
    for ntokens in [50, 250, 500, 750]:
        # Get the text splitter
        text_splitter = get_text_splitter(chunk_size=ntokens, chunk_overlap=ntokens // 10)
        for doc in page_docs:
            # this splits the page into chunks
            chunks = text_splitter.split_text(doc.page_content)
            for i, chunk in enumerate(chunks):
                # Create a new document for each individual chunk
                new_doc = HashDocument(
                    page_content=chunk,
                    metadata={"file_name": doc.metadata["file_name"], "page": doc.metadata["page"], "chunk": i},
                )
                # Add sources to metadata for retrieval later on
                new_doc.metadata["source"] = (
                    f"{new_doc.metadata['file_name']}/Page-{new_doc.metadata['page']}"
                    f"/Chunk-{new_doc.metadata['chunk']}/Chunksize-{ntokens}"
                )
                doc_chunks.append(new_doc)
    return doc_chunks
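
# Usage sketch (illustrative only, not executed): chunking a parsed PDF,
#
#   pages = parse_pdf(BytesIO(uploaded.getvalue()))
#   docs = text_to_docs(tuple(pages), file_name=uploaded.name)
#   # each chunk carries a source like "<file_name>/Page-1/Chunk-0/Chunksize-250"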

def embed_docs(file_name: Text, _docs: Tuple[Document]) -> VectorStore:
    """
    Embeds a list of Documents and returns a FAISS index.
    Takes a dummy file_name argument to permit caching.
    """
    # Embed the chunks
    embeddings = OpenAIEmbeddings(openai_api_key=st.session_state.get("OPENAI_API_KEY"))
    index = FAISS.from_documents(list(_docs), embeddings)
    return index

# caching removed - consider reintroducing it later after evaluating performance
# @st.cache_data
def search_docs(_index: VectorStore, query: str, k: int = 5) -> List[Document]:
    """Searches a FAISS index for the chunks most similar to the query
    and returns a list of Documents."""
    # Search for similar chunks
    docs = _index.similarity_search(query, k=k)
    return docs
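
# Usage sketch (illustrative only, not executed): building the index and retrieving
# the most relevant chunks for a question,
#
#   index = embed_docs(uploaded.name, tuple(docs))
#   top_chunks = search_docs(index, "What are the key findings?", k=5)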

# caching removed - consider reintroducing it later after evaluating performance
# @st.cache_data
def get_answer(
        _docs: List[Document],
        query: str,
        model: str = "gpt-4",
        stream_answer: bool = True) -> Dict[str, Any]:
    """Gets an answer to a question from a list of Documents."""
    # Create the chain to be used in this specific setting
    chain = load_qa_with_sources_chain(
        ChatOpenAI(
            temperature=0,
            openai_api_key=st.session_state.get("OPENAI_API_KEY"),
            model=model,
            streaming=stream_answer,
        ),
        chain_type="stuff",
        prompt=STUFF_PROMPT,
        verbose=True,
        # chain_type_kwargs={
        #     "verbose": True,
        #     "prompt": query,
        #     "memory": ConversationBufferWindowMemory(
        #         k=5,
        #         memory_key="history",
        #         input_key="question"),
        # }
    )
    # also returning the text of the sources used to form the answer
    answer = chain(
        {"input_documents": _docs, "question": query}
    )
    return answer
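
# Usage sketch (illustrative only, not executed):
#
#   result = get_answer(top_chunks, "What are the key findings?", model="gpt-4")
#   st.write(result["output_text"])  # the answer text, expected to end with a "SOURCES: ..." trailer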

# caching removed - consider reintroducing it later after evaluating performance
# @st.cache_data
def get_sources(answer: Dict[str, Any], docs: List[Document]) -> List[Document]:
    """Gets the source documents for an answer."""
    # Get the source keys cited in the answer
    source_keys = [s for s in answer["output_text"].split("SOURCES: ")[-1].split(", ")]
    # Retrieve the documents the cited sources refer to
    source_docs = []
    for doc in docs:
        if doc.metadata["source"] in source_keys:
            source_docs.append(doc)
    return source_docs
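
# Usage sketch (illustrative only, not executed): displaying the chunks cited in the answer,
#
#   cited = get_sources(result, top_chunks)
#   for doc in cited:
#       st.write(doc.metadata["source"], doc.page_content)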

# this function could be removed - it is no longer used
def wrap_text_in_html(text: Union[str, List[str]]) -> str:
    """Wraps each text block separated by newlines in <p> tags."""
    if isinstance(text, list):
        # Add horizontal rules between pages
        text = "\n<hr/>\n".join(text)
    return "".join([f"<p>{line}</p>" for line in text.split("\n")])