RAG-Ghaymah-Documentation / app_gradio.py
Ahmed-El-Sharkawy's picture
Uploud the app
d128e02 verified
raw
history blame
8.59 kB
import os, time, sys, asyncio
from typing import List, Dict
import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI
# ---- Windows event loop fix ----
# On Windows platforms, ask asyncio to use the proactor event loop policy.
# This is best-effort: any error raised while switching the policy is
# deliberately ignored so the app can still start.
if sys.platform[:3] == "win":
    try:
        asyncio.set_event_loop_policy(
            asyncio.WindowsProactorEventLoopPolicy()
        )
    except Exception:
        pass
# ---- Env ----
# Load variables from a local .env file (if present) before reading them.
load_dotenv()

# Connection / model settings; each one is None when the variable is unset.
# respond() checks all four and reports any that are missing.
API_KEY = os.environ.get("API_KEY")
HOST = os.environ.get("HOST")
Embed_Model_Name = os.environ.get("EMBEDDING_MODEL_NAME")
Reranker_Model_Name = os.environ.get("RERANKER_MODEL_NAME")

# Retrieval sizes: K is passed as `k` to retrieval, TOP_N as `top_n` to the
# reranker. Both must parse as integers.
K = int(os.environ.get("K", "8"))
TOP_N = int(os.environ.get("TOP_N", "5"))

# Client-side throttling: RPM_LIMIT is only displayed in the UI, while
# MIN_SECONDS_BETWEEN is the gap enforced by can_call_now().
RPM_LIMIT = 2
MIN_SECONDS_BETWEEN = 30

# NOTE(review): presumably the embedding dimensionality; it is not referenced
# anywhere in the visible part of this file -- confirm before relying on it.
N_DIM = 384
# ---- OpenAI client ----
# The client is only constructed when an API key is configured; otherwise it
# stays None.
client = (
    OpenAI(api_key=API_KEY, base_url="https://genai.ghaymah.systems")
    if API_KEY
    else None
)
# ---- Your RAG bits ----
from embedder import EmbeddingModel
from Reranker import Reranker
def safe_chat_complete(model: str, messages: List[Dict], max_tokens: int = 9000) -> str:
    """Run a chat completion, retrying with back-off when rate limited.

    Args:
        model: Model name forwarded to the chat completions endpoint.
        messages: Chat messages (dicts with "role" and "content").
        max_tokens: Upper bound on generated tokens.

    Returns:
        The text of the first choice, or "" if the API returned no content.

    Raises:
        RuntimeError: If the module-level ``client`` was never configured
            (no API key), instead of failing with an opaque AttributeError.
        Exception: Any error from the API call that is not a rate-limit
            error, or a rate-limit error once all retries are exhausted.
    """
    if client is None:
        raise RuntimeError(
            "OpenAI client is not configured; set API_KEY in the environment."
        )

    # Seconds to wait before retry 1, 2 and 3 after a rate-limit error.
    delays = (5, 10, 20)

    # One initial attempt plus one retry per configured delay.
    for attempt in range(len(delays) + 1):
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=0.3,
                timeout=60,
            )
            # `content` may be None; keep the declared `str` return type.
            return resp.choices[0].message.content or ""
        except Exception as e:
            # Rate limiting is detected from the error text, ignoring case.
            msg = str(e).lower()
            rate_limited = "429" in msg or "rate limit" in msg
            if not rate_limited or attempt >= len(delays):
                raise
            time.sleep(delays[attempt])

    # Not reachable: the final loop iteration either returns or re-raises.
    raise RuntimeError("chat completion retry loop exited unexpectedly")
