import streamlit as st from datetime import datetime, timedelta import pytz from google_auth_oauthlib.flow import Flow from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.errors import HttpError import json import re import ast import base64 import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import os os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface_cache" os.environ["HF_HOME"] = "/tmp/huggingface_cache" os.environ["HF_DATASETS_CACHE"] = "/tmp/huggingface_cache" os.environ["HF_METRICS_CACHE"] = "/tmp/huggingface_cache" from transformers import AutoModelForCausalLM, AutoTokenizer import torch st.set_page_config(page_title="ChronoCall-Q", page_icon="🗓️") st.markdown(""" """, unsafe_allow_html=True) # --- Model --- def convert_to_dict(text): match = re.search(r"\n(.*?)\n", text) if match: tool_dict_str = match.group(1) try: result = ast.literal_eval(tool_dict_str) return(result) except Exception as e: return({}) else: return({}) # --- Model --- model_name_or_path = "TechitoTamani/Qwen3-0.6B_FinetuneWithMyData-Merged" json_path = os.path.join(os.path.dirname(__file__), 'tools.json') with open(json_path, 'r', encoding='utf-8') as f: TOOLS = json.load(f) @st.cache_resource def load_model_and_tokenizer(): tokenizer = AutoTokenizer.from_pretrained(model_name_or_path) model = AutoModelForCausalLM.from_pretrained( model_name_or_path, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True, ) return tokenizer, model tokenizer, model = load_model_and_tokenizer() def get_model_answer(messages): text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, tools=TOOLS, enable_thinking=False ) inputs = tokenizer(text, return_tensors="pt").to(model.device) with torch.no_grad(): outputs = model.generate(**inputs, max_new_tokens=128) output_text = tokenizer.batch_decode(outputs)[0][len(text):] return output_text # --- Calendar --- SCOPES = ['https://www.googleapis.com/auth/calendar'] css_path = os.path.join(os.path.dirname(__file__), 'style.css') with open(css_path, 'r', encoding='utf-8') as f: custom_css = f"" st.markdown(custom_css, unsafe_allow_html=True) def get_base64_image(image_filename): image_path = os.path.join(os.path.dirname(__file__), image_filename) with open(image_path, "rb") as img_file: b64_data = base64.b64encode(img_file.read()).decode() return b64_data def create_flow(): return Flow.from_client_config( { "web": { "client_id": os.environ["client_id"], "client_secret": os.environ["client_secret"], "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "redirect_uris": [os.environ["redirect_uri"]], } }, scopes=SCOPES, redirect_uri=os.environ["redirect_uri"] ) def generate_auth_url(flow): auth_url, _ = flow.authorization_url(prompt='consent', access_type='offline', include_granted_scopes='true') return auth_url def create_service(creds): return build("calendar", "v3", credentials=creds) # --- Adding --- def extract_time_or_dash(iso_time: str) -> str: try: dt = datetime.fromisoformat(iso_time) if "T" in iso_time: return dt.strftime("%H:%M") else: return "-" except ValueError: return "-" def handle_calendar_action(service, action_data): name = action_data.get("name") args = action_data.get("arguments", {}) if name == "add_event_date": date_str = args["date"] time_str = args["time"] title = args["title"] dt_start = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M") dt_end = dt_start + timedelta(hours=1) event = { 'summary': title, 'start': { 'dateTime': dt_start.isoformat(), 'timeZone': 'Asia/Bangkok', }, 'end': { 'dateTime': dt_end.isoformat(), 'timeZone': 'Asia/Bangkok', }, } created = service.events().insert(calendarId='primary', body=event).execute() st.success(f"✅ เพิ่มกิจกรรม: {title} [ดูใน Calendar]({created.get('htmlLink')})") elif name == "delete_event_date": date_str = args["date"] title = args["title"] events = get_events_by_date_and_title(service, date_str, title) if events: for event in events: service.events().delete(calendarId='primary', eventId=event['id']).execute() st.success(f"🗑 ลบนัด: {title} ในวันที่ {date_str} แล้ว") else: st.warning(f"ไม่พบนัด: {title} ในวันที่ {date_str}") elif name == "update_event": date_str = args["date"] new_time_str = args["time"] title = args["title"] events = get_events_by_date_and_title(service, date_str, title) if events: for event in events: new_start = datetime.strptime(f"{date_str} {new_time_str}", "%Y-%m-%d %H:%M") new_end = new_start + timedelta(hours=1) event["start"]["dateTime"] = new_start.isoformat() event["end"]["dateTime"] = new_end.isoformat() service.events().update(calendarId='primary', eventId=event["id"], body=event).execute() st.success(f"✏️ เปลี่ยนเวลานัด: {title} เป็น {new_time_str}") else: st.warning(f"ไม่พบนัด: {title} ในวันที่ {date_str}") elif name == "view_event_date": date_str = args["date"] events = get_events_by_date(service, date_str) if not events: st.info("ไม่มีนัดในวันนี้") else: st.write(f"📅 นัดทั้งหมดในวันที่ {date_str}:") for e in events: date = e['start'].get('dateTime', e['start'].get('date')) time = extract_time_or_dash(date) st.write(f"- {e['summary']} เวลา: {time}") def get_events_by_date(service, date_str): date_obj = datetime.strptime(date_str, "%Y-%m-%d") start = date_obj.strftime("%Y-%m-%dT00:00:00Z") end = (date_obj + timedelta(days=1)).strftime("%Y-%m-%dT00:00:00Z") # # DEBUG # st.write(f"Fetching events for date: {date_str}") # st.write(f"timeMin = {start}, timeMax = {end}") try: events_result = service.events().list( calendarId='primary', timeMin=start, timeMax=end, singleEvents=True, orderBy='startTime' ).execute() return events_result.get('items', []) except HttpError as error: st.error(f"❌ Google Calendar API error: {error}") return [] except Exception as e: st.error(f"❌ Unknown error: {e}") return [] def get_events_by_date_and_title(service, date_str, title): events = get_events_by_date(service, date_str) return [e for e in events if e.get("summary") == title] # --- Email --- sender = os.environ["sender"] receiver = os.environ["receiver"] password = os.environ["password"] def send_email(user_email): subject = "New Beta Tester - ChronoCall-Q" body = f"""\ มีผู้สนใจเข้าร่วม Beta Test อีเมล: {user_email} """ msg = MIMEMultipart() msg["From"] = sender msg["To"] = receiver msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) try: with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server: server.login(sender, password) server.sendmail(sender, receiver, msg.as_string()) return True except Exception as e: st.error(f"เกิดข้อผิดพลาด: {e}") return False # --- Streamlit --- def main(): # ดึง query params params = st.query_params code = params.get("code", None) # ถ้ายังไม่ได้ login if "credentials" not in st.session_state: if code: flow = create_flow() try: flow.fetch_token(code=code) creds = flow.credentials st.session_state["credentials"] = { "token": creds.token, "refresh_token": creds.refresh_token, "token_uri": creds.token_uri, "client_id": creds.client_id, "client_secret": creds.client_secret, "scopes": creds.scopes } st.success("🎉 ล็อกอินสำเร็จ! พร้อมใช้งานแล้ว") st.rerun() except Exception as e: st.error(f"เกิดข้อผิดพลาด: {e}") return else: flow = create_flow() auth_url = generate_auth_url(flow) # --- Title + Login --- st.markdown(f"""
ChronoCall-Q
""", unsafe_allow_html=True) # --- Emali sender --- st.markdown("""

