import sys

from pathlib import Path

sys.path.append(str(Path(__file__).parent.absolute() / 'util'))
sys.path.append(str(Path(__file__).parent.absolute() / 'sentence_splitter'))

import chromadb

from util.llm import LLaMaCPP
from os.path import exists
from json import load as json_load
from time import sleep
from sentence_splitter import split
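

# Retrieval knobs: hits with a vector distance of MAX_DIFFERENCE or more are
# ignored, and at most MAX_DB_RESULTS hits are fetched per query text.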
MAX_DIFFERENCE = 1.3
MAX_DB_RESULTS = 10

with open('prompt.md', 'r', encoding='utf-8') as _f:
    PROMPT = _f.read()
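
# GBNF grammar scaffold for constrained decoding: the model may only emit a
# fenced ```python block containing a (possibly empty) list of known topic
# titles. The %% placeholders are filled in process().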
GBNF_TEMPLATE = """
root ::= "```python\\n[" list "]\\n```"
list ::= %%
"""
GBNF_TEMPLATE_ITEM = '("\'%%\'")?'
GBNF_SEPARATOR = ' (", ")? '


def db_read(texts: list[str]):
    """
    Get results from ChromaDB based on vector similarity

    :param texts: a list of strings to search for
    :return: Query results directly from ChromaDB
    """
    client = chromadb.PersistentClient(path=str(Path(__file__).resolve().parent.parent / 'data' / 'database.chroma'))
    collection = client.get_collection(name='PolitScanner')
    return collection.query(query_texts=texts, n_results=MAX_DB_RESULTS)


def process(sentences: list, llm: LLaMaCPP) -> list:
    """
    Check the given sentences for topics

    :param sentences: a list of sentences as strings
    :param llm: LLaMaCPP instance with a loaded model (PolitScanner fine-tune preferred)
    :return: a list of facts for the matched topics
    """
    db_results = db_read(sentences)
    print(db_results)
    if len(db_results['ids'][0]) == 0:
        return []
    topic_ids = []
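
    # Hit IDs are prefixed with their topic ID (everything before the first
    # '-'); keep each topic that has at least one hit closer than
    # MAX_DIFFERENCE, without duplicates.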
    for i, result in enumerate(db_results['ids'][0]):
        if db_results['distances'][0][i] < MAX_DIFFERENCE:
            id_ = result.split('-')[0]
            if id_ not in topic_ids:
                topic_ids.append(id_)
    if len(topic_ids) == 0:
        return []
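
    # With exactly one candidate, also offer topic '0' (presumably the
    # catch-all entry) so the model has an alternative to pick.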
    if len(topic_ids) == 1 and topic_ids[0] != '0':
        topic_ids.append('0')
    topics = []
    titles = {}

    for topic_id in topic_ids:
        with open(str(Path(__file__).resolve().parent.parent / 'data' / 'parsed' / f'{topic_id}.json'), 'r', encoding='utf-8') as f:
            topics.append(json_load(f))
        titles[topics[-1]['topic']] = len(topics) - 1
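
    # Sort the titles alphabetically and build both the GBNF grammar (one
    # optional quoted title per entry) and the topic list injected into the
    # prompt from that same order.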
    formatted_topics = ''
    titles_list = list(titles.keys())
    titles_list.sort()
    items = []

    for title in titles_list:
        items.append(GBNF_TEMPLATE_ITEM.replace('%%', title))
    grammar = GBNF_TEMPLATE.replace('%%', GBNF_SEPARATOR.join(items))
    topics.sort(key=lambda x: x['topic'])
    for topic in topics:
        if len(formatted_topics) > 0:
            formatted_topics += '\n'
        formatted_topics += f"'{topic['topic']}'"

    prompt = PROMPT.replace('{TOPICS}', formatted_topics)
    for i, sentence in enumerate(sentences):
        prompt = prompt.replace('{' + f'SENTENCE_{i+1}' + '}', sentence)
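
    # Wrap the prompt in a ChatML turn; '/no_think' plus the pre-filled empty
    # <think> block suppresses Qwen3's reasoning phase so the constrained
    # answer starts immediately.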
    prompt = f"<|im_start|>user\n{prompt}\n/no_think\n<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n"
    print(prompt)
    output = llm.generate(prompt, enable_thinking=False, grammar=grammar, temperature=0.0)
    print(output)
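
    # The grammar guarantees a fenced list; keep only the text between '[' and
    # ']' and map every title found in it back to its stored 'fact'.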
    output = output.split('[')[-1].split(']')[0]
    truths = []
    for title in titles_list:
        if title in output:
            truths.append(topics[titles[title]]['fact'])
    return truths


def main() -> None:
    """
    Check the `input.txt` file for topics and write the results to `output.txt`

    :return: None
    """
    if not exists('input.txt'):
        raise FileNotFoundError('input.txt not found')
    with open('input.txt', 'r', encoding='utf-8') as f:
        text = f.read()

    llm = LLaMaCPP()
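    # Prefer the PolitScanner fine-tune when it is installed; otherwise fall
    # back to the larger general-purpose model.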
    if exists('/opt/llms/Qwen3-1.7B-PolitScanner-Q5_K_S.gguf'):
        llm.set_model('Qwen3-1.7B-PolitScanner-Q5_K_S.gguf')
    else:
        llm.set_model('Qwen3-30B-A3B-Q5_K_M.gguf')

    sentences = split(text)
    print(f"{len(sentences)=}")
    chunked_sentences = []
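
    # Chunk the text three sentences at a time, with one context sentence on
    # either side ('EMPTY' padding at the text boundaries); indices 1-3 of each
    # chunk are the sentences that chunk is responsible for writing out.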
    for i in range(0, len(sentences), 3):
        if i == 0:
            chunk = ['EMPTY'] + sentences[:4]
        elif i + 3 >= len(sentences):
            # Final chunk: take everything from i - 1 to the end so the last
            # sentences are never dropped, then pad with 'EMPTY' on the right.
            chunk = sentences[i - 1:] + ['EMPTY']
        else:
            chunk = sentences[i - 1:i + 4]
        chunked_sentences.append(chunk)
    print(f"{len(chunked_sentences)=}")
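
    # Spin up the llama.cpp backend and block until it reports the model as
    # loaded and running.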
    llm.load_model(print_log=True, threads=16, kv_cache_type='q8_0', context=8192)
    while llm.is_loading() or not llm.is_running():
        sleep(1)
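
    # Re-emit the text chunk by chunk: each matched fact is written first as a
    # '# Hinweis:' ("note") line, followed by the chunk's own sentences.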
    with open('output.txt', 'w', encoding='utf-8') as f:
        for chunk in chunked_sentences:
            truths = process(chunk, llm)
            for truth in truths:
                f.write(f" # Hinweis: {truth}\n")
            for i, sentence in enumerate(chunk):
                # Indices 1-3 are this chunk's own sentences; skip the 'EMPTY'
                # padding so it never leaks into the output.
                if 1 <= i <= 3 and sentence != 'EMPTY':
                    f.write(f"{sentence}\n")
            f.write('\n')
    print('REACHED `llm.stop()`')
    llm.stop()


if __name__ == '__main__':
    main()