def build_single_system_context(query: str, max_total_chars: int = 9000, k: int = 10) -> str:
    """Build the system prompt for one query from retrieved, reranked snippets.

    Retrieves ``k`` candidates for ``query``, reranks them down to ``TOP_N``,
    then packs as many snippet texts as fit into the character budget and
    appends a de-duplicated list of the sources of the snippets that were
    actually included.

    Args:
        query: The user question used for retrieval and reranking.
        max_total_chars: Budget shared by the header and the snippet body.
            The "Sources" footer is appended on top of this budget.
        k: Number of candidates requested from the vector store.

    Returns:
        The system prompt text. When nothing usable is retrieved, a prompt
        saying that no context was found.
    """
    embedder = EmbeddingModel(model_name=Embed_Model_Name)
    reranker = Reranker(model_name=Reranker_Model_Name)
    candidates = embedder.retrieve_top_k_remote_texts(query, k=k, HOST=HOST)
    ranked = reranker.rerank_results(query, candidates, top_n=TOP_N)

    # Collect (text, source) pairs; the source is None when absent or not a
    # non-empty string. A None result from the reranker is treated as empty.
    passages = []
    for item in ranked or []:
        text = (item.get("text") or "").strip()
        if not text:
            continue
        source = item.get("source")
        if not (isinstance(source, str) and source):
            source = None
        passages.append((text, source))

    if not passages:
        return ("You are a strict RAG assistant. No context was retrieved from the vector store for this query. "
                "If the answer is not present, say you don’t know.")

    header = ("You are a strict RAG assistant. Answer ONLY using the provided context snippets. "
              "If the answer is not present, say you don’t know. ")
    body_budget = max_total_chars - len(header)

    # Take snippets in rank order until the next one would exceed the budget.
    # Sources are recorded only for snippets that make it into the prompt, so
    # the footer never cites material the model cannot see.
    body_parts, kept_sources, used = [], [], 0
    for text, source in passages:
        piece = text + "\n\n"
        if used + len(piece) > body_budget:
            break
        body_parts.append(piece)
        used += len(piece)
        if source is not None:
            kept_sources.append(source)

    # dict.fromkeys de-duplicates while preserving first-seen order.
    uniq_sources = list(dict.fromkeys(kept_sources))
    footer = ""
    if uniq_sources:
        footer = "Sources:\n" + "\n".join(f"- {s}" for s in uniq_sources) + "\n"
    return (header + "".join(body_parts) + footer).strip()
# Default system prompt placed at the start of every new conversation state.
SYSTEM_SEED = "You are a strict RAG assistant. Answer ONLY using the provided context."


def init_state():
    """Return a fresh conversation state.

    The state holds the message history (seeded with the system prompt) and
    the timestamp of the last backend call, which starts out as None.
    """
    seed_message = dict(role="system", content=SYSTEM_SEED)
    return dict(messages=[seed_message], last_call_ts=None)
def can_call_now(state: dict) -> bool:
    """Tell whether enough time has passed since the last recorded call.

    Returns True when no call has been recorded yet, or when at least
    MIN_SECONDS_BETWEEN seconds have elapsed since ``state["last_call_ts"]``.
    """
    previous = state.get("last_call_ts")
    if previous is None:
        return True
    elapsed = time.time() - previous
    return elapsed >= MIN_SECONDS_BETWEEN
def record_call_time(state: dict):
    """Stamp ``state["last_call_ts"]`` with the current wall-clock time."""
    now = time.time()
    state["last_call_ts"] = now
