import os, time, sys, asyncio
from typing import List, Dict

import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI
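
# On Windows, select the Proactor event loop policy up front (some async
# dependencies require it); wrapped in try/except so startup never fails here.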
if sys.platform.startswith("win"):
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    except Exception:
        pass
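
# Configuration comes from .env; `respond` reports any missing keys to the user.
# A typical .env might look like this (values below are placeholders, not the
# real endpoints or model names):
#   API_KEY=...
#   HOST=http://localhost:6333
#   EMBEDDING_MODEL_NAME=sentence-transformers/all-MiniLM-L6-v2
#   RERANKER_MODEL_NAME=cross-encoder/ms-marco-MiniLM-L-6-v2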
load_dotenv()

API_KEY = os.getenv("API_KEY")
HOST = os.getenv("HOST")
Embed_Model_Name = os.getenv("EMBEDDING_MODEL_NAME")
Reranker_Model_Name = os.getenv("RERANKER_MODEL_NAME")
K = int(os.getenv("K", "8"))
TOP_N = int(os.getenv("TOP_N", "5"))
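
# Client-side throttle: the hosted API allows roughly RPM_LIMIT requests per
# minute, so we enforce a minimum gap between chat-completion calls.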
RPM_LIMIT = 2
MIN_SECONDS_BETWEEN = 30
N_DIM = 384
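
# OpenAI-compatible client pointed at the Ghaymah endpoint; stays None until
# API_KEY is configured, so the UI can still start and report the problem.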
client = None
if API_KEY:
    client = OpenAI(api_key=API_KEY, base_url="https://genai.ghaymah.systems")
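
# Local modules; kept below load_dotenv() so they can read the populated
# environment at import time (an assumption about why they sit down here).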
from embedder import EmbeddingModel
from Reranker import Reranker


def safe_chat_complete(model: str, messages: List[Dict], max_tokens: int = 9000) -> str:
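    """Call the chat-completions endpoint, retrying on rate-limit errors.

    Uses a fixed backoff of 5s, 10s, then 20s, and only retries when the error
    message looks like an HTTP 429; any other failure is re-raised immediately.
    """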
    delays = [5, 10, 20]
    attempt = 0
    while True:
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=0.3,
                timeout=60,
            )
            return resp.choices[0].message.content
        except Exception as e:
            msg = str(e)
            if "429" in msg or "Rate Limit" in msg:
                if attempt < len(delays):
                    time.sleep(delays[attempt])
                    attempt += 1
                    continue
            raise


def build_single_system_context(query: str, max_total_chars: int = 9000, k: int = 10) -> str:
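    """Retrieve, rerank, and pack context snippets into one system prompt.

    Fetches `k` candidates from the remote vector store, keeps the TOP_N best
    after reranking, greedily packs snippet text into `max_total_chars`, and
    appends a de-duplicated list of sources.
    """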
    Embedder = EmbeddingModel(model_name=Embed_Model_Name)
    RankerModel = Reranker(model_name=Reranker_Model_Name)
    results = Embedder.retrieve_top_k_remote_texts(query, k=k, HOST=HOST)
    Top_sort_results = RankerModel.rerank_results(query, results, top_n=TOP_N)

    snippets, sources = [], []
    for p in Top_sort_results:
        txt = (p.get("text") or "").strip()
        if not txt:
            continue
        src = p.get("source")
        if isinstance(src, str) and src:
            sources.append(src)
        snippets.append(txt)

    if not snippets:
        return ("You are a strict RAG assistant. No context was retrieved from the vector store for this query. "
                "If the answer is not present, say you don’t know.")

    header = ("You are a strict RAG assistant. Answer ONLY using the provided context snippets. "
              "If the answer is not present, say you don’t know. ")
    body_budget = max_total_chars - len(header)
    body_parts, used = [], 0
    for snip in snippets:
        piece = snip + "\n\n"
        if used + len(piece) <= body_budget:
            body_parts.append(piece)
            used += len(piece)
        else:
            break

    seen, uniq_sources = set(), []
    for s in sources:
        if s not in seen:
            uniq_sources.append(s)
            seen.add(s)
    footer = "Sources:\n" + "\n".join(f"- {s}" for s in uniq_sources) + "\n" if uniq_sources else ""
    return (header + "".join(body_parts) + footer).strip()


SYSTEM_SEED = "You are a strict RAG assistant. Answer ONLY using the provided context."


def init_state():
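    """Fresh per-session state: seeded message history plus last-call timestamp."""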
    return {"messages": [{"role": "system", "content": SYSTEM_SEED}], "last_call_ts": None}


def can_call_now(state: dict) -> bool:
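    """True once at least MIN_SECONDS_BETWEEN seconds have passed since the last call."""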
    last = state.get("last_call_ts")
    return True if last is None else (time.time() - last) >= MIN_SECONDS_BETWEEN


def record_call_time(state: dict):
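    """Record the time of a successful completion call for throttling."""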
    state["last_call_ts"] = time.time()


def respond(user_message: str, state: dict):
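    """Handle one user turn: check config, throttle, retrieve context, call the model."""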
    missing = []
    if not API_KEY:
        missing.append("API_KEY")
    if not HOST:
        missing.append("HOST")
    if not Embed_Model_Name:
        missing.append("EMBEDDING_MODEL_NAME")
    if not Reranker_Model_Name:
        missing.append("RERANKER_MODEL_NAME")
    if missing:
        return f"Config missing: {', '.join(missing)}. Set them in your .env and restart.", state

    state["messages"].append({"role": "user", "content": user_message})

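    # Refuse early if we are still inside the minimum gap between API calls.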
    if not can_call_now(state):
        remaining = int(MIN_SECONDS_BETWEEN - (time.time() - (state.get("last_call_ts") or 0)))
        remaining = max(1, remaining)
        msg = f"Rate limit in effect. Please wait ~{remaining} seconds."
        state["messages"].append({"role": "assistant", "content": msg})
        return msg, state

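    # Build a fresh RAG system prompt for this turn; only the last 10
    # non-system messages are forwarded as conversation history.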
    rag_ctx = build_single_system_context(query=user_message, max_total_chars=5000, k=K)
    msgs = [{"role": "system", "content": rag_ctx}]
    msgs.extend([m for m in state["messages"] if m["role"] != "system"][-10:])

    try:
        reply = safe_chat_complete("DeepSeek-V3-0324", msgs, max_tokens=1000)
        record_call_time(state)
    except Exception as e:
        reply = f"Request failed: {e}"

    state["messages"].append({"role": "assistant", "content": reply})
    return reply, state

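
# --- Gradio UI ---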
with gr.Blocks(title="Ghaymah Chatbot (Gradio)") as demo:
    gr.Markdown("# 🤖 Ghaymah Chatbot (Gradio)")
    gr.Markdown(
        f"Vector store host: `{HOST or 'unset'}`  \n"
        f"Embedder: `{Embed_Model_Name or 'unset'}`  \n"
        f"RPM limit: **{RPM_LIMIT}** (min {MIN_SECONDS_BETWEEN}s between calls)  \n"
        f"Gradio version: `{gr.__version__}`"
    )

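    # Per-session state (Gradio deep-copies the initial value for each session).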
    state = gr.State(init_state())

    chatbot = gr.Chatbot(label="Chat", height=520, type="messages", value=[])

    with gr.Row():
        txt = gr.Textbox(
            placeholder="Ask anything about the Ghaymah documentation…",
            label="Your message",
            lines=2,
            autofocus=True,
        )
    with gr.Row():
        send_btn = gr.Button("Send", variant="primary")
        clear_btn = gr.Button("Clear")

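    # Step 1 of a turn: append the user's text to the chat history and clear
    # the textbox. The model call happens in the chained _bot_step below.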
    def _on_user_submit(user_input, chat_messages):
        try:
            if not user_input:
                return "", (chat_messages or [])
            chat_messages = chat_messages or []
            updated = chat_messages + [{"role": "user", "content": user_input}]
            print("[on_submit] user:", user_input)
            return "", updated
        except Exception as e:
            print("[on_submit][ERROR]", repr(e))
            return user_input, (chat_messages or [])

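    # Step 2 of a turn: take the most recent user message from the history and
    # produce the assistant reply via respond().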
    def _bot_step(chat_messages, state):
        try:
            chat_messages = chat_messages or []
            last_user = None
            for msg in reversed(chat_messages):
                if msg.get("role") == "user" and isinstance(msg.get("content"), str):
                    last_user = msg["content"]
                    break
            if last_user is None:
                print("[bot_step] no user message found")
                return chat_messages, state

            print("[bot_step] responding to:", last_user)
            bot_reply, new_state = respond(last_user, state)
            updated = chat_messages + [{"role": "assistant", "content": bot_reply}]
            return updated, new_state
        except Exception as e:
            print("[bot_step][ERROR]", repr(e))
            updated = (chat_messages or []) + [
                {"role": "assistant", "content": f"⚠️ Internal error: {e}"}
            ]
            return updated, state

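    # Wire both input paths: the user step runs first, then .then() chains the
    # bot step so the user's message appears before the model is called.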
    txt.submit(_on_user_submit, [txt, chatbot], [txt, chatbot]) \
        .then(_bot_step, [chatbot, state], [chatbot, state])
    send_btn.click(_on_user_submit, [txt, chatbot], [txt, chatbot]) \
        .then(_bot_step, [chatbot, state], [chatbot, state])

    def _clear():
        print("[clear] resetting state and chat")
        return [], init_state()

    clear_btn.click(_clear, outputs=[chatbot, state])

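
# queue() routes events through Gradio's queue so slow, rate-limited model
# calls don't block the interface.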
if __name__ == "__main__":
    demo.queue()
    demo.launch(debug=True)