# app.py
# ================== الاستيراد ==================
import gradio as gr
import pandas as pd
import numpy as np
import tempfile
import os, pickle
from datetime import datetime
import io, base64
import arabic_reshaper
from bidi.algorithm import get_display
import matplotlib.pyplot as plt
from sentence_transformers import SentenceTransformer, util
from Bio import Entrez
import gdown
# --- إضافات خاصة بالـ Hugging Face dataset sync ---
from huggingface_hub import HfApi, hf_hub_download, upload_file
# ----------------------------------------------------------------
# ------------------ Data store & Google Drive sync (background sync) ------------------
import shutil, time, threading
from datetime import datetime
import os
DATA_FOLDER = "data_store"
DRIVE_FOLDER_ID = "1znvCrr63iSN2MWveAZvcXuGT3o5TfZc2" # Library_Backups folder ID on your Drive
def ensure_data_folder():
os.makedirs(DATA_FOLDER, exist_ok=True)
return DATA_FOLDER
def get_data_path(filename):
ensure_data_folder()
return os.path.join(DATA_FOLDER, filename)
# Attempt to import PyDrive2 and set up authentication functions.
try:
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from oauth2client.service_account import ServiceAccountCredentials
_GD_AVAILABLE = True
except Exception:
_GD_AVAILABLE = False
def ensure_drive_auth():
"""
Authenticate to Google Drive using a Service Account JSON file (client_secrets.json)
placed inside data_store/. Works automatically on Spaces without manual login.
"""
if not _GD_AVAILABLE:
print("❌ PyDrive2 not available.")
return None
try:
service_account_path = os.path.join(DATA_FOLDER, "client_secrets.json")
if not os.path.exists(service_account_path):
print("⚠️ لم يتم العثور على ملف client_secrets.json في data_store/")
return None
# Create credentials from service account JSON
scope = ['https://www.googleapis.com/auth/drive']
credentials = ServiceAccountCredentials.from_json_keyfile_name(service_account_path, scope)
gauth = GoogleAuth()
gauth.credentials = credentials
drive = GoogleDrive(gauth)
print("✅ تم الاتصال بـ Google Drive باستخدام Service Account بنجاح.")
return drive
except Exception as e:
print("❌ فشل الاتصال بـ Google Drive:", e)
return None
def upload_to_drive(local_path, remote_name=None):
"""
Uploads a local file to Google Drive inside the folder defined by DRIVE_FOLDER_ID.
"""
try:
drive = ensure_drive_auth()
if drive is None:
print("⚠️ لم يتم الاتصال بـ Google Drive، لم يتم رفع الملف:", local_path)
return False
if remote_name is None:
remote_name = os.path.basename(local_path)
q = f"'{DRIVE_FOLDER_ID}' in parents and title = '{remote_name}' and trashed=false"
file_list = drive.ListFile({'q': q}).GetList()
if file_list:
f = file_list[0]
f.SetContentFile(local_path)
f.Upload()
print(f"🔁 تم تحديث الملف الموجود في Google Drive: {remote_name}")
else:
file_metadata = {'title': remote_name, 'parents': [{'id': DRIVE_FOLDER_ID}]}
f = drive.CreateFile(file_metadata)
f.SetContentFile(local_path)
f.Upload()
print(f"✅ تم رفع ملف جديد إلى Google Drive: {remote_name}")
return True
except Exception as e:
print("❌ خطأ أثناء رفع الملف إلى Google Drive:", e)
return False
def _background_sync(interval_seconds=30):
"""
Background thread to watch for new or modified CSV/XLSX files and upload them to Google Drive automatically.
"""
def _worker():
seen = {}
while True:
try:
# scan for csv and xlsx files in project root
for fname in os.listdir():
if fname.lower().endswith(('.csv', '.xlsx')) and fname not in (os.path.basename(__file__),):
try:
src_path = os.path.join(os.getcwd(), fname)
dst_path = get_data_path(fname)
# check modification time
mtime = os.path.getmtime(src_path)
if fname not in seen or seen[fname] < mtime:
shutil.copy2(src_path, dst_path)
seen[fname] = mtime
success = upload_to_drive(dst_path, os.path.basename(dst_path))
if success:
print(f"📤 تمت مزامنة الملف مع Google Drive: {fname} ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})")
else:
print(f"⚠️ فشل رفع الملف: {fname}")
except Exception as e:
print("❌ خطأ أثناء المزامنة:", e)
except Exception:
pass
time.sleep(interval_seconds)
t = threading.Thread(target=_worker, daemon=True)
t.start()
# تشغيل المزامنة التلقائية كل 20 ثانية
try:
_background_sync(interval_seconds=20)
except Exception as e:
print("⚠️ لم يتم تشغيل المزامنة التلقائية:", e)
# ------------------ end background sync ------------------
# ================== HF dataset sync configuration ==================
DATASET_REPO = "pharma-library/AI_LibExplorer"
HF_TOKEN = os.getenv("HF_TOKEN") # يجب وضع هذا التوكن في Secrets باسم HF_TOKEN
ADMIN_PASS = os.getenv("ADMIN_PASS", "pharmacy1") # ضع Secret باسم ADMIN_PASS بقيمة pharmacy1
# قائمة الملفات التي نريد التأكد من توافرها ومزامنتها
LOCAL_FILES = [
"usage_stats.xlsx",
"questions.csv",
"suggestions.csv",
"user_logs.csv",
"faculty_borrow.csv",
"bank_requests.csv"
]
def download_data_files():
"""Download listed files from dataset repo to local project root if present."""
for file_name in LOCAL_FILES:
try:
# hf_hub_download سيحفظ الملف في local_dir كما طلبنا
hf_hub_download(
repo_id=DATASET_REPO,
filename=file_name,
repo_type="dataset",
local_dir=".",
token=HF_TOKEN
)
print(f"✅ تم تنزيل {file_name} من الـ Dataset.")
except Exception as e:
# الملف غير موجود بعد في الـ dataset — ليس خطأ، سيتم إنشاؤه عند الحاجة
print(f"ℹ️ {file_name} غير موجود على الـ Dataset حالياً ({e}). سيُنشأ لاحقًا عند كتابة البيانات.")
def upload_data_file(file_path):
"""Upload given file_path (relative path) to the dataset repo (overwrites/creates)."""
try:
# upload_file من huggingface_hub سيرفع الملف إلى المسار path_in_repo المحدد
upload_file(
path_or_fileobj=file_path,
path_in_repo=os.path.basename(file_path),
repo_id=DATASET_REPO,
repo_type="dataset",
token=HF_TOKEN
)
print(f"✅ Uploaded {file_path} to {DATASET_REPO}")
except Exception as e:
print(f"⚠️ خطأ في رفع {file_path} إلى HF dataset: {e}")
# تحميل الملفات عند بداية تشغيل التطبيق
download_data_files()
# ================== end HF dataset sync ==================
# ================== دوال مساعدة ==================
def download_from_drive(file_id, output):
url = f"https://drive.google.com/uc?id={file_id}"
try:
gdown.download(url, output, quiet=True)
except Exception:
# إذا فشل التحميل من Drive نتابع لوجود ملفات محلية بالفعل
pass
# ================== إعدادات ومسارات ==================
Entrez.email = "emananter0123@gmail.com"
EMB_DIR = "embeddings_cache"
os.makedirs(EMB_DIR, exist_ok=True)
def embeddings_path(name):
return os.path.join(EMB_DIR, f"{name}_embeddings.pkl")
BOOKS_FILE, THESES_FILE, FAQ_FILE = "book.xlsx", "theses.xlsx", "faq.xlsx"
QUESTIONS_FILE = "questions.csv" # ملف حفظ الأسئلة
FACULTY_BORROW_FILE = "faculty_borrow.csv" # طلبات أعضاء هيئة التدريس للاستعارة
# محاولة تنزيل الملفات من Google Drive (إن أردتِ يمكنك حذف السطور التالية إذا الملفات محلية)
download_from_drive("1FElHiASfiVLeuHWYaqd2Q5foxWRlJT-O", BOOKS_FILE) # book.xlsx
download_from_drive("1K2Mtze6ZdvfKUsFMCOWlRBjDq-ZnJNrv", THESES_FILE) # theses.xlsx
download_from_drive("1ONjXtfv709BOxiIR0Qzmz6lQngnWEEso", FAQ_FILE) # faq.xlsx
# ================== تحميل البيانات المحلية ==================
def load_local_data():
if not os.path.exists(BOOKS_FILE) or not os.path.exists(THESES_FILE):
raise FileNotFoundError("تأكدي من رفع الملفات book.xlsx و theses.xlsx في مجلد المشروع.")
books = pd.read_excel(BOOKS_FILE).fillna("غير متوافر")
theses = pd.read_excel(THESES_FILE).fillna("غير متوافر")
# توحيد اسم عمود العنوان بين ملفات مختلفة
if "Title" not in books.columns and "العنوان" in books.columns:
books["Title"] = books["العنوان"].astype(str)
elif "Title" not in books.columns:
books["Title"] = books.iloc[:,0].astype(str)
if "Title" not in theses.columns and "العنوان" in theses.columns:
theses["Title"] = theses["العنوان"].astype(str)
elif "Title" not in theses.columns:
theses["Title"] = theses.iloc[:,0].astype(str)
return books, theses
books_df, theses_df = load_local_data()
# ================== نموذج Semantic ==================
MODEL_NAME = "all-MiniLM-L6-v2"
model = SentenceTransformer(MODEL_NAME)
def build_or_load_embeddings(df, name):
path = embeddings_path(name)
if os.path.exists(path):
try:
with open(path,"rb") as f:
emb = pickle.load(f)
if len(emb) == len(df):
return emb
except Exception:
pass
texts = df["Title"].astype(str).tolist()
emb = model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
with open(path,"wb") as f:
pickle.dump(emb,f)
return emb
books_embeddings = build_or_load_embeddings(books_df,"books")
theses_embeddings = build_or_load_embeddings(theses_df,"theses")
# ================== FAQ & مساعد ذكي ==================
if os.path.exists(FAQ_FILE):
faq_df = pd.read_excel(FAQ_FILE).fillna("")
else:
faq_df = pd.DataFrame({
"السؤال":["مواعيد المكتبة","خدمات المكتبة","كيفية الاستعارة","التسجيل في بنك المعرفة","التصوير","أقسام المكتبة","التواصل"],
"الإجابة":[
"🕘 المكتبة تعمل من السبت إلى الخميس من 9 صباحًا حتى 2 ظهرًا",
"📌 خدمات المكتبة: الإحاطة الجارية، التسجيل على بنك المعرفة، التصوير، البحث الإلكتروني، الاستعارة الخارجية",
"✅ يتم استعارة الكتب لمدة أسبوعين (أو حسب القاعدة المحلية)",
"🔗 التسجيل في بنك المعرفة عبر موقع بنك المعرفة المصري",
"🖨️ التصوير متاح وفق القواعد",
"📚 المكتبة بها: قاعة المراجع، قاعة الكتب الدراسية، قاعة الدوريات، قاعة الرسائل الجامعية",
"☎️ للتواصل: صفحة الفيسبوك"
]
})
faq_questions = faq_df["السؤال"].astype(str).tolist()
faq_answers = faq_df["الإجابة"].astype(str).tolist()
FAQ_EMB_PATH = embeddings_path("faq")
def build_or_load_faq_embeddings(questions):
if os.path.exists(FAQ_EMB_PATH):
try:
with open(FAQ_EMB_PATH,"rb") as f:
emb = pickle.load(f)
if len(emb) == len(questions):
return emb
except Exception:
pass
emb = model.encode(questions, convert_to_numpy=True, show_progress_bar=False)
with open(FAQ_EMB_PATH,"wb") as f:
pickle.dump(emb,f)
return emb
faq_embeddings = build_or_load_faq_embeddings(faq_questions)
def library_assistant_smart(question):
if not str(question).strip():
return "⚠️ من فضلك اكتب سؤالك"
q_emb = model.encode([str(question)], convert_to_numpy=True)
sims = util.cos_sim(q_emb, faq_embeddings)[0].cpu().numpy()
idx_best = int(np.argmax(sims))
best_score = sims[idx_best]
log_usage("المساعد الذكي", question, "FAQ", 1)
if best_score < 0.75:
return "❗ لم أجد إجابة مناسبة، حاول صياغة سؤالك بشكل مختلف."
return f"الإجابة الأقرب:
{faq_answers[idx_best]}
درجة التشابه: {best_score:.2f}"
# ================== CSS ==================
CUSTOM_CSS = """
"""
# ================== تتبع الاستخدام ==================
def log_usage(tab, query="", mode="", results_count=0):
file_path = "usage_stats.xlsx"
entry = pd.DataFrame([{
"التاريخ": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"التب": tab,
"الكلمة المفتاحية": query,
"نوع البحث": mode,
"عدد النتائج": results_count
}])
if os.path.exists(file_path):
try:
old = pd.read_excel(file_path)
entry = pd.concat([old, entry], ignore_index=True)
except Exception:
entry = entry
entry.to_excel(file_path, index=False)
# upload to HF dataset (auto backup)
try:
upload_data_file(file_path)
except Exception as e:
print("⚠️ خطأ أثناء رفع usage_stats:", e)
# ================== عرض إحصاءات الاستخدام ==================
def show_usage_stats():
file_path = "usage_stats.xlsx"
if not os.path.exists(file_path):
return "
📊 لا توجد بيانات بعد
" df = pd.read_excel(file_path) total = len(df) by_tab = df["التب"].value_counts() fig, ax = plt.subplots() labels = [get_display(arabic_reshaper.reshape(str(t))) for t in by_tab.index] ax.bar(labels, by_tab.values) for i, v in enumerate(by_tab.values): ax.text(i, v + 0.1, str(v), ha='center', va='bottom', fontsize=10) ax.set_title(get_display(arabic_reshaper.reshape("عدد مرات استخدام كل تب"))) ax.set_xlabel(get_display(arabic_reshaper.reshape("التب"))) ax.set_ylabel(get_display(arabic_reshaper.reshape("عدد العمليات"))) plt.xticks(rotation=30, ha='right') buf = io.BytesIO() plt.tight_layout() plt.savefig(buf, format="png") plt.close(fig) buf.seek(0) img = base64.b64encode(buf.read()).decode("utf-8") summary = f"❌ لا توجد نتائج
" html = CUSTOM_CSS + "| {col} | {val} |
|---|
❌ لا توجد نتائج
" return CUSTOM_CSS + df.to_html(escape=False, index=False, classes="styled-table") # ================== حفظ النتائج إلى ملف Excel ================== def save_to_excel(df): if df is None or (isinstance(df, pd.DataFrame) and df.empty): tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") pd.DataFrame().to_excel(tmp.name, index=False) return tmp.name tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") if isinstance(df, pd.DataFrame): df.to_excel(tmp.name, index=False) else: pd.DataFrame({"result_html":[str(df)]}).to_excel(tmp.name, index=False) return tmp.name # ================== البحث المحلي ================== def local_search_df(query, category, mode): if not query or not str(query).strip(): return "⚠️ اكتب كلمة أو جملة للبحث
", pd.DataFrame() if mode == "نصي": if category == "Books": if "العنوان" in books_df.columns: df = books_df[books_df["العنوان"].astype(str).str.contains(query, case=False, na=False)] else: df = books_df[books_df["Title"].astype(str).str.contains(query, case=False, na=False)] else: if "العنوان" in theses_df.columns: df = theses_df[theses_df["العنوان"].astype(str).str.contains(query, case=False, na=False)] else: df = theses_df[theses_df["Title"].astype(str).str.contains(query, case=False, na=False)] else: q_emb = model.encode([query], convert_to_numpy=True) if category == "Books": scores = util.cos_sim(q_emb, books_embeddings)[0].cpu().numpy() idx = np.argsort(-scores) df = books_df.iloc[idx] else: scores = util.cos_sim(q_emb, theses_embeddings)[0].cpu().numpy() idx = np.argsort(-scores) df = theses_df.iloc[idx] if df is None or df.empty: df = pd.DataFrame([{"نتيجة":"❌ لم يتم العثور على نتائج"}]) else: if "Title" in df.columns: df = df.drop(columns=["Title"]) log_usage("البحث المحلي", query, mode, len(df) if isinstance(df, pd.DataFrame) else 0) html_results = results_to_html(df) return html_results, df # ================== البحث في PubMed ================== def search_pubmed_html(query, max_results=5): if not query or not str(query).strip(): return "⚠️ اكتب كلمة للبحث
" try: handle = Entrez.esearch(db="pubmed", term=query, retmax=max_results) record = Entrez.read(handle) handle.close() ids = record.get("IdList", []) if not ids: return "❌ لا توجد نتائج
" handle = Entrez.efetch(db="pubmed", id=",".join(ids), rettype="xml", retmode="xml") records = Entrez.read(handle) handle.close() rows = [] for rec in records.get('PubmedArticle', []): try: title = rec['MedlineCitation']['Article']['ArticleTitle'] pmid = rec['MedlineCitation']['PMID'] link = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" rows.append({ "الموقع": "PubMed", "الوصف": "قاعدة بيانات PubMed", "العنوان": title, "الرابط": f"فتح" }) except Exception: continue if not rows: return "❌ لم يتم العثور على نتائج صالحة.
" df = pd.DataFrame(rows) log_usage("المصادر الخارجية", query, "PubMed", len(df)) return CUSTOM_CSS + df.to_html(escape=False, index=False, classes="styled-table") except Exception as e: return f"❌ حدث خطأ أثناء جلب البيانات من PubMed:
{str(e)}
⚠️ من فضلك اكتب كلمة للبحث
" if not site or site not in EXTERNAL_LINKS: return f"⚠️ الموقع المحدد غير معروف أو غير موجود في القائمة.
" if site == "PubMed": return search_pubmed_html(query) base = EXTERNAL_LINKS[site]["url"] desc = EXTERNAL_LINKS[site]["desc"] if "?" in base or "=" in base: link = f"{base}{query}" else: link = base df = pd.DataFrame([ {"اسم الموقع": site, "الوصف": desc, "رابط البحث": f"للوصول إلى نتيجة البحث"} ]) log_usage("المصادر الخارجية", query, site, 1) html_table = df.to_html(escape=False, index=False, classes="styled-table") return CUSTOM_CSS + f"❌ حدث خطأ أثناء تنفيذ البحث:
{str(e)}
⚠️ من فضلك اكتب كلمة للبحث
" if not site or site not in JOURNALS: return f"⚠️ الموقع المحدد غير معروف أو غير موجود في قائمة الدوريات.
" base = JOURNALS[site]["url"] desc = JOURNALS[site]["desc"] if "?" in base or "=" in base: link = f"{base}{query}" else: link = base df = pd.DataFrame([{"اسم الموقع": site, "الوصف": desc, "رابط البحث": f"للوصول إلى نتيجة البحث"}]) log_usage("الدوريات", query, site, 1) html_table = df.to_html(escape=False, index=False, classes="styled-table") return CUSTOM_CSS + f"❌ حدث خطأ أثناء تنفيذ البحث:
{str(e)}
⚠️ من فضلك اكتب كلمة للبحث
" if not site or site not in AI_TOOLS: return f"⚠️ الأداة المحددة غير معروفة أو غير موجودة في القائمة.
" base = AI_TOOLS[site]["url"] desc = AI_TOOLS[site]["desc"] if "?" in base or "=" in base: link = f"{base}{query}" else: link = base df = pd.DataFrame([{"اسم الأداة": site, "الوصف": desc, "رابط البحث": f"للوصول إلى نتيجة البحث"}]) log_usage("أدوات الذكاء الاصطناعي", query, site, 1) html_table = df.to_html(escape=False, index=False, classes="styled-table") return CUSTOM_CSS + f"❌ حدث خطأ أثناء تنفيذ البحث:
{str(e)}
❌ لا توجد أسئلة حتى الآن.
" html = CUSTOM_CSS + "