Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| #1 | |
| import datetime | |
| import os | |
| from dotenv import load_dotenv | |
| from typing import Optional, List | |
| import numpy as np | |
| import torch | |
| from transformers import AutoTokenizer, AutoModel | |
| import faiss | |
| from fastapi import FastAPI, HTTPException, Request | |
| from pydantic import BaseModel | |
| import re, gdown, time | |
| from rapidfuzz import fuzz | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import linear_kernel | |
| from collections import defaultdict, deque | |
| from deep_translator import GoogleTranslator | |
| from google import genai | |
| from google.genai import types | |
| DATA_DIR = "data" | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| load_dotenv() | |
| TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "TOKEN") | |
| TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "CHAT_ID") | |
| GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") | |
| print(f"[DEBUG] Gemini Key Loaded: {'Yes' if GEMINI_API_KEY else 'No'}") | |
| gemini_client = None | |
| if GEMINI_API_KEY: | |
| try: | |
| gemini_client = genai.Client(api_key=GEMINI_API_KEY) | |
| print("[INFO] Gemini Client Initialized Successfully.") | |
| except Exception as e: | |
| print(f"[ERROR] Failed to init Gemini Client: {e}") | |
| app = FastAPI(title="Hajeen Islamic QA API") | |
| DISCLAIMERS = { | |
| "ar": "", | |
| "de": "\n\n(Hinweis: Automatisch übersetzt. Konsultieren Sie das arabische Original.)", | |
| "ru": "\n\n(Примечание: Автоматический перевод. См. оригинал на арабском.)", | |
| "en": "\n\n(Note: Automatically translated. Refer to the Arabic original.)" | |
| } | |
| def translate_wrapper(text: str, source: str, target: str) -> str: | |
| """تستخدم لترجمة النصوص عند الحاجة""" | |
| if not text or source == target: | |
| return text | |
| try: | |
| return GoogleTranslator(source=source, target=target).translate(text[:4500]) | |
| except Exception as e: | |
| print(f"[TRANSLATION ERROR] {e}") | |
| return text | |
| def translate_error_detail(detail: str, target_lang: str): | |
| if not detail or target_lang == 'ar': | |
| return detail | |
| try: | |
| return GoogleTranslator(source='ar', target=target_lang).translate(detail) | |
| except Exception: | |
| return detail | |
| def clean_gemini_response(text: str) -> str: | |
| """تنظيف رد جيميني من النجوم وتنسيقات المارك داون المزعجة""" | |
| if not text: return "" | |
| text = text.replace("**", "") | |
| text = re.sub(r"^\s*\*\s", "- ", text, flags=re.MULTILINE) | |
| text = text.replace("##", "") | |
| text = text.replace("#", "") | |
| return text.strip() | |
| def normalize_text(text: str) -> str: | |
| text = re.sub(r"[\u064B-\u0652]", "", text) | |
| text = re.sub(r"[إأآا]", "ا", text) | |
| text = re.sub(r"[ى]", "ي", text) | |
| text = re.sub(r"[ة]", "ه", text) | |
| return " ".join(text.split()).lower() | |
| def normalize_ar(s: str) -> str: | |
| if not isinstance(s, str): | |
| s = "" if s is None else str(s) | |
| s = normalize_text(s) | |
| s = re.sub(r"[^\u0621-\u064A0-9\s]", " ", s) | |
| return re.sub(r"\s+", " ", s).strip() | |
| # قوائم الحظر | |
| FORBIDDEN_WORDS = { | |
| "insult": {"حمار", "غبي", "كلب", "حقير", "شرموط", "شرموطة", "عاهرة", "قحبة", "كس", "لعنة", "خرا", "منيك"}, | |
| "fitna": {"طائفية", "داعش", "شيعة", "سنة", "إلحاد"}, | |
| "off_topic": {"ميسي", "رونالدو", "أفلام", "أغاني", "طقس", "بورصة"} | |
| } | |
| def check_query_safety(query: str): | |
| nq = normalize_text(query) | |
| whitelist = {"الله", "الرسول", "حكم", "صلاة", "صيام", "حديث"} | |
| if any(w in nq for w in whitelist): return None | |
| for category, words in FORBIDDEN_WORDS.items(): | |
| if any(w in nq for w in words): | |
| return category | |
| return None | |
| def run_hadith_guard(q_norm: str): | |
| if any(w in q_norm for w in FORBIDDEN_WORDS["insult"]): | |
| return ("insult", "ألفاظ غير لائقة.") | |
| if any(w in q_norm for w in FORBIDDEN_WORDS["fitna"]): | |
| return ("fitna", "خارج النطاق.") | |
| return (None, None) | |
| # Rate Limiter | |
| RATE_LIMIT_WINDOW = 60 | |
| RATE_LIMIT_MAX = 20 | |
| _request_log = defaultdict(deque) | |
| def check_rate_limit(ip: str): | |
| now = time.time() | |
| dq = _request_log[ip] | |
| while dq and now - dq[0] > RATE_LIMIT_WINDOW: | |
| dq.popleft() | |
| if len(dq) >= RATE_LIMIT_MAX: | |
| raise HTTPException(status_code=429, detail="Rate limit exceeded.") | |
| dq.append(now) | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| DATA_FILE_ID = "1GMG6fVxhUuBEAHP91c8RAUdUJh5TxY5O" | |
| EMBEDDINGS_FILE_ID = "1MCIJ4zZRfTC9ZEy-CLvcvNbRdjTFnw5q" | |
| ID_BUKHARI = os.environ.get("ID_BUKHARI") | |
| ID_MUSLIM = os.environ.get("ID_MUSLIM") | |
| ID_MUSNAD = os.environ.get("ID_MUSNAD") | |
| data_path = os.path.join(DATA_DIR, 'cleaned_fatwas_v2.csv') | |
| embeddings_path = os.path.join(DATA_DIR, 'questions_embeddings_arabert.npy') | |
| learned_data_path = os.path.join(DATA_DIR, 'learned_fatwas.csv') | |
| FEEDBACK_FILE = os.path.join(DATA_DIR, "feedback.csv") | |
| PATHS = { | |
| "bukhari": os.path.join(DATA_DIR, "sahih_bukhari_clean.csv"), | |
| "muslim": os.path.join(DATA_DIR, "sahih_muslim_clean.csv"), | |
| "musnad": os.path.join(DATA_DIR, "musnad_ahmed_clean.csv"), | |
| } | |
| def safe_download(file_id, output_path): | |
| if not file_id or os.path.exists(output_path): return | |
| try: | |
| gdown.download(id=file_id, output=output_path, quiet=False) | |
| except Exception as e: | |
| print(f"[DOWNLOAD ERROR] {e}") | |
| model_name = 'aubmindlab/bert-base-arabertv2' | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = AutoModel.from_pretrained(model_name).to(device) | |
| vectorizer = TfidfVectorizer(analyzer="char_wb", ngram_range=(3,5), min_df=1) | |
| df_main = pd.DataFrame() | |
| df_learned = pd.DataFrame() | |
| df_all = pd.DataFrame() | |
| tfidf_matrix = None | |
| index = None | |
| question_embeddings = None | |
| def load_hadith_corpora(paths: dict) -> pd.DataFrame: | |
| all_dfs = [] | |
| for src, path in paths.items(): | |
| if os.path.exists(path): | |
| try: | |
| df = pd.read_csv(path) | |
| if "matn" in df.columns: df["matn_full"] = df["matn"] | |
| if "matn_clean" not in df.columns: | |
| df["matn_clean"] = df["matn_full"].fillna("").apply(normalize_ar) | |
| df["source"] = src | |
| df["hadith_number"] = df["hadith_number"].astype(str).str.replace(r"\D","",regex=True) | |
| if "grading" not in df.columns: | |
| df["grading"] = "" | |
| else: | |
| df["grading"] = df["grading"].fillna("").astype(str) | |
| all_dfs.append(df[["source","hadith_number","matn_full","matn_clean","grading"]]) | |
| except Exception as e: | |
| print(f"Error loading {src}: {e}") | |
| if all_dfs: | |
| return pd.concat(all_dfs, ignore_index=True) | |
| return pd.DataFrame(columns=["source","hadith_number","matn_full","matn_clean","grading"]) | |
| async def startup_event(): | |
| global df_main, df_learned, df_all, tfidf_matrix, index, question_embeddings | |
| print("[STARTUP] Downloading assets...") | |
| safe_download(DATA_FILE_ID, data_path) | |
| safe_download(EMBEDDINGS_FILE_ID, embeddings_path) | |
| safe_download(ID_BUKHARI, PATHS["bukhari"]) | |
| safe_download(ID_MUSLIM, PATHS["muslim"]) | |
| safe_download(ID_MUSNAD, PATHS["musnad"]) | |
| if os.path.exists(data_path): | |
| df_main = pd.read_csv(data_path) | |
| if os.path.exists(learned_data_path): | |
| df_learned = pd.read_csv(learned_data_path) | |
| else: | |
| df_learned = pd.DataFrame(columns=['question','answer','source_url','title','word_count','score']) | |
| if "score" not in df_learned.columns: df_learned["score"] = 50 | |
| if os.path.exists(embeddings_path): | |
| question_embeddings = np.load(embeddings_path) | |
| index = faiss.IndexFlatL2(question_embeddings.shape[1]) | |
| index.add(question_embeddings.astype('float32')) | |
| df_all = load_hadith_corpora(PATHS) | |
| if not df_all.empty: | |
| tfidf_matrix = vectorizer.fit_transform(df_all["matn_clean"]) | |
| print(f"[STARTUP] Hadith Database loaded: {len(df_all)} records. TF-IDF ready.") | |
| else: | |
| print("[STARTUP] WARNING: Hadith database is empty!") | |
| print("[STARTUP] All systems ready.") | |
| def get_embedding_for_query(text: str): | |
| inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device) | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| return outputs.last_hidden_state[:, 0, :].cpu().numpy() | |
| def ask_gemini_with_search(query: str, lang: str = "ar"): | |
| if not gemini_client: return None | |
| model_id = "gemini-2.0-flash" | |
| sys_instruction = f""" | |
| أنت مساعد إسلامي فقيه ومحترم. | |
| مهمتك: الإجابة على الأسئلة الشرعية والفتاوى بدقة بناءً على نتائج البحث الموثوقة. | |
| تعليمات هامة جداً: | |
| . أنت مختص في الفتاوي والأسألة الشرعية لا تدخل في أمور أخرى، إذا كان السؤال خارج نطاق الشريعة او أنه لا يبدو ك فتوى او سؤال ديني أو أستفسار ديني لا تجيب مثال إذا كان السؤال (ما هي عاصمة سوريا، كم سعر الدولار، هل أنت نموذج لغوي، هل يمكنك تعديل كود،) أعتذر بأدب وقول (أنا نموذج لغوي تعلمت على الأجابة على الأسألة الشرعية والدينية من مصادر أهل السنة والجماعة، أما سؤالك فيمكنك العثور على أجابة له في Google أو من Gemini أو أنماط أخرى). | |
| . أذا تم الأستفسار عن أمر ديني أجب وأحرص على الشرح أذا كان أستفسار وليس سؤال، أما أذا سؤال لا تكثر من الحشو وأعط الزبدة مع الحفاظ على جمالية الجواب. | |
| . المستخدم ليس دائماً على حق، ربما يسهو أو يخطأ، أذا اخطأ صحح له أتباعاً لمنهج أهل السنة والجماعة فقط، وحاول مساعدته في العثور على أجابة (دينية فقط) أما دون ذلك أجب بأعتذار كما ذكرت سابقاً. | |
| . أحرص على عدم الأجابة على اسألة سياسية او علمية. | |
| . أذا تم فتح نقاش معك قل أنا لا أصلح للنقاشات (أنا فقط أبحث وأجيب على اسألة فتوى او أستفسار) وشكراً | |
| . ميولك يجب أن تكون لمنهج أهل السنة والجماعة فقط أما دون ذلك أعتذر بأدب وقل هداكم الله. | |
| . أبدأ أجاباتك دائما بـ (الحمدلله والصلاة والسلام على رسول الله أما بعد : واختم جوابك بـ (والله أعلم). | |
| . لغة الإجابة: المستخدم يسأل بلغة الكود ({lang}). يجب أن تكون إجابتك بالكامل بهذه اللغة ({lang}). لا تجب بالعربية إذا كان السؤال بغيرها. | |
| . استخدم "بحث Google" دائماً للتأكد من المعلومات من مصادر مثل (إسلام ويب، الإسلام سؤال وجواب، ابن باز). | |
| . الاختصار المفيد: لا تكثر من الحشو، وأعط الزبدة مع الدليل. | |
| . اذكر دائماً أن مصادرك هي محرك بحث Google وبالتحديد موقعين أسلام ويب وإسلام سؤال وجواب . | |
| """ | |
| try: | |
| response = gemini_client.models.generate_content( | |
| model=model_id, | |
| contents=[types.Content(role="user", parts=[types.Part.from_text(text=f"{sys_instruction}\n\nالسؤال: {query}")] )], | |
| config=types.GenerateContentConfig( | |
| tools=[types.Tool(google_search=types.GoogleSearch())], | |
| temperature=0.3 | |
| ) | |
| ) | |
| raw_text = "" | |
| if response.text: | |
| raw_text = response.text | |
| elif response.candidates and response.candidates[0].content.parts: | |
| for part in response.candidates[0].content.parts: | |
| if part.text: raw_text += part.text | |
| clean_text = clean_gemini_response(raw_text) | |
| return clean_text | |
| except Exception as e: | |
| print(f"[GEMINI ERROR] {e}") | |
| return None | |
| class SearchRequest(BaseModel): | |
| query: str | |
| top_k: int = 1 | |
| lang: str = "ar" | |
| class FeedbackRequest(BaseModel): | |
| question: str | |
| answer: str | |
| useful: str | |
| comment: str = "" | |
| class HadithSearchRequest(BaseModel): | |
| query: str | |
| top_k: int = 5 | |
| sources: Optional[list[str]] = None | |
| lang: str = "ar" | |
| class HadithByIdRequest(BaseModel): | |
| source: Optional[str] = None | |
| hadith_number: str | |
| def search(request: SearchRequest): | |
| q = request.query.strip() | |
| target_lang = request.lang or "ar" | |
| guard = check_query_safety(q) | |
| if guard: | |
| error_msg = translate_error_detail("عذراً، السؤال غير مناسب.", target_lang) | |
| raise HTTPException(status_code=400, detail=error_msg) | |
| if not df_learned.empty: | |
| row = df_learned[df_learned["question"] == q] | |
| if not row.empty: | |
| ans = row.iloc[0]["answer"] | |
| if target_lang != "ar": | |
| ans = translate_wrapper(ans, "ar", target_lang) | |
| return {"results": [{ | |
| "question": q, | |
| "answer": ans, | |
| "source": "قاعدة تعلم ذاتي", | |
| "score": 100 | |
| }]} | |
| is_arabic_query = any("\u0600" <= c <= "\u06FF" for c in q) | |
| if is_arabic_query and index is not None: | |
| query_emb = get_embedding_for_query(q) | |
| distances, indices = index.search(query_emb.astype('float32'), request.top_k) | |
| similarity = 1 / (1 + distances[0][0]) * 100 | |
| if similarity > 85: | |
| idx = indices[0][0] | |
| result = df_main.iloc[idx] | |
| final_ans = result["الجواب"] | |
| if target_lang != "ar": | |
| final_ans = translate_wrapper(final_ans, "ar", target_lang) + DISCLAIMERS.get(target_lang, "") | |
| return {"results": [{ | |
| "question": q, | |
| "answer": final_ans, | |
| "source": "قاعدة البيانات الأساسية (محلي)", | |
| "score": int(similarity) | |
| }]} | |
| print(f"[INFO] Asking Gemini: {q} (Lang: {target_lang})") | |
| gemini_answer = ask_gemini_with_search(q, lang=target_lang) | |
| if gemini_answer: | |
| is_response_arabic = any("\u0600" <= c <= "\u06FF" for c in gemini_answer[:50]) | |
| if target_lang != "ar" and is_response_arabic: | |
| print("[INFO] Gemini returned Arabic despite instruction. Translating manually...") | |
| gemini_answer = translate_wrapper(gemini_answer, "ar", target_lang) | |
| new_row = pd.DataFrame([{ | |
| "question": q, "answer": gemini_answer, | |
| "source_url": "Gemini", "title": "Gemini", | |
| "word_count": len(gemini_answer.split()), "score": 90 | |
| }]) | |
| new_row.to_csv(learned_data_path, mode='a', header=not os.path.exists(learned_data_path), index=False) | |
| return {"results": [{ | |
| "question": q, | |
| "answer": gemini_answer, | |
| "source": "Gemini AI (Web Search)", | |
| "score": 95 | |
| }]} | |
| error_msg = translate_error_detail("لم يتم العثور على إجابة.", target_lang) | |
| raise HTTPException(status_code=404, detail=error_msg) | |
| def feedback(req: FeedbackRequest): | |
| if not os.path.exists(FEEDBACK_FILE): | |
| pd.DataFrame([req.dict()]).to_csv(FEEDBACK_FILE, index=False) | |
| else: | |
| pd.DataFrame([req.dict()]).to_csv(FEEDBACK_FILE, mode='a', header=False, index=False) | |
| return {"message": "تم حفظ التقييم."} | |
| SOURCE_ALIAS = { | |
| "bukhari": "صحيح البخاري", "muslim": "صحيح مسلم", "musnad": "مسند أحمد", | |
| "صحيح البخاري": "صحيح البخاري", "صحيح مسلم": "صحيح مسلم", "مسند أحمد": "مسند أحمد" | |
| } | |
| VALID_SOURCES = set(SOURCE_ALIAS.values()) | |
| def normalize_sources(sources): | |
| if not sources: return None | |
| norm = set() | |
| for s in sources: | |
| if not s: continue | |
| key = str(s).strip().lower() | |
| mapped = SOURCE_ALIAS.get(key) | |
| if mapped in VALID_SOURCES: | |
| norm.add(mapped) | |
| return norm or None | |
| def filter_mask_by_sources(df, sources): | |
| wanted = normalize_sources(sources) | |
| if not wanted: | |
| return pd.Series(True, index=df.index) | |
| return df["source"].isin(wanted) | |
| def build_result_row(row, score): | |
| return { | |
| "source": row["source"], | |
| "hadith_number": str(row["hadith_number"]), | |
| "matn": row["matn_full"][:1000] + "..." if len(row["matn_full"]) > 1000 else row["matn_full"], | |
| "grading": row.get("grading", ""), | |
| "score": round(float(score), 2) | |
| } | |
| def hadith_search(req: HadithSearchRequest, request: Request): | |
| ip = request.client.host or "unknown" | |
| check_rate_limit(ip) | |
| lang = req.lang or "ar" | |
| q = (req.query or "").strip() | |
| q_for_search = q | |
| if lang != "ar": | |
| q_for_search = translate_wrapper(q, source=lang, target="ar") | |
| print(f"[HADITH-LANG] {q} -> {q_for_search}") | |
| q_norm = normalize_ar(q_for_search) | |
| if not hasattr(vectorizer, 'vocabulary_') or tfidf_matrix is None or df_all.empty: | |
| error_ar = "عفواً، بيانات الأحاديث لم يتم تحميلها بشكل صحيح على الخادم (خطأ في الإعداد)." | |
| error_detail = translate_error_detail(error_ar, lang) | |
| print(f"[ERROR] Hadith data (TFIDF/df_all) not loaded or globals missing.") | |
| raise HTTPException(status_code=500, detail=error_detail) | |
| kind, msg = run_hadith_guard(q_norm) | |
| if kind: | |
| error_detail = translate_error_detail(msg, lang) | |
| raise HTTPException(status_code=400, detail=error_detail) | |
| words = q_norm.split() | |
| if len(words) < 4: | |
| error_ar = "أدخل 4 كلمات على الأقل من نص الحديث (بالعربية)." | |
| error_detail = translate_error_detail(error_ar, lang) | |
| raise HTTPException(status_code=400, detail=error_detail) | |
| if len(words) > 15: | |
| q_norm = " ".join(words[:15]) | |
| try: | |
| mask = filter_mask_by_sources(df_all, req.sources) | |
| idxs = df_all[mask].index.values | |
| if len(idxs) == 0: | |
| error_ar = "لا توجد مصادر مطابقة للفلتر." | |
| error_detail = translate_error_detail(error_ar, lang) | |
| raise HTTPException(status_code=404, detail=error_detail) | |
| q_vec = vectorizer.transform([q_norm]) | |
| sims = linear_kernel(q_vec, tfidf_matrix[idxs]).flatten() | |
| top_idx_local = sims.argsort()[::-1][:20] | |
| candidates = [] | |
| for i in top_idx_local: | |
| gi = idxs[i] | |
| row = df_all.iloc[gi] | |
| base_score = sims[i] * 100 | |
| fuzz_score = fuzz.token_set_ratio(q_norm, row["matn_clean"]) | |
| source_bonus = 15 if row["source"] == "صحيح البخاري" else (10 if row["source"] == "صحيح مسلم" else 0) | |
| grading_val = str(row.get("grading", "") or "") | |
| grading_bonus = 5 if "صحيح" in grading_val else 0 | |
| final_score = base_score * 0.5 + fuzz_score * 0.5 + source_bonus + grading_bonus | |
| candidates.append((row, final_score)) | |
| candidates = sorted(candidates, key=lambda x: x[1], reverse=True)[:(req.top_k or 5)] | |
| results = [] | |
| for row, score in candidates: | |
| res = build_result_row(row, score) | |
| if lang != "ar": | |
| res["matn"] = translate_wrapper(res["matn"], "ar", lang) + DISCLAIMERS.get(lang, "") | |
| results.append(res) | |
| return {"query": q, "results": results} | |
| except Exception as e: | |
| print(f"[HADITH-SEARCH-CRITICAL-FAIL] Error: {e}") | |
| error_ar = "حدث خطأ داخلي غير متوقع أثناء معالجة البحث عن الأحاديث." | |
| error_detail = translate_error_detail(error_ar, lang) | |
| raise HTTPException(status_code=500, detail=error_detail) | |
| def hadith_by_id(req: HadithByIdRequest, request: Request): | |
| check_rate_limit(request.client.host or "unknown") | |
| if df_all.empty: raise HTTPException(status_code=503, detail="Database not ready") | |
| num = re.sub(r"\D", "", str(req.hadith_number)) | |
| mask = df_all["hadith_number"] == num | |
| rows = df_all[mask] | |
| if rows.empty: | |
| raise HTTPException(status_code=404, detail="Not found") | |
| return [build_result_row(r, 100) for _, r in rows.iterrows()] |