# app.py — RAG chatbot over the Ghaymah documentation
# (Hugging Face Space by Ahmed-El-Sharkawy, commit 27946eb)
import os, time, sys, asyncio
from typing import List, Dict
import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI
import base64
from embedder import EmbeddingModel
from Reranker import Reranker
# On Windows, switch to the Proactor event loop policy (needed by some async
# transports); every other platform keeps its default policy untouched.
if sys.platform.startswith("win"):
    _policy_cls = getattr(asyncio, "WindowsProactorEventLoopPolicy", None)
    if _policy_cls is not None:
        try:
            asyncio.set_event_loop_policy(_policy_cls())
        except Exception:
            # Best effort: an unsupported runtime simply keeps the default.
            pass
# ---- Environment & configuration -------------------------------------------
load_dotenv()

# Presentation / identity (sensible defaults when unset).
APP_Name = os.getenv("APP_Name", "RAG chatbot in Ghaymah documentation")
APP_Version = os.getenv("APP_Version", "1.0.0")

# Required external services; their presence is validated later in respond().
API_KEY = os.getenv("API_KEY")
HOST = os.getenv("HOST")
Embed_Model_Name = os.getenv("EMBEDDING_MODEL_NAME")
Reranker_Model_Name = os.getenv("RERANKER_MODEL_NAME")

# Retrieval tuning knobs.
K = int(os.getenv("K", "8"))          # candidates fetched from the vector store
TOP_N = int(os.getenv("TOP_N", "5"))  # candidates kept after reranking

# Client-side throttling and embedding dimensionality.
RPM_LIMIT = 20
MIN_SECONDS_BETWEEN = 3
N_DIM = 384

# OpenAI-compatible client against the Ghaymah GenAI endpoint;
# stays None until an API key is configured.
client = OpenAI(api_key=API_KEY, base_url="https://genai.ghaymah.systems") if API_KEY else None
# Inline stylesheet for the Gradio header (logo + title + subtitle layout).
CSS = """
.app-header{display:flex;align-items:center;gap:12px;justify-content:center;margin:6px 0 16px}
.app-header img{height:60px;border-radius:12px}
.app-title{font-weight:800;font-size:28px;line-height:1.1}
.app-sub{opacity:.7;font-size:14px}
"""
# Logo image used in the header and the right-hand panel (path relative to the app).
COMPANY_LOGO = "download.jpeg"
# Owner name rendered in the header subtitle.
OWNER_NAME = "ENG. Ahmed Yasser El Sharkawy"
def safe_chat_complete(model: str, messages: List[Dict], max_tokens: int = 9000) -> str:
    """Call the chat-completions endpoint, retrying on rate limiting.

    Retries up to three times with 5s/10s/20s backoff when the provider
    reports HTTP 429 or a rate-limit error; any other error — or a 429
    after the retries are exhausted — is re-raised to the caller.

    Args:
        model: Model identifier to request.
        messages: Chat messages in OpenAI format ({"role", "content"}).
        max_tokens: Completion token budget.

    Returns:
        The assistant message content of the first choice.

    Raises:
        RuntimeError: If no API client is configured (missing API_KEY).
        Exception: Whatever the underlying client raises on failure.
    """
    if client is None:
        # Fail fast with a clear message instead of an AttributeError.
        raise RuntimeError("OpenAI client is not configured; set API_KEY.")
    delays = [5, 10, 20]
    attempt = 0
    while True:
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=0.3,
                timeout=60,
            )
            return resp.choices[0].message.content
        except Exception as e:
            msg = str(e)
            # BUG FIX: match case-insensitively so "rate limit"/"Rate Limit"
            # both trigger a retry instead of an immediate re-raise.
            if "429" in msg or "rate limit" in msg.lower():
                if attempt < len(delays):
                    time.sleep(delays[attempt])
                    attempt += 1
                    continue
            raise
def logo_data_uri(path: str) -> str:
    """Encode the image file at *path* as a base64 data URI.

    Returns an empty string when the file does not exist. The MIME type
    is inferred from the file extension, defaulting to image/png.
    """
    if not os.path.exists(path):
        return ""
    mime_by_ext = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".webp": "image/webp",
        ".gif": "image/gif",
    }
    _, ext = os.path.splitext(path)
    mime = mime_by_ext.get(ext.lower(), "image/png")
    with open(path, "rb") as fh:
        encoded = base64.b64encode(fh.read()).decode("utf-8")
    return f"data:{mime};base64,{encoded}"
