import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv
from sentence_transformers import (
    SentenceTransformer,
    CrossEncoder,
)  # SentenceTransformer -> embedding model, CrossEncoder -> re-ranker
from ctransformers import AutoModelForCausalLM
from torch import Tensor
from google import genai
from google.genai import types
from app.core.chunks import Chunk
from app.settings import settings, BASE_DIR, GeminiEmbeddingSettings
load_dotenv()
class Embedder:
    def __init__(self, model: str = "BAAI/bge-m3"):
        self.device: str = settings.device
        self.model_name: str = model
        self.model: SentenceTransformer = SentenceTransformer(model, device=self.device)

    def encode(self, text: str | list[str]) -> Tensor | list[Tensor]:
        """Encode a string (or a list of strings) into dense vectors."""
        return self.model.encode(sentences=text, show_progress_bar=False, batch_size=32)

    def get_vector_dimensionality(self) -> int | None:
        """Return the dimensionality of the dense vectors produced by the model."""
        return self.model.get_sentence_embedding_dimension()
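
# Usage sketch (illustrative only; assumes `settings.device` is set, e.g. "cpu" or "cuda"):
#
#   embedder = Embedder()
#   vectors = embedder.encode(["first passage", "second passage"])
#   dim = embedder.get_vector_dimensionality()  # 1024 for BAAI/bge-m3's dense vectors
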
class Reranker:
    def __init__(self, model: str = "cross-encoder/ms-marco-MiniLM-L6-v2"):
        self.device: str = settings.device
        self.model_name: str = model
        self.model: CrossEncoder = CrossEncoder(model, device=self.device)

    def rank(self, query: str, chunks: list[Chunk]) -> list[dict[str, int]]:
        """
        Return the chunks re-sorted by relevance as a list of dicts; only the
        'corpus_id' key is needed, since it is the position of the chunk in the
        original list.
        """
        return self.model.rank(query, [chunk.get_raw_text() for chunk in chunks])
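
# Usage sketch (illustrative only; `chunks` is assumed to be a list of Chunk objects
# from app.core.chunks):
#
#   reranker = Reranker()
#   hits = reranker.rank("what is attention?", chunks)
#   ordered = [chunks[hit["corpus_id"]] for hit in hits]  # most relevant first
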
# TODO: add model parameters to the global config file
# TODO: add exception handling for when the response has more tokens than the configured limit
# TODO: find a way to keep the model from producing overly long answers
class LocalLLM:
    def __init__(self):
        self.model = AutoModelForCausalLM.from_pretrained(
            **settings.local_llm.model_dump()
        )

    def get_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = True,
    ) -> str:
        """
        Produce the response to the user's prompt.
        stream -> determines whether we wait until the whole response is ready
        or show it token by token.
        TODO: invent a way to really stream the answer (as the return value)
        """
        # persist the prompt to disk
        with open("../prompt.txt", "w") as f:
            f.write(prompt)
        generated_text = ""
        tokenized_text: list[int] = self.model.tokenize(text=prompt)
        response: list[int] = self.model.generate(
            tokens=tokenized_text, **settings.local_llm.model_dump()
        )
        if logging:
            print(response)
        if not stream:
            return self.model.detokenize(response)
        for token in response:
            chunk = self.model.detokenize([token])
            generated_text += chunk
            if logging:
                print(chunk, end="", flush=True)  # flush -> write out the buffer immediately
        return generated_text
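
# Usage sketch (illustrative only; assumes `settings.local_llm` carries valid
# ctransformers parameters):
#
#   llm = LocalLLM()
#   answer = llm.get_response("Summarise the retrieved context.", logging=False)
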
class GeminiLLM:
    def __init__(self, model="gemini-2.0-flash"):
        self.client = genai.Client(api_key=settings.api_key)
        self.model = model

    def get_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = False,
    ) -> str:
        path_to_prompt = os.path.join(BASE_DIR, "prompt.txt")
        with open(path_to_prompt, "w", encoding="utf-8", errors="replace") as f:
            f.write(prompt)
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=(
                types.GenerateContentConfig(**settings.gemini_generation.model_dump())
                if use_default_config
                else None
            ),
        )
        return response.text

    async def get_streaming_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = False,
    ):
        path_to_prompt = os.path.join(BASE_DIR, "prompt.txt")
        with open(path_to_prompt, "w", encoding="utf-8", errors="replace") as f:
            f.write(prompt)
        response = self.client.models.generate_content_stream(
            model=self.model,
            contents=prompt,
            config=(
                types.GenerateContentConfig(**settings.gemini_generation.model_dump())
                if use_default_config
                else None
            ),
        )
        for chunk in response:
            yield chunk
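
# Usage sketch (illustrative only; assumes `settings.api_key` holds a valid Gemini API key):
#
#   llm = GeminiLLM()
#   text = llm.get_response("Explain retrieval-augmented generation in one paragraph.")
#
#   async def consume():
#       async for chunk in llm.get_streaming_response("Explain RAG in one paragraph."):
#           print(chunk.text, end="", flush=True)
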
class GeminiEmbed:
    def __init__(self, model="text-embedding-004"):
        self.client = genai.Client(api_key=settings.api_key)
        self.model = model
        self.settings = GeminiEmbeddingSettings()
        self.max_workers = 5

    def _embed_batch(self, batch: list[str], idx: int) -> dict:
        """Embed one batch and return it together with its index so order can be restored."""
        response = self.client.models.embed_content(
            model=self.model,
            contents=batch,
            config=types.EmbedContentConfig(
                **settings.gemini_embedding.model_dump()
            ),
        ).embeddings
        return {"idx": idx, "embeddings": response}

    def encode(self, text: str | list[str]) -> list[list[float]]:
        if isinstance(text, str):
            text = [text]
        groups: list[dict] = []
        max_batch_size = 100  # cannot be increased due to Google's per-request limit
        batches: list[list[str]] = [
            text[i : i + max_batch_size] for i in range(0, len(text), max_batch_size)
        ]
        print(*[len(batch) for batch in batches])  # log batch sizes
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self._embed_batch, batch, idx)
                for idx, batch in enumerate(batches)
            ]
            for future in as_completed(futures):
                groups.append(future.result())
        groups.sort(key=lambda x: x["idx"])  # restore the original batch order
        result: list[list[float]] = []
        for group in groups:
            for vec in group["embeddings"]:
                result.append(vec.values)
        return result

    def get_vector_dimensionality(self) -> int | None:
        return self.settings.output_dimensionality
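
# Usage sketch (illustrative only): the input is split into batches of at most 100
# texts, the batches are embedded concurrently, and the results are re-assembled in
# the original order.
#
#   embedder = GeminiEmbed()
#   vectors = embedder.encode(["first passage", "second passage"])
#   dim = embedder.get_vector_dimensionality()
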
class Wrapper:
    def __init__(self, model: str = "gemini-2.0-flash"):
        self.model = model
        self.client = genai.Client(api_key=settings.api_key)

    def wrap(self, prompt: str) -> str:
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=types.GenerateContentConfig(**settings.gemini_wrapper.model_dump()),
        )
        return response.text
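
# Usage sketch (illustrative only; assumes `settings.gemini_wrapper` holds a valid
# GenerateContentConfig payload):
#
#   wrapper = Wrapper()
#   result_text = wrapper.wrap("some prompt")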