def respond(user_message: str, state: dict):
    """Answer one user message and return ``(reply_text, state)``.

    Flow:
      1. Report any missing configuration instead of calling the backend.
      2. Enforce the minimum gap between backend calls.
      3. Build the RAG system prompt, send it with the recent history plus
         the new user message, and return the model's reply.

    Only successful exchanges are written to ``state["messages"]``. Throttle
    notices and failure messages are returned for display but are not stored,
    so they are never replayed to the model as if it had said them.

    Args:
        user_message: The text the user just sent.
        state: Conversation state as produced by ``init_state()``; it is
            mutated in place on success.

    Returns:
        A ``(reply, state)`` tuple. ``reply`` is always a displayable string.
    """
    # Basic env checks – we still return a bot response so the UI shows
    # something useful.
    required = {
        "API_KEY": API_KEY,
        "HOST": HOST,
        "EMBEDDING_MODEL_NAME": Embed_Model_Name,
        "RERANKER_MODEL_NAME": Reranker_Model_Name,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        return f"Config missing: {', '.join(missing)}. Set them in your .env and restart.", state

    # Throttle before touching the history so a rejected message leaves no
    # trace in what is later sent to the model.
    if not can_call_now(state):
        elapsed = time.time() - (state.get("last_call_ts") or 0)
        remaining = max(1, int(MIN_SECONDS_BETWEEN - elapsed))
        return f"Rate limit in effect. Please wait ~{remaining} seconds.", state

    user_turn = {"role": "user", "content": user_message}
    history = [m for m in state["messages"] if m["role"] != "system"]

    try:
        # Retrieval is inside the try so a vector-store failure is reported
        # the same way as a model failure.
        rag_ctx = build_single_system_context(query=user_message, max_total_chars=5000, k=K)
        # Fresh RAG system prompt + the last 10 non-system turns, including
        # the message being answered.
        msgs = [{"role": "system", "content": rag_ctx}]
        msgs.extend((history + [user_turn])[-10:])
        reply = safe_chat_complete("DeepSeek-V3-0324", msgs, max_tokens=1000)
    except Exception as e:
        return f"Request failed: {e}", state

    # Only successful calls count against the rate limit and enter history.
    record_call_time(state)
    state["messages"].append(user_turn)
    state["messages"].append({"role": "assistant", "content": reply})
    return reply, state
# ------------------- Gradio UI: messages API (Gradio >= 5) -------------------
with gr.Blocks(title="Ghaymah Chatbot (Gradio)") as demo:
    gr.Markdown("# 🤖 Ghaymah Chatbot (Gradio)")
    # Two trailing spaces before each "\n" are Markdown hard line breaks, so
    # every status item renders on its own line.
    gr.Markdown(
        "Vector store: **Connected**  \n"
        f"Embedder: `{Embed_Model_Name or 'unset'}`  \n"
        f"RPM limit: **{RPM_LIMIT}** (min {MIN_SECONDS_BETWEEN}s between calls)  \n"
        f"Gradio version: `{gr.__version__}`"
    )

    state = gr.State(init_state())  # {"messages": [...], "last_call_ts": ...}

    # Start with an explicit empty list so it's never None
    chatbot = gr.Chatbot(label="Chat", height=520, type="messages", value=[])

    with gr.Row():
        txt = gr.Textbox(
            placeholder="Ask anything about the Ghaymah documentation…",
            label="Your message",
            lines=2,
            autofocus=True,
        )
    with gr.Row():
        send_btn = gr.Button("Send", variant="primary")
        clear_btn = gr.Button("Clear")

    # Step 1: add a user message immediately
    def _on_user_submit(user_input, chat_messages):
        """Append the typed message to the chat and clear the textbox.

        Returns ``(textbox_value, chat_messages)``. Empty input leaves the
        chat unchanged; on an unexpected error the text is kept for a retry.
        """
        try:
            if not user_input:
                return "", (chat_messages or [])
            chat_messages = chat_messages or []  # guard for None
            updated = chat_messages + [{"role": "user", "content": user_input}]
            print("[on_submit] user:", user_input)
            return "", updated
        except Exception as e:
            print("[on_submit][ERROR]", repr(e))
            # keep textbox text so you can retry; don't mutate chat on error
            return user_input, (chat_messages or [])

    # Step 2: call backend and append assistant message
    def _bot_step(chat_messages, state):
        """Answer the most recent user message shown in the chat.

        Returns ``(chat_messages, state)`` with the assistant reply appended.
        If no user message with string content is found, nothing changes.
        """
        try:
            chat_messages = chat_messages or []
            last_user = None
            for msg in reversed(chat_messages):
                if msg.get("role") == "user" and isinstance(msg.get("content"), str):
                    last_user = msg["content"]
                    break
            if last_user is None:
                print("[bot_step] no user message found")
                return chat_messages, state

            print("[bot_step] responding to:", last_user)
            bot_reply, new_state = respond(last_user, state)  # <-- your 2-arg respond
            updated = chat_messages + [{"role": "assistant", "content": bot_reply}]
            return updated, new_state
        except Exception as e:
            print("[bot_step][ERROR]", repr(e))
            # show the error in the chat so you see *something* in the UI
            updated = (chat_messages or []) + [
                {"role": "assistant", "content": f"⚠️ Internal error: {e}"}
            ]
            return updated, state

    # Each trigger is bound exactly once: the user message is shown first,
    # then the backend reply is appended. Binding _on_user_submit a second
    # time on the same trigger would run the handler twice per submit.

    # Submit (Enter)
    submit_event = txt.submit(_on_user_submit, [txt, chatbot], [txt, chatbot])
    submit_event.then(_bot_step, [chatbot, state], [chatbot, state])

    # Click (Send)
    click_event = send_btn.click(_on_user_submit, [txt, chatbot], [txt, chatbot])
    click_event.then(_bot_step, [chatbot, state], [chatbot, state])

    def _clear():
        """Reset the visible chat and the conversation state."""
        print("[clear] resetting state and chat")
        return [], init_state()

    clear_btn.click(_clear, outputs=[chatbot, state])
if __name__ == "__main__":
    # When run as a script: enable the request queue, then launch the app
    # with debug output turned on.
    launch_options = {"debug": True}
    demo.queue()
    demo.launch(**launch_options)