def build_single_system_context(query: str, max_total_chars: int = 9000, k: int = 10) -> str:
    """Build the system prompt for one query from retrieved + reranked snippets.

    Retrieves *k* candidates from the remote vector store, reranks them
    keeping the global TOP_N, packs snippet texts into a character budget of
    *max_total_chars*, and appends a de-duplicated source list.

    Args:
        query: The user question used for retrieval and reranking.
        max_total_chars: Rough character budget for the whole prompt.
        k: Number of candidates to retrieve before reranking.

    Returns:
        A system-prompt string; a fallback instruction when nothing was
        retrieved.
    """
    # NOTE(review): both models are re-instantiated on every call; consider
    # caching them at module level if construction is expensive.
    embedder = EmbeddingModel(model_name=Embed_Model_Name)
    ranker = Reranker(model_name=Reranker_Model_Name)
    candidates = embedder.retrieve_top_k_remote_texts(query, k=k, HOST=HOST)
    reranked = ranker.rerank_results(query, candidates, top_n=TOP_N)

    snippets, sources = [], []
    for p in reranked:
        txt = (p.get("text") or "").strip()
        if not txt:
            continue
        src = p.get("source")
        if isinstance(src, str) and src:
            sources.append(src)
        snippets.append(txt)

    if not snippets:
        return ("You are ghaymah expert . follow instraction to be strict RAG assistant. No context was retrieved from the vector store for this query. "
                "If the answer is not present, say do not mention in ghaymah documentation.")

    header = ("You are ghaymah expert. follow instraction to be strict RAG assistant. Answer ONLY using the provided context snippets. "
              "If the answer is not present, say do not mention in ghaymah documentation.")

    # Pack whole snippets until the budget is exhausted (reserve 2 chars for
    # the header/body separator added below).
    body_budget = max_total_chars - len(header) - 2
    body_parts, used = [], 0
    for snip in snippets:
        piece = snip + "\n\n"
        if used + len(piece) > body_budget:
            break
        body_parts.append(piece)
        used += len(piece)

    # De-duplicate sources while preserving first-seen order.
    uniq_sources = list(dict.fromkeys(sources))
    footer = "Sources:\n" + "\n".join(f"- {s}" for s in uniq_sources) + "\n" if uniq_sources else ""

    # BUG FIX: the original glued the header directly onto the first snippet;
    # separate them so the prompt stays readable for the model.
    return (header + "\n\n" + "".join(body_parts) + footer).strip()
# Seed system prompt for a fresh conversation; the per-query RAG context
# replaces it when a request is actually sent.
SYSTEM_SEED = "You are ghaymah expert. follow instraction to be strict RAG assistant. Answer ONLY using the provided context snippets."


def init_state():
    """Return a fresh per-session state: seeded history, no throttle stamp."""
    return {
        "messages": [{"role": "system", "content": SYSTEM_SEED}],
        "last_call_ts": None,
    }
def can_call_now(state: dict) -> bool:
    """Return True when enough time has elapsed since the last model call."""
    last = state.get("last_call_ts")
    if last is None:
        # No call recorded yet — nothing to throttle against.
        return True
    return (time.time() - last) >= MIN_SECONDS_BETWEEN
def record_call_time(state: dict):
    """Stamp *state* with the wall-clock time of the model call just made."""
    state.update(last_call_ts=time.time())
