import os, time, sys, asyncio
import base64
from typing import List, Dict

import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI

from embedder import EmbeddingModel
from Reranker import Reranker
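
# On Windows, prefer the Proactor event loop so asyncio-based parts of the stack behave consistently.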
if sys.platform.startswith("win"):
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    except Exception:
        pass
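
# Application configuration, read from the environment (.env is supported via python-dotenv).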
load_dotenv()

APP_Name = os.getenv("APP_Name", "RAG chatbot for the Ghaymah documentation")
APP_Version = os.getenv("APP_Version", "1.0.0")
API_KEY = os.getenv("API_KEY")
HOST = os.getenv("HOST")
Embed_Model_Name = os.getenv("EMBEDDING_MODEL_NAME")
Reranker_Model_Name = os.getenv("RERANKER_MODEL_NAME")
K = int(os.getenv("K", "8"))
TOP_N = int(os.getenv("TOP_N", "5"))
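
# Client-side throttling settings; N_DIM is the embedding dimensionality (not referenced elsewhere in this module).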
RPM_LIMIT = 20
MIN_SECONDS_BETWEEN = 3
N_DIM = 384
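
# OpenAI-compatible client pointed at the Ghaymah GenAI endpoint; only created when an API key is configured.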
client = None
if API_KEY:
    client = OpenAI(api_key=API_KEY, base_url="https://genai.ghaymah.systems")

CSS = """
.app-header{display:flex;align-items:center;gap:12px;justify-content:center;margin:6px 0 16px}
.app-header img{height:60px;border-radius:12px}
.app-title{font-weight:800;font-size:28px;line-height:1.1}
.app-sub{opacity:.7;font-size:14px}
"""

COMPANY_LOGO = "download.jpeg"
OWNER_NAME = "ENG. Ahmed Yasser El Sharkawy"
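
# Chat completion call with simple retry/backoff on rate-limit (429) errors.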
def safe_chat_complete(model: str, messages: List[Dict], max_tokens: int = 9000) -> str:
    delays = [5, 10, 20]
    attempt = 0
    while True:
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=0.3,
                timeout=60,
            )
            return resp.choices[0].message.content
        except Exception as e:
            msg = str(e)
            # Back off and retry on rate-limit errors; re-raise anything else or once retries are exhausted.
            if ("429" in msg or "Rate Limit" in msg) and attempt < len(delays):
                time.sleep(delays[attempt])
                attempt += 1
                continue
            raise
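
# Inline the local logo file as a data: URI so it can be embedded directly in the header HTML.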
def logo_data_uri(path: str) -> str:
    if not os.path.exists(path):
        return ""
    ext = os.path.splitext(path)[1].lower()
    mime = {
        ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
        ".webp": "image/webp", ".gif": "image/gif",
    }.get(ext, "image/png")
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    return f"data:{mime};base64,{b64}"
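
# Retrieve the top-k chunks from the remote vector store, rerank them, and pack the best snippets
# (plus their sources) into a single system prompt within a character budget.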
def build_single_system_context(query: str, max_total_chars: int = 9000, k: int = 10) -> str:
    Embedder = EmbeddingModel(model_name=Embed_Model_Name)
    RankerModel = Reranker(model_name=Reranker_Model_Name)
    results = Embedder.retrieve_top_k_remote_texts(query, k=k, HOST=HOST)
    Top_sort_results = RankerModel.rerank_results(query, results, top_n=TOP_N)

    snippets, sources = [], []
    for p in Top_sort_results:
        txt = (p.get("text") or "").strip()
        if not txt:
            continue
        src = p.get("source")
        if isinstance(src, str) and src:
            sources.append(src)
        snippets.append(txt)

    if not snippets:
        return ("You are a Ghaymah expert. Follow the instructions and act as a strict RAG assistant. "
                "No context was retrieved from the vector store for this query. "
                "If the answer is not present, say it is not mentioned in the Ghaymah documentation.")

    header = ("You are a Ghaymah expert. Follow the instructions and act as a strict RAG assistant. "
              "Answer ONLY using the provided context snippets. "
              "If the answer is not present, say it is not mentioned in the Ghaymah documentation.\n\n")
    body_budget = max_total_chars - len(header)
    body_parts, used = [], 0
    for snip in snippets:
        piece = snip + "\n\n"
        if used + len(piece) <= body_budget:
            body_parts.append(piece)
            used += len(piece)
        else:
            break

    seen, uniq_sources = set(), []
    for s in sources:
        if s not in seen:
            uniq_sources.append(s)
            seen.add(s)
    footer = "Sources:\n" + "\n".join(f"- {s}" for s in uniq_sources) + "\n" if uniq_sources else ""
    return (header + "".join(body_parts) + footer).strip()
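
# Per-session chat state: a seed system prompt plus the timestamp of the last model call,
# used for simple client-side throttling.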
SYSTEM_SEED = ("You are a Ghaymah expert. Follow the instructions and act as a strict RAG assistant. "
               "Answer ONLY using the provided context snippets.")


def init_state():
    return {"messages": [{"role": "system", "content": SYSTEM_SEED}], "last_call_ts": None}


def can_call_now(state: dict) -> bool:
    last = state.get("last_call_ts")
    return True if last is None else (time.time() - last) >= MIN_SECONDS_BETWEEN


def record_call_time(state: dict):
    state["last_call_ts"] = time.time()
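
# Core chat handler: validate configuration, throttle, build the RAG context, and call the model.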
def respond(user_message: str, state: dict):
    missing = []
    if not API_KEY:
        missing.append("API_KEY")
    if not HOST:
        missing.append("HOST")
    if not Embed_Model_Name:
        missing.append("EMBEDDING_MODEL_NAME")
    if not Reranker_Model_Name:
        missing.append("RERANKER_MODEL_NAME")
    if missing:
        return f"Config missing: {', '.join(missing)}. Set them in your .env and restart.", state

    state["messages"].append({"role": "user", "content": user_message})

    # Enforce a minimum gap between model calls.
    if not can_call_now(state):
        remaining = int(MIN_SECONDS_BETWEEN - (time.time() - (state.get("last_call_ts") or 0)))
        remaining = max(1, remaining)
        msg = f"Rate limit in effect. Please wait ~{remaining} seconds."
        state["messages"].append({"role": "assistant", "content": msg})
        return msg, state

    # Fresh retrieval context per question; keep only the last few conversational turns.
    rag_ctx = build_single_system_context(query=user_message, max_total_chars=5000, k=K)
    msgs = [{"role": "system", "content": rag_ctx}]
    msgs.extend([m for m in state["messages"] if m["role"] != "system"][-10:])

    try:
        reply = safe_chat_complete("DeepSeek-V3-0324", msgs, max_tokens=1000)
        record_call_time(state)
    except Exception as e:
        reply = f"Request failed: {e}"

    state["messages"].append({"role": "assistant", "content": reply})
    return reply, state
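
# Gradio UI: header with logo, the chat column, and a side panel with status info.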
with gr.Blocks(title=f"{APP_Name} v{APP_Version}", css=CSS) as demo:
    header_logo_src = logo_data_uri(COMPANY_LOGO)
    logo_html = f"<img src='{header_logo_src}' alt='logo'>" if header_logo_src else ""
    gr.HTML(f"""
    <div class="app-header">
      {logo_html}
      <div class="app-header-text">
        <div class="app-title">{APP_Name}</div>
        <div class="app-sub">v{APP_Version} • {OWNER_NAME}</div>
      </div>
    </div>
    """)
    state = gr.State(init_state())

    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label="Chat", height=520, type="messages", value=[])
            txt = gr.Textbox(
                placeholder="Ask anything about the Ghaymah documentation…",
                label="Your message",
                lines=2,
                autofocus=True,
            )
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear")

        with gr.Column(scale=1, min_width=300):
            gr.Image(
                value=COMPANY_LOGO,
                interactive=False,
                show_label=False,
                container=False,
                show_fullscreen_button=False,
            )
            gr.Markdown(
                "Vector store: **Connected**  \n"
                f"Embedder: `{Embed_Model_Name or 'unset'}`  \n"
                f"RPM limit: **{RPM_LIMIT}**  \n"
            )
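
    # Step 1 of each turn: append the user's message to the chat history and clear the textbox.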
    def _on_user_submit(user_input, chat_messages):
        try:
            if not user_input:
                return "", (chat_messages or [])
            chat_messages = chat_messages or []
            updated = chat_messages + [{"role": "user", "content": user_input}]
            return "", updated
        except Exception as e:
            print("[on_submit][ERROR]", repr(e))
            return user_input, (chat_messages or [])
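
    # Step 2: take the most recent user message from the chat history and produce the assistant reply.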
    def _bot_step(chat_messages, state):
        try:
            chat_messages = chat_messages or []
            last_user = None
            for msg in reversed(chat_messages):
                if msg.get("role") == "user" and isinstance(msg.get("content"), str):
                    last_user = msg["content"]
                    break
            if last_user is None:
                print("[bot_step] no user message found")
                return chat_messages, state

            bot_reply, new_state = respond(last_user, state)
            updated = chat_messages + [{"role": "assistant", "content": bot_reply}]
            return updated, new_state
        except Exception as e:
            updated = (chat_messages or []) + [
                {"role": "assistant", "content": f"⚠️ Internal error: {e}"}
            ]
            return updated, state
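
    # Wire both submit paths: add the user turn, then run the bot step.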
    txt.submit(_on_user_submit, [txt, chatbot], [txt, chatbot]) \
        .then(_bot_step, [chatbot, state], [chatbot, state])
    send_btn.click(_on_user_submit, [txt, chatbot], [txt, chatbot]) \
        .then(_bot_step, [chatbot, state], [chatbot, state])

    def _clear():
        print("[clear] resetting state and chat")
        return [], init_state()

    clear_btn.click(_clear, outputs=[chatbot, state])


if __name__ == "__main__":
    demo.queue()
    demo.launch(debug=True, server_name="0.0.0.0", server_port=7860)