Hothaifa's picture
Update app.py
e494cc6 verified
import pandas as pd
#1
import datetime
import os
from dotenv import load_dotenv
from typing import Optional, List
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
import faiss
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import re, gdown, time
from rapidfuzz import fuzz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel
from collections import defaultdict, deque
from deep_translator import GoogleTranslator
from google import genai
from google.genai import types
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)
load_dotenv()
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "CHAT_ID")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
print(f"[DEBUG] Gemini Key Loaded: {'Yes' if GEMINI_API_KEY else 'No'}")
gemini_client = None
if GEMINI_API_KEY:
try:
gemini_client = genai.Client(api_key=GEMINI_API_KEY)
print("[INFO] Gemini Client Initialized Successfully.")
except Exception as e:
print(f"[ERROR] Failed to init Gemini Client: {e}")
app = FastAPI(title="Hajeen Islamic QA API")
DISCLAIMERS = {
"ar": "",
"de": "\n\n(Hinweis: Automatisch übersetzt. Konsultieren Sie das arabische Original.)",
"ru": "\n\n(Примечание: Автоматический перевод. См. оригинал на арабском.)",
"en": "\n\n(Note: Automatically translated. Refer to the Arabic original.)"
}
def translate_wrapper(text: str, source: str, target: str) -> str:
"""تستخدم لترجمة النصوص عند الحاجة"""
if not text or source == target:
return text
try:
return GoogleTranslator(source=source, target=target).translate(text[:4500])
except Exception as e:
print(f"[TRANSLATION ERROR] {e}")
return text
def translate_error_detail(detail: str, target_lang: str):
if not detail or target_lang == 'ar':
return detail
try:
return GoogleTranslator(source='ar', target=target_lang).translate(detail)
except Exception:
return detail
def clean_gemini_response(text: str) -> str:
"""تنظيف رد جيميني من النجوم وتنسيقات المارك داون المزعجة"""
if not text: return ""
text = text.replace("**", "")
text = re.sub(r"^\s*\*\s", "- ", text, flags=re.MULTILINE)
text = text.replace("##", "")
text = text.replace("#", "")
return text.strip()
def normalize_text(text: str) -> str:
text = re.sub(r"[\u064B-\u0652]", "", text)
text = re.sub(r"[إأآا]", "ا", text)
text = re.sub(r"[ى]", "ي", text)
text = re.sub(r"[ة]", "ه", text)
return " ".join(text.split()).lower()
def normalize_ar(s: str) -> str:
if not isinstance(s, str):
s = "" if s is None else str(s)
s = normalize_text(s)
s = re.sub(r"[^\u0621-\u064A0-9\s]", " ", s)
return re.sub(r"\s+", " ", s).strip()
# قوائم الحظر
FORBIDDEN_WORDS = {
"insult": {"حمار", "غبي", "كلب", "حقير", "شرموط", "شرموطة", "عاهرة", "قحبة", "كس", "لعنة", "خرا", "منيك"},
"fitna": {"طائفية", "داعش", "شيعة", "سنة", "إلحاد"},
"off_topic": {"ميسي", "رونالدو", "أفلام", "أغاني", "طقس", "بورصة"}
}
def check_query_safety(query: str):
nq = normalize_text(query)
whitelist = {"الله", "الرسول", "حكم", "صلاة", "صيام", "حديث"}
if any(w in nq for w in whitelist): return None
for category, words in FORBIDDEN_WORDS.items():
if any(w in nq for w in words):
return category
return None
def run_hadith_guard(q_norm: str):
if any(w in q_norm for w in FORBIDDEN_WORDS["insult"]):
return ("insult", "ألفاظ غير لائقة.")
if any(w in q_norm for w in FORBIDDEN_WORDS["fitna"]):
return ("fitna", "خارج النطاق.")
return (None, None)
# Rate Limiter
RATE_LIMIT_WINDOW = 60
RATE_LIMIT_MAX = 20
_request_log = defaultdict(deque)
def check_rate_limit(ip: str):
now = time.time()
dq = _request_log[ip]
while dq and now - dq[0] > RATE_LIMIT_WINDOW:
dq.popleft()
if len(dq) >= RATE_LIMIT_MAX:
raise HTTPException(status_code=429, detail="Rate limit exceeded.")
dq.append(now)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DATA_FILE_ID = "1GMG6fVxhUuBEAHP91c8RAUdUJh5TxY5O"
EMBEDDINGS_FILE_ID = "1MCIJ4zZRfTC9ZEy-CLvcvNbRdjTFnw5q"
ID_BUKHARI = os.environ.get("ID_BUKHARI")
ID_MUSLIM = os.environ.get("ID_MUSLIM")
ID_MUSNAD = os.environ.get("ID_MUSNAD")
data_path = os.path.join(DATA_DIR, 'cleaned_fatwas_v2.csv')
embeddings_path = os.path.join(DATA_DIR, 'questions_embeddings_arabert.npy')
learned_data_path = os.path.join(DATA_DIR, 'learned_fatwas.csv')
FEEDBACK_FILE = os.path.join(DATA_DIR, "feedback.csv")
PATHS = {
"bukhari": os.path.join(DATA_DIR, "sahih_bukhari_clean.csv"),
"muslim": os.path.join(DATA_DIR, "sahih_muslim_clean.csv"),
"musnad": os.path.join(DATA_DIR, "musnad_ahmed_clean.csv"),
}
def safe_download(file_id, output_path):
if not file_id or os.path.exists(output_path): return
try:
gdown.download(id=file_id, output=output_path, quiet=False)
except Exception as e:
print(f"[DOWNLOAD ERROR] {e}")
model_name = 'aubmindlab/bert-base-arabertv2'
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name).to(device)
vectorizer = TfidfVectorizer(analyzer="char_wb", ngram_range=(3,5), min_df=1)
df_main = pd.DataFrame()
df_learned = pd.DataFrame()
df_all = pd.DataFrame()
tfidf_matrix = None
index = None
question_embeddings = None
def load_hadith_corpora(paths: dict) -> pd.DataFrame:
all_dfs = []
for src, path in paths.items():
if os.path.exists(path):
try:
df = pd.read_csv(path)
if "matn" in df.columns: df["matn_full"] = df["matn"]
if "matn_clean" not in df.columns:
df["matn_clean"] = df["matn_full"].fillna("").apply(normalize_ar)
df["source"] = src
df["hadith_number"] = df["hadith_number"].astype(str).str.replace(r"\D","",regex=True)
if "grading" not in df.columns:
df["grading"] = ""
else:
df["grading"] = df["grading"].fillna("").astype(str)
all_dfs.append(df[["source","hadith_number","matn_full","matn_clean","grading"]])
except Exception as e:
print(f"Error loading {src}: {e}")
if all_dfs:
return pd.concat(all_dfs, ignore_index=True)
return pd.DataFrame(columns=["source","hadith_number","matn_full","matn_clean","grading"])
@app.on_event("startup")
async def startup_event():
global df_main, df_learned, df_all, tfidf_matrix, index, question_embeddings
print("[STARTUP] Downloading assets...")
safe_download(DATA_FILE_ID, data_path)
safe_download(EMBEDDINGS_FILE_ID, embeddings_path)
safe_download(ID_BUKHARI, PATHS["bukhari"])
safe_download(ID_MUSLIM, PATHS["muslim"])
safe_download(ID_MUSNAD, PATHS["musnad"])
if os.path.exists(data_path):
df_main = pd.read_csv(data_path)
if os.path.exists(learned_data_path):
df_learned = pd.read_csv(learned_data_path)
else:
df_learned = pd.DataFrame(columns=['question','answer','source_url','title','word_count','score'])
if "score" not in df_learned.columns: df_learned["score"] = 50
if os.path.exists(embeddings_path):
question_embeddings = np.load(embeddings_path)
index = faiss.IndexFlatL2(question_embeddings.shape[1])
index.add(question_embeddings.astype('float32'))
df_all = load_hadith_corpora(PATHS)
if not df_all.empty:
tfidf_matrix = vectorizer.fit_transform(df_all["matn_clean"])
print(f"[STARTUP] Hadith Database loaded: {len(df_all)} records. TF-IDF ready.")
else:
print("[STARTUP] WARNING: Hadith database is empty!")
print("[STARTUP] All systems ready.")
def get_embedding_for_query(text: str):
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
with torch.no_grad():
outputs = model(**inputs)
return outputs.last_hidden_state[:, 0, :].cpu().numpy()
def ask_gemini_with_search(query: str, lang: str = "ar"):
if not gemini_client: return None
model_id = "gemini-2.0-flash"
sys_instruction = f"""
أنت مساعد إسلامي فقيه ومحترم.
مهمتك: الإجابة على الأسئلة الشرعية والفتاوى بدقة بناءً على نتائج البحث الموثوقة.
تعليمات هامة جداً:
. أنت مختص في الفتاوي والأسألة الشرعية لا تدخل في أمور أخرى، إذا كان السؤال خارج نطاق الشريعة او أنه لا يبدو ك فتوى او سؤال ديني أو أستفسار ديني لا تجيب مثال إذا كان السؤال (ما هي عاصمة سوريا، كم سعر الدولار، هل أنت نموذج لغوي، هل يمكنك تعديل كود،) أعتذر بأدب وقول (أنا نموذج لغوي تعلمت على الأجابة على الأسألة الشرعية والدينية من مصادر أهل السنة والجماعة، أما سؤالك فيمكنك العثور على أجابة له في Google أو من Gemini أو أنماط أخرى).
. أذا تم الأستفسار عن أمر ديني أجب وأحرص على الشرح أذا كان أستفسار وليس سؤال، أما أذا سؤال لا تكثر من الحشو وأعط الزبدة مع الحفاظ على جمالية الجواب.
. المستخدم ليس دائماً على حق، ربما يسهو أو يخطأ، أذا اخطأ صحح له أتباعاً لمنهج أهل السنة والجماعة فقط، وحاول مساعدته في العثور على أجابة (دينية فقط) أما دون ذلك أجب بأعتذار كما ذكرت سابقاً.
. أحرص على عدم الأجابة على اسألة سياسية او علمية.
. أذا تم فتح نقاش معك قل أنا لا أصلح للنقاشات (أنا فقط أبحث وأجيب على اسألة فتوى او أستفسار) وشكراً
. ميولك يجب أن تكون لمنهج أهل السنة والجماعة فقط أما دون ذلك أعتذر بأدب وقل هداكم الله.
. أبدأ أجاباتك دائما بـ (الحمدلله والصلاة والسلام على رسول الله أما بعد : واختم جوابك بـ (والله أعلم).
. لغة الإجابة: المستخدم يسأل بلغة الكود ({lang}). يجب أن تكون إجابتك بالكامل بهذه اللغة ({lang}). لا تجب بالعربية إذا كان السؤال بغيرها.
. استخدم "بحث Google" دائماً للتأكد من المعلومات من مصادر مثل (إسلام ويب، الإسلام سؤال وجواب، ابن باز).
. الاختصار المفيد: لا تكثر من الحشو، وأعط الزبدة مع الدليل.
. اذكر دائماً أن مصادرك هي محرك بحث Google وبالتحديد موقعين أسلام ويب وإسلام سؤال وجواب .
"""
try:
response = gemini_client.models.generate_content(
model=model_id,
contents=[types.Content(role="user", parts=[types.Part.from_text(text=f"{sys_instruction}\n\nالسؤال: {query}")] )],
config=types.GenerateContentConfig(
tools=[types.Tool(google_search=types.GoogleSearch())],
temperature=0.3
)
)
raw_text = ""
if response.text:
raw_text = response.text
elif response.candidates and response.candidates[0].content.parts:
for part in response.candidates[0].content.parts:
if part.text: raw_text += part.text
clean_text = clean_gemini_response(raw_text)
return clean_text
except Exception as e:
print(f"[GEMINI ERROR] {e}")
return None
class SearchRequest(BaseModel):
query: str
top_k: int = 1
lang: str = "ar"
class FeedbackRequest(BaseModel):
question: str
answer: str
useful: str
comment: str = ""
class HadithSearchRequest(BaseModel):
query: str
top_k: int = 5
sources: Optional[list[str]] = None
lang: str = "ar"
class HadithByIdRequest(BaseModel):
source: Optional[str] = None
hadith_number: str
@app.post("/search")
def search(request: SearchRequest):
q = request.query.strip()
target_lang = request.lang or "ar"
guard = check_query_safety(q)
if guard:
error_msg = translate_error_detail("عذراً، السؤال غير مناسب.", target_lang)
raise HTTPException(status_code=400, detail=error_msg)
if not df_learned.empty:
row = df_learned[df_learned["question"] == q]
if not row.empty:
ans = row.iloc[0]["answer"]
if target_lang != "ar":
ans = translate_wrapper(ans, "ar", target_lang)
return {"results": [{
"question": q,
"answer": ans,
"source": "قاعدة تعلم ذاتي",
"score": 100
}]}
is_arabic_query = any("\u0600" <= c <= "\u06FF" for c in q)
if is_arabic_query and index is not None:
query_emb = get_embedding_for_query(q)
distances, indices = index.search(query_emb.astype('float32'), request.top_k)
similarity = 1 / (1 + distances[0][0]) * 100
if similarity > 85:
idx = indices[0][0]
result = df_main.iloc[idx]
final_ans = result["الجواب"]
if target_lang != "ar":
final_ans = translate_wrapper(final_ans, "ar", target_lang) + DISCLAIMERS.get(target_lang, "")
return {"results": [{
"question": q,
"answer": final_ans,
"source": "قاعدة البيانات الأساسية (محلي)",
"score": int(similarity)
}]}
print(f"[INFO] Asking Gemini: {q} (Lang: {target_lang})")
gemini_answer = ask_gemini_with_search(q, lang=target_lang)
if gemini_answer:
is_response_arabic = any("\u0600" <= c <= "\u06FF" for c in gemini_answer[:50])
if target_lang != "ar" and is_response_arabic:
print("[INFO] Gemini returned Arabic despite instruction. Translating manually...")
gemini_answer = translate_wrapper(gemini_answer, "ar", target_lang)
new_row = pd.DataFrame([{
"question": q, "answer": gemini_answer,
"source_url": "Gemini", "title": "Gemini",
"word_count": len(gemini_answer.split()), "score": 90
}])
new_row.to_csv(learned_data_path, mode='a', header=not os.path.exists(learned_data_path), index=False)
return {"results": [{
"question": q,
"answer": gemini_answer,
"source": "Gemini AI (Web Search)",
"score": 95
}]}
error_msg = translate_error_detail("لم يتم العثور على إجابة.", target_lang)
raise HTTPException(status_code=404, detail=error_msg)
@app.post("/feedback")
def feedback(req: FeedbackRequest):
if not os.path.exists(FEEDBACK_FILE):
pd.DataFrame([req.dict()]).to_csv(FEEDBACK_FILE, index=False)
else:
pd.DataFrame([req.dict()]).to_csv(FEEDBACK_FILE, mode='a', header=False, index=False)
return {"message": "تم حفظ التقييم."}
SOURCE_ALIAS = {
"bukhari": "صحيح البخاري", "muslim": "صحيح مسلم", "musnad": "مسند أحمد",
"صحيح البخاري": "صحيح البخاري", "صحيح مسلم": "صحيح مسلم", "مسند أحمد": "مسند أحمد"
}
VALID_SOURCES = set(SOURCE_ALIAS.values())
def normalize_sources(sources):
if not sources: return None
norm = set()
for s in sources:
if not s: continue
key = str(s).strip().lower()
mapped = SOURCE_ALIAS.get(key)
if mapped in VALID_SOURCES:
norm.add(mapped)
return norm or None
def filter_mask_by_sources(df, sources):
wanted = normalize_sources(sources)
if not wanted:
return pd.Series(True, index=df.index)
return df["source"].isin(wanted)
def build_result_row(row, score):
return {
"source": row["source"],
"hadith_number": str(row["hadith_number"]),
"matn": row["matn_full"][:1000] + "..." if len(row["matn_full"]) > 1000 else row["matn_full"],
"grading": row.get("grading", ""),
"score": round(float(score), 2)
}
@app.post("/hadith/search")
def hadith_search(req: HadithSearchRequest, request: Request):
ip = request.client.host or "unknown"
check_rate_limit(ip)
lang = req.lang or "ar"
q = (req.query or "").strip()
q_for_search = q
if lang != "ar":
q_for_search = translate_wrapper(q, source=lang, target="ar")
print(f"[HADITH-LANG] {q} -> {q_for_search}")
q_norm = normalize_ar(q_for_search)
if not hasattr(vectorizer, 'vocabulary_') or tfidf_matrix is None or df_all.empty:
error_ar = "عفواً، بيانات الأحاديث لم يتم تحميلها بشكل صحيح على الخادم (خطأ في الإعداد)."
error_detail = translate_error_detail(error_ar, lang)
print(f"[ERROR] Hadith data (TFIDF/df_all) not loaded or globals missing.")
raise HTTPException(status_code=500, detail=error_detail)
kind, msg = run_hadith_guard(q_norm)
if kind:
error_detail = translate_error_detail(msg, lang)
raise HTTPException(status_code=400, detail=error_detail)
words = q_norm.split()
if len(words) < 4:
error_ar = "أدخل 4 كلمات على الأقل من نص الحديث (بالعربية)."
error_detail = translate_error_detail(error_ar, lang)
raise HTTPException(status_code=400, detail=error_detail)
if len(words) > 15:
q_norm = " ".join(words[:15])
try:
mask = filter_mask_by_sources(df_all, req.sources)
idxs = df_all[mask].index.values
if len(idxs) == 0:
error_ar = "لا توجد مصادر مطابقة للفلتر."
error_detail = translate_error_detail(error_ar, lang)
raise HTTPException(status_code=404, detail=error_detail)
q_vec = vectorizer.transform([q_norm])
sims = linear_kernel(q_vec, tfidf_matrix[idxs]).flatten()
top_idx_local = sims.argsort()[::-1][:20]
candidates = []
for i in top_idx_local:
gi = idxs[i]
row = df_all.iloc[gi]
base_score = sims[i] * 100
fuzz_score = fuzz.token_set_ratio(q_norm, row["matn_clean"])
source_bonus = 15 if row["source"] == "صحيح البخاري" else (10 if row["source"] == "صحيح مسلم" else 0)
grading_val = str(row.get("grading", "") or "")
grading_bonus = 5 if "صحيح" in grading_val else 0
final_score = base_score * 0.5 + fuzz_score * 0.5 + source_bonus + grading_bonus
candidates.append((row, final_score))
candidates = sorted(candidates, key=lambda x: x[1], reverse=True)[:(req.top_k or 5)]
results = []
for row, score in candidates:
res = build_result_row(row, score)
if lang != "ar":
res["matn"] = translate_wrapper(res["matn"], "ar", lang) + DISCLAIMERS.get(lang, "")
results.append(res)
return {"query": q, "results": results}
except Exception as e:
print(f"[HADITH-SEARCH-CRITICAL-FAIL] Error: {e}")
error_ar = "حدث خطأ داخلي غير متوقع أثناء معالجة البحث عن الأحاديث."
error_detail = translate_error_detail(error_ar, lang)
raise HTTPException(status_code=500, detail=error_detail)
@app.post("/hadith/by_id")
def hadith_by_id(req: HadithByIdRequest, request: Request):
check_rate_limit(request.client.host or "unknown")
if df_all.empty: raise HTTPException(status_code=503, detail="Database not ready")
num = re.sub(r"\D", "", str(req.hadith_number))
mask = df_all["hadith_number"] == num
rows = df_all[mask]
if rows.empty:
raise HTTPException(status_code=404, detail="Not found")
return [build_result_row(r, 100) for _, r in rows.iterrows()]