def respond(user_message: str, state: dict):
    """Handle one user turn: validate config, throttle, retrieve, answer.

    Returns a (reply_text, state) pair; *state* carries the running message
    history and the timestamp of the last successful model call.
    """
    # Fail early with an explicit list of unset environment variables.
    required = [
        ("API_KEY", API_KEY),
        ("HOST", HOST),
        ("EMBEDDING_MODEL_NAME", Embed_Model_Name),
        ("RERANKER_MODEL_NAME", Reranker_Model_Name),
    ]
    missing = [name for name, value in required if not value]
    if missing:
        return (f"Config missing: {', '.join(missing)}. Set them in your .env and restart."), state

    state["messages"].append({"role": "user", "content": user_message})

    # Client-side throttle: refuse the call if the previous one was too recent.
    if not can_call_now(state):
        elapsed = time.time() - (state.get("last_call_ts") or 0)
        remaining = max(1, int(MIN_SECONDS_BETWEEN - elapsed))
        msg = f"Rate limit in effect. Please wait ~{remaining} seconds."
        state["messages"].append({"role": "assistant", "content": msg})
        return msg, state

    # Fresh RAG context as the sole system message, plus the last 10 turns.
    rag_ctx = build_single_system_context(query=user_message, max_total_chars=5000, k=K)
    msgs = [{"role": "system", "content": rag_ctx}]
    msgs.extend([m for m in state["messages"] if m["role"] != "system"][-10:])

    try:
        reply = safe_chat_complete("DeepSeek-V3-0324", msgs, max_tokens=1000)
        record_call_time(state)
    except Exception as e:
        # Surface the failure as the reply instead of crashing the UI turn.
        reply = f"Request failed: {e}"

    state["messages"].append({"role": "assistant", "content": reply})
    return reply, state
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
with gr.Blocks(title=f"{APP_Name} v{APP_Version}", css=CSS) as demo:
    header_logo_src = logo_data_uri(COMPANY_LOGO)
    logo_html = f"<img src='{header_logo_src}' alt='logo'>" if header_logo_src else ""
    # BUG FIX: the original subtitle fused version and owner ("v1.0.0ENG. ...");
    # add a separator between them.
    gr.HTML(f"""
    <div class="app-header">
      {logo_html}
      <div class="app-header-text">
        <div class="app-title">{APP_Name}</div>
        <div class="app-sub">v{APP_Version} — {OWNER_NAME}</div>
      </div>
    </div>
    """)

    # Per-session conversation state (history + throttle timestamp).
    state = gr.State(init_state())

    with gr.Row():
        # LEFT: chat transcript + input controls.
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label="Chat", height=520, type="messages", value=[])
            txt = gr.Textbox(
                placeholder="Ask anything about the Ghaymah documentation…",
                label="Your message",
                lines=2,
                autofocus=True,
            )
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear")

        # RIGHT: branding + status panel.
        with gr.Column(scale=1, min_width=300):
            gr.Image(
                value="download.jpeg",
                interactive=False,
                show_label=False,
                container=False,
                show_fullscreen_button=False,
            )
            gr.Markdown(
                "Vector store: **Connected** \n"
                f"Embedder: `{Embed_Model_Name or 'unset'}` \n"
                f"RPM limit: **{RPM_LIMIT}** \n"
            )

    def _on_user_submit(user_input, chat_messages):
        """Append the user's message to the transcript and clear the textbox."""
        try:
            if not user_input:
                return "", (chat_messages or [])
            chat_messages = chat_messages or []
            updated = chat_messages + [{"role": "user", "content": user_input}]
            return "", updated
        except Exception as e:
            print("[on_submit][ERROR]", repr(e))
            return user_input, (chat_messages or [])

    def _bot_step(chat_messages, state):
        """Generate the assistant reply for the most recent user message."""
        try:
            chat_messages = chat_messages or []
            last_user = None
            # Walk backwards to find the latest plain-text user message.
            for msg in reversed(chat_messages):
                if msg.get("role") == "user" and isinstance(msg.get("content"), str):
                    last_user = msg["content"]
                    break
            if last_user is None:
                print("[bot_step] no user message found")
                return chat_messages, state
            bot_reply, new_state = respond(last_user, state)
            updated = chat_messages + [{"role": "assistant", "content": bot_reply}]
            return updated, new_state
        except Exception as e:
            # Show the failure in the chat instead of silently dropping it.
            updated = (chat_messages or []) + [
                {"role": "assistant", "content": f"⚠️ Internal error: {e}"}
            ]
            return updated, state

    # BUG FIX: the original registered _on_user_submit TWICE per event (once
    # bare, once with the .then chain), so every submit/click appended the
    # user's message to the transcript twice. Register each event exactly once.
    txt.submit(_on_user_submit, [txt, chatbot], [txt, chatbot])\
        .then(_bot_step, [chatbot, state], [chatbot, state])
    send_btn.click(_on_user_submit, [txt, chatbot], [txt, chatbot])\
        .then(_bot_step, [chatbot, state], [chatbot, state])

    def _clear():
        """Reset both the visible transcript and the session state."""
        print("[clear] resetting state and chat")
        return [], init_state()

    clear_btn.click(_clear, outputs=[chatbot, state])
if __name__ == "__main__":
    # Enable the request queue (needed for multi-user throttled use) and
    # serve on all interfaces at the conventional Gradio port.
    demo.queue()
    demo.launch(debug=True, server_name="0.0.0.0", server_port=7860)