ถ้า Login ไม่ได้ใช่ไหม?
ไม่เป็นไร! ส่งคำขอเข้าร่วม Beta Test มาให้เราก่อนได้เลย

""", unsafe_allow_html=True) email = st.text_input("กรอกอีเมลของคุณที่นี่") if st.button("ส่งคำขอเข้าร่วม Beta Test"): if email: if send_email(email): st.success("ส่งคำขอเรียบร้อยแล้ว!") else: st.error("เกิดข้อผิดพลาดในการส่ง กรุณาลองใหม่อีกครั้ง") else: st.warning("กรุณากรอกอีเมลก่อนกดส่ง") # --- pic --- img_b64 = get_base64_image("IMG_1358.jpg") st.markdown(f''' ''', unsafe_allow_html=True) st.stop() # Logged in creds = Credentials(**st.session_state["credentials"]) service = create_service(creds) st.title("ChronoCall-Q") st.caption("วิธีใช้งาน") with st.expander("การเพิ่มเหตุการณ์ (Add Event)"): st.markdown(""" เพื่อเพิ่มเหตุการณ์ใหม่ ต้องระบุ: - **ชื่อเหตุการณ์** - **วันที่** - **เวลา** > **ตัวอย่าง:** เพิ่มนัดประชุมพรุ่งนี้ตอนสิบโมง """) with st.expander("การลบเหตุการณ์ (Delete Event)"): st.markdown(""" เพื่อทำการลบเหตุการณ์ ต้องระบุ: - **ชื่อเหตุการณ์** - **วันที่** > **ตัวอย่าง:** เพิ่มนัดประชุมพรุ่งนี้ตอนสิบโมง """) with st.expander("การอัปเดตเหตุการณ์ (Update Event)"): st.markdown(""" เพื่อเปลี่ยนเวลาเหตุการณ์ในวันเดียวกัน ต้องระบุ: - **ชื่อเหตุการณ์** - **วันที่** - **เวลาใหม่** > **ตัวอย่าง:** เปลี่ยนนัดประชุมพรุ่งนี้เป็นสิบเอ็ดโมง """) with st.expander("การดูเหตุการณ์ (View Events)"): st.markdown(""" เพื่อดูรายการเหตุการณ์ในวันใดวันหนึ่ง ต้องระบุ: - **วันที่ที่ต้องการดู** > **ตัวอย่าง:** พรุ่งนี้มีนัดอะไรบ้าง """) if "user_input" not in st.session_state: st.session_state.user_input = "" messages = [ {"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.\n\nCurrent Date: 2025-06-07.\n\nCurrent Day: Saturday."}, ] user_input = st.text_input("พิมพ์คำสั่งที่นี่ แล้วกด Enter หรือกดปุ่มยืนยัน", value=st.session_state.user_input, key="input") submit_button = st.button("ยืนยัน") if (user_input and user_input != st.session_state.user_input) or submit_button: bangkok_tz = pytz.timezone("Asia/Bangkok") now = datetime.now(bangkok_tz) current_date = now.strftime("%Y-%m-%d") current_day = now.strftime("%A") messages = [ { "role": "system", "content": f"You are Qwen, created by Alibaba Cloud. You are a helpful assistant.\n\nCurrent Date: {current_date}.\n\nCurrent Day: {current_day}." }, { "role": "user", "content": user_input } ] with st.spinner("โมเดลกำลังคิด..."): response = get_model_answer(messages) st.success(f"Chrono: {response}") func_call_dict = convert_to_dict(response) handle_calendar_action(service, func_call_dict) st.session_state.user_input = "" if __name__ == "__main__": main()