Andrchest's picture
Single commit for Hugging Face
ab250f8
from langchain_community.document_loaders import (
UnstructuredWordDocumentLoader,
TextLoader,
CSVLoader,
UnstructuredMarkdownLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.core.chunks import Chunk
import nltk # used for proper tokenizer workflow
from uuid import (
uuid4,
) # for generating unique id as hex (uuid4 is used as it generates ids form pseudo random numbers unlike uuid1 and others)
import numpy as np
from app.settings import logging, settings
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import fitz
class PDFLoader:
    """Loader that turns every page of a PDF file into its own ``Document``.

    Text extraction is delegated to PyMuPDF (``fitz``).
    """

    def __init__(self, file_path: str):
        # Path of the PDF to read; it is also recorded as each page's "source".
        self.file_path = file_path

    def load(self) -> list[Document]:
        """Open the PDF and return one ``Document`` per page.

        Each document carries the page's plain-text content plus metadata with
        the file path (``"source"``) and the value of ``page.number``
        (``"page"``).
        """
        source = self.file_path
        # The context manager closes the file once all pages have been read.
        with fitz.open(source) as pdf:
            return [
                Document(
                    page_content=page.get_text("text"),
                    metadata={"source": source, "page": page.number},
                )
                for page in pdf
            ]
class DocumentProcessor:
    """Loads files, splits them into chunks and buffers the chunks until saved.

    TODO: determine the most suitable chunk size

    Attributes:
        chunks_unsaved: chunks generated so far that have not been handed out
            for saving yet.
        unprocessed: loaded documents that still have to be split into chunks.
        max_workers: upper bound on worker processes used for parallel
            chunking (at most 4, at least 1).
        text_splitter: text splitting strategy, configured from
            ``settings.text_splitter``.
    """

    def __init__(self):
        self.chunks_unsaved: list[Chunk] = []
        self.unprocessed: list[Document] = []
        self.max_workers = min(4, os.cpu_count() or 1)
        self.text_splitter = RecursiveCharacterTextSplitter(
            **settings.text_splitter.model_dump()
        )

    def cosine_similarity(self, vec1, vec2):
        """Return the cosine of the angle between two vectors.

        A zero-length vector has no direction, so the similarity is reported
        as 0.0 instead of dividing by zero and producing NaN.
        """
        denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        if denominator == 0:
            return 0.0
        return vec1 @ vec2 / denominator

    def update_most_relevant_chunk(
        self,
        chunk: list[np.float64, Chunk],
        relevant_chunks: list[list[np.float64, Chunk]],
        mx_len=15,
    ):
        """Insert ``chunk`` into the ranked list without interacting with db.

        Args:
            chunk: a ``[score, chunk]`` pair.
            relevant_chunks: ``[score, chunk]`` pairs kept in descending score
                order; the list is modified in place.
            mx_len: maximum number of pairs kept; lower ranked ones are
                dropped.
        """
        # Walk back from the tail while the new score is strictly greater, so
        # a pair with an equal score keeps its place ahead of the newcomer.
        position = len(relevant_chunks)
        while position > 0 and chunk[0] > relevant_chunks[position - 1][0]:
            position -= 1
        relevant_chunks.insert(position, chunk)

        if len(relevant_chunks) > mx_len:
            del relevant_chunks[max(mx_len, 0):]

    def check_size(self, file_path: str = "", threshold: int = 1_000_000) -> bool:
        """Tell whether the file is larger than ``threshold`` bytes.

        A path whose size cannot be read (missing file, empty or invalid path)
        is treated as having size 0, so the result is ``False``.
        """
        try:
            size = os.path.getsize(file_path)
        except (OSError, TypeError, ValueError):
            size = 0
        return size > threshold

    def _build_loader(self, filepath: str):
        """Return a loader matching the file extension, or None if unsupported."""
        name = filepath.lower()  # extensions are matched case-insensitively
        if name.endswith(".pdf"):
            return PDFLoader(file_path=filepath)  # yields one document per page
        if name.endswith((".docx", ".doc")):
            return UnstructuredWordDocumentLoader(file_path=filepath)
        if name.endswith((".txt", ".json")):
            return TextLoader(file_path=filepath)
        if name.endswith(".csv"):
            return CSVLoader(file_path=filepath)
        if name.endswith(".md"):
            return UnstructuredMarkdownLoader(file_path=filepath)
        return None

    def document_multiplexer(
        self,
        filepath: str,
        get_loader: bool = False,
        get_chunking_strategy: bool = False,
    ):
        """Pick either the loader or the chunking strategy for a file.

        Args:
            filepath: path of the file; its extension selects the loader.
            get_loader: return the loader (``None`` for an unsupported
                extension). Takes precedence over ``get_chunking_strategy``.
            get_chunking_strategy: return whether chunking should run in
                parallel: never for PDF files, otherwise only when
                ``check_size`` reports a large file.

        Raises:
            RuntimeError: if neither flag is set.
        """
        if get_loader:
            return self._build_loader(filepath)
        if get_chunking_strategy:
            if filepath.lower().endswith(".pdf"):
                return False
            return self.check_size(file_path=filepath)
        raise RuntimeError("What to do, my lord?")

    def load_document(
        self, filepath: str, add_to_unprocessed: bool = False
    ) -> list[Document]:
        """Load one file - extract its text - and run chunk generation.

        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and text from img extraction
        TODO: Try chunking with llm

        Args:
            filepath: path of the file to load.
            add_to_unprocessed: queue the loaded documents in
                ``self.unprocessed`` so the ``generate_chunks`` call at the
                end of this method chunks them. ``generate_chunks`` runs
                either way and processes whatever is queued.

        Returns:
            The loaded documents; a list because a PDF yields one per page.

        Raises:
            RuntimeError: for an unsupported extension or when the loader
                fails (the loader's exception is chained as the cause).
        """
        loader = self.document_multiplexer(filepath=filepath, get_loader=True)
        if loader is None:
            raise RuntimeError("Unsupported type of file")

        try:
            documents: list[Document] = loader.load()
        except Exception as exc:
            raise RuntimeError("File is corrupted") from exc

        if add_to_unprocessed:
            self.unprocessed.extend(documents)

        strategy = self.document_multiplexer(
            filepath=filepath, get_chunking_strategy=True
        )
        logging.info("Strategy --> %s", strategy)
        self.generate_chunks(parallelization=strategy)
        return documents

    def load_documents(
        self, documents: list[str], add_to_unprocessed: bool = False
    ) -> list[Document]:
        """Similar to load_document, but for multiple files.

        A file that fails to load is logged and skipped.

        Args:
            documents: paths of the files to load.
            add_to_unprocessed: kept for backward compatibility and ignored.
                Every file is queued and chunked inside ``load_document``;
                queueing the same documents again here caused them to be
                chunked a second time by the next ``generate_chunks`` run.

        Returns:
            The documents extracted from all files that loaded successfully.
        """
        extracted_documents: list[Document] = []
        for path in documents:
            try:
                loaded = self.load_document(filepath=path, add_to_unprocessed=True)
            except Exception as e:
                logging.error(
                    "Error at load_documents while loading %s", path, exc_info=e
                )
                continue
            extracted_documents.extend(loaded)
        return extracted_documents

    def split_into_groups(self, original_list: list, split_by: int = 15) -> list[list]:
        """Split a list into consecutive groups of at most ``split_by`` items.

        Raises:
            ValueError: if ``split_by`` is smaller than 1.
        """
        if split_by < 1:
            raise ValueError("split_by must be a positive integer")
        return [
            original_list[i: i + split_by]
            for i in range(0, len(original_list), split_by)
        ]

    def _chunkinize(
        self, document: Document, text: list[Document], lines: list[dict]
    ) -> list[Chunk]:
        """Convert the split pieces of one document into ``Chunk`` objects.

        Args:
            document: the source document; supplies the filename and page.
            text: pieces produced by the text splitter for ``document``.
            lines: line table built by ``precompute_lines`` for the document.
        """
        output: list[Chunk] = []
        for piece in text:
            # The offset is 0 when the piece carries no "start_index" metadata.
            start_index = piece.metadata.get("start_index", 0)
            start_l, end_l = self.get_start_end_lines(
                splitted_text=lines,
                start_char=start_index,
                end_char=start_index + len(piece.page_content),
            )
            output.append(
                Chunk(
                    id=uuid4(),
                    filename=document.metadata.get("source", ""),
                    page_number=document.metadata.get("page", 0),
                    start_index=start_index,
                    start_line=start_l,
                    end_line=end_l,
                    text=piece.page_content,
                )
            )
        return output

    def precompute_lines(self, splitted_document: list[str]) -> list[dict]:
        """Build a table of character ranges for every line of a document.

        Each entry holds the 1-based line ``id``, the ``start`` offset, the
        exclusive ``end`` offset and the line ``text``. One extra character
        per line is counted for the line separator.
        """
        current_start = 0
        output: list[dict] = []
        for number, line in enumerate(splitted_document, start=1):
            next_start = current_start + len(line) + 1
            output.append(
                {"id": number, "start": current_start, "end": next_start, "text": line}
            )
            current_start = next_start
        return output

    def generate_chunks(self, parallelization: bool = True):
        """Split every queued document into chunks and buffer them as unsaved.

        The queue (``self.unprocessed``) is emptied afterwards.

        Args:
            parallelization: build chunks in worker processes. The bound
                method ``self._chunkinize`` is submitted to the pool, so this
                instance and the documents must be picklable.
        """
        new_chunks: list[Chunk] = []
        executor = None
        if parallelization and self.unprocessed:
            logging.info("<------- Apply Parallel Execution ------->")
            # One pool for the whole run instead of one per document.
            executor = ProcessPoolExecutor(max_workers=self.max_workers)
        try:
            for document in self.unprocessed:
                pieces = self.text_splitter.split_documents(documents=[document])
                lines = self.precompute_lines(
                    splitted_document=document.page_content.splitlines()
                )
                if executor is None:
                    new_chunks.extend(
                        self._chunkinize(document=document, text=pieces, lines=lines)
                    )
                    continue
                groups = self.split_into_groups(original_list=pieces, split_by=50)
                futures = [
                    executor.submit(self._chunkinize, document, group, lines)
                    for group in groups
                ]
                # Collect in submission order so chunks keep document order.
                for future in futures:
                    new_chunks.extend(future.result())
        finally:
            if executor is not None:
                executor.shutdown()

        self.chunks_unsaved.extend(new_chunks)
        self.unprocessed = []

    def find_line(self, splitted_text: list[dict], char) -> int:
        """Return the 1-based number of the line containing offset ``char``.

        ``splitted_text`` is the table built by ``precompute_lines``. When no
        line contains the offset, the number of the last line starting at or
        before it is returned: the last line for an offset past the end, and
        0 for a negative offset or an empty table.
        """
        low, high = 0, len(splitted_text) - 1
        while low <= high:
            mid = (low + high) // 2
            line = splitted_text[mid]
            if char < line["start"]:
                high = mid - 1
            elif char >= line["end"]:
                low = mid + 1
            else:
                return mid + 1
        # ``high`` is the 0-based index of the last line starting at or before
        # ``char`` (or -1); add 1 to stay on the same 1-based scale as above.
        return high + 1

    def get_start_end_lines(
        self,
        splitted_text: list[dict],
        start_char: int,
        end_char: int,
        debug_mode: bool = False,
    ) -> tuple[int, int]:
        """Return the line numbers of ``start_char`` and ``end_char``.

        ``debug_mode`` is accepted but not used.
        """
        start = self.find_line(splitted_text=splitted_text, char=start_char)
        end = self.find_line(splitted_text=splitted_text, char=end_char)
        return (start, end)

    def update_nltk(self) -> None:
        """Download the nltk tokenizer data.

        Note: it should be used only once to download tokenizers, further
        usage is not recommended.
        """
        nltk.download("punkt")
        nltk.download("averaged_perceptron_tagger")

    def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        """Return the buffered unsaved chunks and empty the buffer.

        The chunks are handed out once, which avoids duplicates when the
        caller saves them to db; this method does not write to db itself.
        """
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        self.clear_unsaved_chunks()
        return chunks_copy

    def clear_unsaved_chunks(self):
        """Drop every buffered unsaved chunk."""
        self.chunks_unsaved = []

    def get_all_chunks(self) -> list[Chunk]:
        """Return the buffer of unsaved chunks (the list itself, not a copy)."""
        return self.chunks_unsaved