import asyncio
import os
import re
import io
import sqlite3
import hashlib
import random
from collections import deque, defaultdict
from datetime import datetime, timedelta, timezone
from mimetypes import guess_extension
from typing import List, Tuple, Optional, Dict

import aiohttp
from rapidfuzz import fuzz
from telethon import TelegramClient, events
from telethon.sessions import StringSession, MemorySession
from telethon.errors import FloodWaitError

# =========================
# Attach autotrack (uses the same client)
# =========================
print("[BOOT] setting up autotrack…", flush=True)
try:
    from autotrack import setup_autotrack  # make sure autotrack.py is the fixed version
    _HAS_AUTOTRACK = True
except Exception as e:
    print("[BOOT] autotrack import failed:", repr(e), flush=True)
    _HAS_AUTOTRACK = False

# =========================
# ENV / CONFIG
# =========================
API_ID = int(os.environ.get("API_ID", "0"))
API_HASH = os.environ.get("API_HASH", "")
STRING_SESSION = os.environ.get("STRING_SESSION", "")

# Posting / reply target
TARGET_CHAT = os.environ.get("TARGET_CHAT", "https://t.me/MidasTouchsignalll")

# ====== Leaderboard config (Phanes) ======
LEADERBOARD_GROUP = os.environ.get("LEADERBOARD_GROUP", "https://t.me/+IaefBrMZwW5kMTEx")
LB_TRIGGER = os.environ.get("LB_TRIGGER", "/lb")
LEADERBOARD_BOT = os.environ.get("LEADERBOARD_BOT", "@PhanesGreenBot")
LB_REQUIRE_MIN_RANKS = int(os.environ.get("LB_REQUIRE_MIN_RANKS", "0"))
LEADERBOARD_COOLDOWN_SEC = int(os.environ.get("LEADERBOARD_COOLDOWN_SEC", "600"))

# ====== Scheduler interval (random, in whole HOURS) ======
LB_INTERVAL_MIN_HOURS = int(os.environ.get("LB_INTERVAL_MIN_HOURS", "3"))
LB_INTERVAL_MAX_HOURS = int(os.environ.get("LB_INTERVAL_MAX_HOURS", "20"))

# ====== Dedup & window ======
INITIAL_BACKFILL = 2
DEDUP_BUFFER_SIZE = int(os.environ.get("DEDUP_BUFFER_SIZE", "800"))
CLASS_WINDOW_MINUTES = int(os.environ.get("CLASS_WINDOW_MINUTES", "180"))
BACKFILL_BUFFER_MINUTES = int(os.environ.get("BACKFILL_BUFFER_MINUTES", "3"))

# ====== Core/Support gate ======
SUPPORT_MIN_UNIQUE = int(os.environ.get("SUPPORT_MIN_UNIQUE", "2"))
LOW_MIN_UNIQUE = int(os.environ.get("LOW_MIN_UNIQUE", "2"))

# ====== Behavior ======
DRY_RUN = os.environ.get("DRY_RUN", "0") == "1"
UPDATE_STRATEGY = os.environ.get("UPDATE_STRATEGY", "reply").lower()  # edit|reply|new (no longer used for replies)
UPDATE_COOLDOWN_SEC = int(os.environ.get("UPDATE_COOLDOWN_SEC", "5"))

# ====== Media flags ======
INCLUDE_MEDIA = os.environ.get("INCLUDE_MEDIA", "0") == "1"
ALLOW_GIFS_VIDEOS = os.environ.get("ALLOW_GIFS_VIDEOS", "0") == "1"
MAX_MEDIA_MB = int(os.environ.get("MAX_MEDIA_MB", "8"))

# ====== Keywords & filter ======
THEME_KEYWORDS = [kw.strip().lower() for kw in os.environ.get(
    "THEME_KEYWORDS",
    "pump,call,entry,entries,sl,tp,launch,buy,gem,bnb,eth,btc,sol,moon,ath,breakout,sol,$,aped"
).split(",") if kw.strip()]
KEYWORD_WEIGHT = float(os.environ.get("KEYWORD_WEIGHT", "1.0"))
FUZZ_WEIGHT = float(os.environ.get("FUZZ_WEIGHT", "0.7"))
RELEVANCE_THRESHOLD = float(os.environ.get("RELEVANCE_THRESHOLD", "0.6"))

EXCLUDE_PHRASES = [p.strip().lower() for p in os.environ.get(
    "EXCLUDE_PHRASES",
    "achievement unlocked,call profit:,achieving +"
).split(",") if p.strip()]

# ====== Milestone labels (copy used in the initial CA message) ======
_M_RAW = os.environ.get("MILESTONES", "1.5,2")
try:
    _M_LIST = [x.strip() for x in _M_RAW.split(",") if x.strip()]
except Exception:
    _M_LIST = ["1.5", "2"]
try:
    if not _M_LIST:
        _M_LIST = ["1.5", "2"]
    MILESTONES_LABEL = " • ".join(f"{m}×" for m in _M_LIST)
except Exception:
    _M_LIST = ["1.5", "2"]
    MILESTONES_LABEL = "1.5× • 2×"
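# A quick sketch of the milestone parsing above, under the default env values:
#   MILESTONES="1.5,2"    -> _M_LIST == ["1.5", "2"], MILESTONES_LABEL == "1.5× • 2×"
#   MILESTONES="2,5,10"   -> MILESTONES_LABEL == "2× • 5× • 10×"
#   MILESTONES="" (empty) -> falls back to ["1.5", "2"]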
# ====== Chains (Dexscreener) ======
DEXSCREENER_TOKEN_URL = "https://api.dexscreener.com/latest/dex/tokens/"
CHAIN_HINTS = {
    "bsc": "bsc", "bnb": "bsc", "binance": "bsc",
    "eth": "ethereum", "ethereum": "ethereum",
    "base": "base", "coinbase": "base",
    "sol": "solana", "solana": "solana", "pump.fun": "solana",
}

# ====== DB ======
DB_PATH = os.environ.get("BOTSIGNAL_DB", "/tmp/botsignal.db")

# =========================
# Sources (CORE & SUPPORT)
# =========================
CORE_CHATS = [
    "https://t.me/PEPE_Calls28", "https://t.me/SephirothGemCalls1", "https://t.me/HenryGems",
    "https://t.me/ChinaPumpCommunity", "https://t.me/GM_Degencalls", "https://t.me/Enthucalls",
    "https://t.me/kobecalls", "https://t.me/oyugu", "https://t.me/houseofdegeneracy",
    "https://t.me/jackjhonnychannel", "https://t.me/Kulture_Kall", "https://t.me/Zen_call",
    "https://t.me/ryoshigamble", "https://t.me/Bane_calls", "https://t.me/ghostfacecallerchannel",
    "https://t.me/HAZARDPRINT", "https://t.me/luigiscalls", "https://t.me/KingIceCalls",
    "https://t.me/onchainalphatrench", "https://t.me/BedrockFoundation", "https://t.me/klevercalls",
    "https://t.me/Degen_Dynasty", "https://t.me/DustKcall", "https://t.me/RIOAlfaCalls",
    "https://t.me/chefsplayground", "https://t.me/NiksGambles", "https://t.me/erregualr",
    "https://t.me/LiterallyShadow", "https://t.me/CallsbyGem", "https://t.me/trendingfomo",
    "https://t.me/TopWhaleCalls", "https://t.me/MemeVilleCrypto", "https://t.me/insider_callzz",
    "https://t.me/watisdes", "https://t.me/chrisalphacalls",
]
SUPPORT_CHATS = [
    "https://t.me/TheDonALPHAJournal", "https://t.me/DrakeETL", "https://t.me/AnimeGems",
    "https://t.me/veigarcalls", "https://t.me/primegems", "https://t.me/Superapedegenlife",
    "https://t.me/dr_crypto_channel", "https://t.me/savascalls", "https://t.me/Tanjirocall",
    "https://t.me/ChapoInsider", "https://t.me/millionsgems", "https://t.me/Milagrosdegencalls",
    "https://t.me/kariusgemscalls", "https://t.me/Dwen_Exchange", "https://t.me/bat_gamble",
    "https://t.me/BatmanGamble", "https://t.me/hulkgemscalls_real", "https://t.me/MineGems",
    "https://t.me/chengambles", "https://t.me/BurritoSignal", "https://t.me/michacalls",
    "https://t.me/GemsmineEth", "https://t.me/AurumGemss", "https://t.me/monacocalls",
    "https://t.me/APOLLO_DEGEN_CALLS", "https://t.me/maybac_whales", "https://t.me/DONALD_CALL",
    "https://t.me/ZionGems", "https://t.me/maniacapes", "https://t.me/NatsuCryptoGems",
    "https://t.me/nicksdegens", "https://t.me/nopaidcallshere", "https://t.me/CCMFreeSignal",
    "https://t.me/SBCtrades", "https://t.me/illeaglegamblers", "https://t.me/PrintingShitcoin",
]
SOURCE_CHATS = CORE_CHATS + SUPPORT_CHATS

# =========================
# Boot client
# =========================
def build_client() -> TelegramClient:
    if STRING_SESSION:
        print(">> Using StringSession (persistent).")
        return TelegramClient(StringSession(STRING_SESSION), API_ID, API_HASH)
    print(">> Using MemorySession (login on every run).")
    return TelegramClient(MemorySession(), API_ID, API_HASH)

client = build_client()

# Attach autotrack to the same client
if _HAS_AUTOTRACK:
    try:
        setup_autotrack(client, announce_chat=TARGET_CHAT)
        print("[BOOT] autotrack attached ✓", flush=True)
    except Exception as e:
        import traceback
        print("[BOOT] autotrack attach failed:", repr(e), flush=True)
        traceback.print_exc()
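# How the STRING_SESSION consumed by build_client is typically produced: a one-off
# interactive helper, not part of this bot (sketch, assuming the same API_ID/API_HASH):
#
#   from telethon.sync import TelegramClient
#   from telethon.sessions import StringSession
#   with TelegramClient(StringSession(), API_ID, API_HASH) as c:
#       print(c.session.save())  # paste this value into the STRING_SESSION env var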
# =========================
# State & DB
# =========================
recent_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)
recent_content_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)
recent_entity_keys: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)
chat_roles: Dict[int, str] = {}  # chat_id -> "core" / "support"
startup_time_utc = datetime.now(timezone.utc)

def _db():
    conn = sqlite3.connect(DB_PATH)
    conn.execute("PRAGMA journal_mode=WAL;")
    return conn

def _init_db():
    conn = _db()
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS last_posted (
            keyword TEXT PRIMARY KEY,
            msg_id INTEGER NOT NULL,
            tier TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS kw_group_seen (
            keyword TEXT NOT NULL,
            group_key TEXT NOT NULL,
            last_ts INTEGER NOT NULL,
            PRIMARY KEY (keyword, group_key)
        );
        """
    )
    conn.commit()
    conn.close()

# === Extra summary table ===
def db_summary_init():
    conn = _db()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS summary_calls (
            keyword TEXT PRIMARY KEY,
            posted_at INTEGER NOT NULL,
            tier TEXT NOT NULL,
            msg_id INTEGER NOT NULL
        );
    """)
    conn.commit()
    conn.close()

def db_summary_log_call(keyword: str, msg_id: int, tier: str, ts_utc: int):
    try:
        conn = _db()
        conn.execute(
            "INSERT OR REPLACE INTO summary_calls(keyword, posted_at, tier, msg_id) VALUES(?,?,?,?)",
            (keyword, ts_utc, tier, msg_id),
        )
        conn.commit()
        conn.close()
    except Exception as e:
        print(f"[SUMMARY] log call failed: {e}")

_init_db()
db_summary_init()

# >>> DEFERRED INIT STATE (avoids NameError when imported by server.py)
last_posted = {}
keyword_group_last_seen = {}

def _late_init_state():
    global last_posted, keyword_group_last_seen
    last_posted, keyword_group_last_seen = db_load_state()

# =========================
# Utils
# =========================
def debug_log(reason: str, content: str = "") -> None:
    short = (content or "").replace("\n", " ")[:160]
    print(f"[DEBUG] {reason}: {short}")

def normalize_for_filter(text: str) -> str:
    if not text:
        return ""
    s = re.sub(r"(?m)^>.*", "", text)  # strip blockquotes
    s = re.sub(r"\s+", " ", s).strip()
    return s

def _tokenize_words(s: str) -> List[str]:
    return re.findall(r"[a-z0-9\$\#]{1,64}", s.lower())

def _windows(tokens: List[str], size: int = 20):
    for i in range(0, len(tokens), size):
        yield " ".join(tokens[i : i + size])

# --- CA patterns & cleaners ---
CA_SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,48}\b")  # Solana (base58)
CA_EVM_RE = re.compile(r"\b0x[a-fA-F0-9]{40}\b")
CA_LABEL_RE = re.compile(r"\bCA\s*[:=]\s*\S+", re.IGNORECASE)

def _strip_urls_and_mentions(s: str) -> str:
    s = re.sub(r"https?://\S+", "", s)
    s = re.sub(r"t\.me/[A-Za-z0-9_]+", "", s)
    s = re.sub(r"@[A-Za-z0-9_]+", "", s)
    return re.sub(r"\s+", " ", s).strip()

def strip_contracts_for_scoring(s: str) -> str:
    s0 = _strip_urls_and_mentions(s)
    s1 = CA_LABEL_RE.sub(" ", s0)
    s2 = CA_EVM_RE.sub(" ", s1)
    s3 = CA_SOL_RE.sub(" ", s2)
    return re.sub(r"\s+", " ", s3).strip()

def score_relevance(text: str, keywords: List[str]) -> float:
    if not text:
        return 0.0
    t = strip_contracts_for_scoring(text).lower()
    exact_hits = 0
    for kw in set(keywords):
        if kw in t or re.search(rf"\b{re.escape(kw)}\b", t):
            exact_hits += 1
    exact_score = exact_hits * KEYWORD_WEIGHT
    tokens = _tokenize_words(t)
    if not tokens:
        return exact_score
    scores = []
    for w in _windows(tokens, 20):
        best = 0.0
        for kw in keywords:
            sc = fuzz.partial_ratio(kw, w) / 100.0
            if sc > best:
                best = sc
        scores.append(best)
    fuzzy_top3 = sorted(scores, reverse=True)[:3]
    fuzzy_score = (sum(fuzzy_top3) / max(1, len(fuzzy_top3))) * FUZZ_WEIGHT if fuzzy_top3 else 0.0
    return exact_score + fuzzy_score
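# Worked example for score_relevance (assuming the default KEYWORD_WEIGHT=1.0 and
# FUZZ_WEIGHT=0.7): for "entry now, sl below launch", the exact pass hits "entry",
# "sl" and "launch" -> exact_score = 3.0; the fuzzy pass adds at most 0.7 (the mean
# of the top-3 window scores, each capped at 1.0, times FUZZ_WEIGHT), so the total
# clears RELEVANCE_THRESHOLD=0.6 comfortably.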
"document", None) if doc and getattr(doc, "id", None) is not None: parts.append(f"doc:{doc.id}") if getattr(msg, "photo", None) is not None: ph = msg.photo ph_id = getattr(ph, "id", None) if ph_id is not None: parts.append(f"photo:{ph_id}") raw = "|".join(parts).encode("utf-8", errors="ignore") return hashlib.sha1(raw).hexdigest() def content_only_hash(text: str) -> str: norm = _strip_urls_and_mentions(normalize_for_filter(text)) return hashlib.sha1(norm.encode("utf-8", errors="ignore")).hexdigest() # ========================= # Class aggregator (windowed) # ========================= def _prune_expired(now: datetime) -> None: window = timedelta(minutes=CLASS_WINDOW_MINUTES) cutoff = now - window for kw, m in list(keyword_group_last_seen.items()): for gk, ts in list(m.items()): if ts < cutoff: del m[gk] if not m: del keyword_group_last_seen[kw] db_prune_expired(cutoff) def _classify_by_unique(unique_groups: int) -> Tuple[str, int]: # (+1 di tiap tier): FOMO ≥ 9, Strong ≥ 6, Medium ≥ 4, Low < 4 if unique_groups >= 9: return "FOMO 🔥", unique_groups elif unique_groups >= 6: return "Strong 💪", unique_groups elif unique_groups >= 4: return "Medium ⚡", unique_groups else: return "Low 🌱", unique_groups def update_and_classify(keyword: str, group_key: str, now: Optional[datetime] = None) -> Tuple[str, int, bool]: if not now: now = datetime.now(timezone.utc) _prune_expired(now) bucket = keyword_group_last_seen.get(keyword, {}) is_new_group = group_key not in bucket bucket[group_key] = now keyword_group_last_seen[keyword] = bucket db_upsert_kw_seen(keyword, group_key, now) class_label, unique_groups = _classify_by_unique(len(bucket)) return class_label, unique_groups, is_new_group # ========================= # Sentence-level invite filter # ========================= INVITE_PATTERNS = [ r"\bjoin\b", r"\bjoin (us|our|channel|group)\b", r"\bdm\b", r"\bdm (me|gw|gue|gua|saya|admin)\b", r"\bpm\b", r"\binbox\b", r"\bcontact\b", r"\bhubungi\b", r"\bkontak\b", r"\bvip\b", r"\bpremium\b", r"\bberbayar\b", r"\bpaid\b", r"\bexclusive\b", r"\bwhitelist\b", r"\bprivate( group| channel)?\b", r"\bmembership?\b", r"\bsubscribe\b", r"\blangganan\b", r"\bpromo\b", r"\bpromosi\b", r"\biklan\b", r"(t\.me\/joinchat|t\.me\/\+|telegram\.me\/|discord\.gg\/|wa\.me\/|whatsapp\.com\/)", r"(bit\.ly|tinyurl\.com|linktr\.ee)", ] INVITE_REGEXES = [re.compile(p, re.IGNORECASE) for p in INVITE_PATTERNS] WHITELIST_STRONG_SIGNAL = [ r"\$[a-z0-9]{2,10}", r"\b(entry|entries|buy|sell)\b", r"\bsl\b", r"\btp\b", r"\btp\d\b", ] WHITELIST_REGEXES = [re.compile(p, re.IGNORECASE) for p in WHITELIST_STRONG_SIGNAL] def _is_invite_sentence(s: str) -> bool: t = s.strip() if not t: return False if any(r.search(t) for r in WHITELIST_REGEXES): return False return any(r.search(t) for r in INVITE_REGEXES) def filter_invite_sentences(text: str) -> str: if not text: return text parts = re.split(r'(?<=[\.\!\?])\s+|\n+', text, flags=re.UNICODE) kept = [p.strip() for p in parts if p and not _is_invite_sentence(p)] cleaned = "\n".join(kept).strip() cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) return cleaned # ========================= # Media helpers # ========================= def is_image_message(msg) -> bool: if getattr(msg, "photo", None): return True doc = getattr(msg, "document", None) if not doc: return False mt = (getattr(doc, "mime_type", "") or "").lower() SKIP_STICKERS = True if mt == "image/webp" and SKIP_STICKERS: return False if mt.startswith("image/"): if mt == "image/gif": return ALLOW_GIFS_VIDEOS return True if mt.startswith("video/"): 
# =========================
# Media helpers
# =========================
def is_image_message(msg) -> bool:
    if getattr(msg, "photo", None):
        return True
    doc = getattr(msg, "document", None)
    if not doc:
        return False
    mt = (getattr(doc, "mime_type", "") or "").lower()
    SKIP_STICKERS = True
    if mt == "image/webp" and SKIP_STICKERS:
        return False  # webp documents are almost always stickers
    if mt.startswith("image/"):
        if mt == "image/gif":
            return ALLOW_GIFS_VIDEOS
        return True
    if mt.startswith("video/"):
        return ALLOW_GIFS_VIDEOS
    return False

def media_too_big(msg) -> bool:
    doc = getattr(msg, "document", None)
    if not doc:
        return False
    size = getattr(doc, "size", None)
    if size is None:
        return False
    return (size / (1024 * 1024)) > MAX_MEDIA_MB

# =========================
# CA helpers & message builder
# =========================
TIER_ORDER = {"Low 🌱": 0, "Medium ⚡": 1, "Strong 💪": 2, "FOMO 🔥": 3}

def _is_ca_key(keyword: str) -> bool:
    return keyword.startswith("ca:evm:") or keyword.startswith("ca:sol:")

def _ca_from_key(keyword: str) -> str:
    if keyword.startswith("ca:evm:"):
        return keyword.split("ca:evm:", 1)[1]
    if keyword.startswith("ca:sol:"):
        return keyword.split("ca:sol:", 1)[1]
    return ""

def _fmt_big_usd(x):
    if x is None:
        return "—"
    try:
        x = float(x)
    except (TypeError, ValueError):
        return "—"
    if x >= 1_000_000_000:
        return f"${x/1_000_000_000:.2f}B"
    if x >= 1_000_000:
        return f"${x/1_000_000:.2f}M"
    if x >= 1_000:
        return f"${x/1_000:.2f}K"
    return f"${x:.0f}"

def _guess_chain_from_text(t: str) -> Optional[str]:
    t = (t or "").lower()
    for k, v in CHAIN_HINTS.items():
        if re.search(rf"(^|\W){re.escape(k)}(\W|$)", t):
            return v
    return None

async def _fetch_best_chain_for_ca(ca: str) -> Optional[str]:
    try:
        timeout = aiohttp.ClientTimeout(total=8)
        async with aiohttp.ClientSession(timeout=timeout) as sess:
            async with sess.get(DEXSCREENER_TOKEN_URL + ca) as r:
                if r.status != 200:
                    return None
                data = await r.json()
                pairs = (data or {}).get("pairs") or []
                if not pairs:
                    return None
                best = max(pairs, key=lambda p: (p.get("liquidity", {}) or {}).get("usd", 0))
                chain_id = (best.get("chainId") or "").lower().strip()
                return chain_id or None
    except Exception:
        return None

async def _dexs_link_universal(ca: str, context_text: Optional[str] = None) -> str:
    # Solana
    if CA_SOL_RE.fullmatch(ca):
        return f"https://dexscreener.com/solana/{ca}"
    # EVM
    if CA_EVM_RE.fullmatch(ca):
        hint = _guess_chain_from_text(context_text or "")
        if hint:
            return f"https://dexscreener.com/{hint}/{ca.lower()}"
        chain = await _fetch_best_chain_for_ca(ca)
        if chain:
            return f"https://dexscreener.com/{chain}/{ca.lower()}"
        return f"https://dexscreener.com/ethereum/{ca.lower()}"
    # non-CA fallback
    return f"https://dexscreener.com/token/{ca}"

async def _fetch_initial_mcap(ca: str):
    try:
        timeout = aiohttp.ClientTimeout(total=8)
        async with aiohttp.ClientSession(timeout=timeout) as sess:
            async with sess.get(DEXSCREENER_TOKEN_URL + ca) as r:
                if r.status != 200:
                    return None
                data = await r.json()
                pairs = (data or {}).get("pairs") or []
                if not pairs:
                    return None
                best = max(pairs, key=lambda p: (p.get("liquidity", {}) or {}).get("usd", 0))
                mc = best.get("marketCap")
                fdv = best.get("fdv")
                if isinstance(mc, (int, float)) and mc > 0:
                    return float(mc)
                if isinstance(fdv, (int, float)) and fdv > 0:
                    return float(fdv)
                return None
    except Exception:
        return None
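# Both Dexscreener helpers above assume the same payload shape from
#   GET https://api.dexscreener.com/latest/dex/tokens/<ca>
#   -> {"pairs": [{"chainId": "solana", "liquidity": {"usd": 12345.0},
#                  "marketCap": 1000000, "fdv": 1200000, ...}, ...]}
# The pair with the deepest USD liquidity is taken as canonical, and marketCap is
# preferred over fdv since fdv also counts non-circulating supply.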
def build_midas_message_for_ca(ca: str, tier_label: str, *, mcap_value=None, dexs_link: Optional[str] = None) -> str:
    axiom_link = "https://axiom.trade/@1144321"
    mcap_line = f"MCAP (est.): {_fmt_big_usd(mcap_value)}"
    first_alert = _M_LIST[0] if _M_LIST else "1.5"
    lines = [
        "✨ **MidasTouch — Fresh Alpha Drop**",
        "",
        f"**Tier Now:** {tier_label}",
        "",
        "Here’s a fresh contract worth a look:",
        f"**CA**: `{ca}`",
        "",
        mcap_line,
        "",
        "Quick actions:",
        f"• 🔎 [Open on Dexscreener]({dexs_link})",
        f"• 🛒 [Trade with Axiom]({axiom_link})",
        "",
        f"Auto-track armed → first alert at **{first_alert}×**; we hunt momentum to price discovery — no ceilings. 🎯",
        "",
        "Plan the trade, trade the plan. Cut losers quick, compound the wins.",
    ]
    return "\n".join(lines)

# =========================
# Post helpers
# =========================
async def _send_initial(src_msg, text: str) -> int:
    if DRY_RUN:
        print("[DRY_RUN] send_initial:", text[:140])
        return -1
    if INCLUDE_MEDIA and is_image_message(src_msg) and not media_too_big(src_msg):
        try:
            if getattr(src_msg, "photo", None):
                m = await client.send_file(TARGET_CHAT, src_msg.photo, caption=text,
                                           caption_entities=None, force_document=False)
                return m.id
            doc = getattr(src_msg, "document", None)
            if doc:
                data = await client.download_media(src_msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    m = await client.send_file(TARGET_CHAT, bio, caption=text,
                                               caption_entities=None, force_document=False)
                    return m.id
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
            return await _send_initial(src_msg, text)
        except Exception as e:
            debug_log("Initial media send failed, falling back to text", str(e))
    try:
        m = await client.send_message(TARGET_CHAT, text, link_preview=True)
        return m.id
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        return await _send_initial(src_msg, text)

async def post_or_update(keyword: str, body: str, new_tier: str, src_msg, *,
                         update_like: bool = False, allow_tier_upgrade: bool = True) -> None:
    prev = last_posted.get(keyword)
    if _is_ca_key(keyword):
        ca_val = _ca_from_key(keyword)
        mcap_val = await _fetch_initial_mcap(ca_val)
        dexs_link = await _dexs_link_universal(ca_val, body)
        text_to_send = build_midas_message_for_ca(ca_val, new_tier, mcap_value=mcap_val, dexs_link=dexs_link)
    else:
        # Fallback (non-CA): rarely used because of the CA-only filter
        text_to_send = f"[{new_tier}]\n\n{body}"

    # --- CASE: first post for this CA key ---
    if not prev:
        msg_id = await _send_initial(src_msg, text_to_send)
        last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
        if msg_id != -1:
            db_save_last_posted(keyword, msg_id, new_tier)
        # ===== Summary log: record the initial post =====
        try:
            now_utc = int(datetime.now(timezone.utc).timestamp())
            db_summary_log_call(keyword, msg_id, new_tier, now_utc)
        except Exception as e:
            print(f"[SUMMARY] log call failed: {e}")
        # >>> Autotrack manual arm: start the tracker as soon as the initial message is sent
        try:
            from autotrack import tracker
            if tracker is not None and _is_ca_key(keyword):
                ca_val = _ca_from_key(keyword)
                # not awaited: keep it non-blocking
                asyncio.create_task(tracker.start(ca=ca_val, basis="mcap"))
                print(f"[BOOT] autotrack armed manually for {ca_val}")
        except Exception as e:
            print(f"[BOOT] autotrack manual start failed: {e}")
        return

    # --- CASE: tier upgrade path ---
    # ALWAYS EDIT the initial post when the tier rises (no reply), so replies come ONLY from autotrack
    if TIER_ORDER.get(new_tier, 0) > TIER_ORDER.get(prev["tier"], 0):
        try:
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text_to_send)
            prev["tier"] = new_tier
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
            try:
                await client.edit_message(TARGET_CHAT, prev["msg_id"], text_to_send)
                prev["tier"] = new_tier
                if prev["msg_id"] != -1:
                    db_save_last_posted(keyword, prev["msg_id"], new_tier)
            except Exception as e2:
                debug_log("Edit failed (tier upgrade), falling back to a fresh post (no reply)", str(e2))
                msg_id = await _send_initial(src_msg, text_to_send)
                last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
                if msg_id != -1:
                    db_save_last_posted(keyword, msg_id, new_tier)
        return

    # --- CASE: update-like without a tier upgrade ---
    # All replies/update-likes are delegated to AUTOTRACK; botsignal does NOT send replies
    if not update_like:
        return
    debug_log("Skipping reply/update, handled by autotrack", keyword)
    return
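# Upgrade-flow sketch: a CA first posted at "Medium ⚡" (4 unique groups) gets its
# original message edited in place to "Strong 💪" once a 6th unique group calls it
# (TIER_ORDER 2 > 1); no reply is ever sent from here, so the only replies under a
# call are autotrack's milestone alerts.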
# =========================
# Raw forward helper
# =========================
async def send_as_is(msg, text_override: Optional[str] = None) -> None:
    if DRY_RUN:
        print("[DRY_RUN] send_as_is:", (text_override or msg.message or "")[:140])
        return
    if text_override is not None:
        orig_text = text_override
        entities = None
    else:
        orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
        entities = getattr(msg, "entities", None)
    if INCLUDE_MEDIA and is_image_message(msg) and not media_too_big(msg):
        try:
            if getattr(msg, "photo", None):
                await client.send_file(TARGET_CHAT, msg.photo, caption=orig_text,
                                       caption_entities=entities, force_document=False)
                return
            doc = getattr(msg, "document", None)
            if doc:
                data = await client.download_media(msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    await client.send_file(TARGET_CHAT, bio, caption=orig_text,
                                           caption_entities=entities, force_document=False)
                    return
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
        except Exception as e:
            debug_log("Media send failed, falling back to text", str(e))
    try:
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)

# =========================
# Keyword & entity extraction
# =========================
TICKER_CLEAN_RE = re.compile(r"\$[A-Za-z0-9]{2,12}")
TICKER_NOISY_RE = re.compile(r"\$[A-Za-z0-9](?:[^A-Za-z0-9]+[A-Za-z0-9]){1,11}")

def _extract_tickers(text_norm: str) -> List[str]:
    found = []
    for m in TICKER_CLEAN_RE.finditer(text_norm):
        found.append(m.group(0).lower())
    for m in TICKER_NOISY_RE.finditer(text_norm):
        raw = m.group(0)
        norm = "$" + re.sub(r"[^A-Za-z0-9]+", "", raw[1:])
        if 3 <= len(norm) <= 13:
            found.append(norm.lower())
    seen = set()
    uniq = []
    for x in found:
        if x not in seen:
            uniq.append(x)
            seen.add(x)
    return uniq

def extract_entity_key(text: str) -> Optional[str]:
    t = normalize_for_filter(text)
    m_evm = CA_EVM_RE.search(t)
    m_sol = CA_SOL_RE.search(t)
    if m_evm or m_sol:
        if m_evm:
            return f"ca:evm:{m_evm.group(0).lower()}"
        return f"ca:sol:{m_sol.group(0)}"
    # ticker fallback when no CA is present (keys prefixed "ticker:")
    tickers = _extract_tickers(t.lower())
    if tickers:
        return f"ticker:{tickers[0][1:].lower()}"
    return None

# =========================
# Phanes / leaderboard filter
# =========================
PHANES_BOT_ID: Optional[int] = None
LB_TEXT_RE = re.compile(r"(🏆\s*Leaderboard|📊\s*Group\s*Stats)", re.IGNORECASE)
ZERO_WIDTH_RE = re.compile(r"[\u200b\u200c\u200d\u2060\ufeff]")

def _hash_text_1line(t: str) -> str:
    t1 = re.sub(r"\s+", " ", (t or "")).strip()
    return hashlib.sha1(t1.encode("utf-8", errors="ignore")).hexdigest()

def _normalize_lb_for_hash(t: str) -> str:
    if not t:
        return ""
    t = ZERO_WIDTH_RE.sub("", t)
    t = re.sub(r"\b\d+(\.\d+)?%?\b", "", t)  # drop volatile numbers
    t = re.sub(r"\s+", " ", t).strip().lower()
    return t
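# Note on the two hashes above: _normalize_lb_for_hash strips zero-width characters
# and free-standing numbers before _hash_text_1line digests the text, so two
# leaderboard posts that differ only in volatile figures collapse to the same hash
# and the cooldown in on_leaderboard_reply treats the second as a repeat.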
re.search(r"🏆\s*Leaderboard", text): return False if not re.search(r"📊\s*Group\s*Stats", text): return False if LB_REQUIRE_MIN_RANKS > 0: ranks = re.findall(r"(?m)^\s*[\W\s]*\d{1,2}\s+.+\[[\d\.]+x\]\s*$", text) if len(ranks) < LB_REQUIRE_MIN_RANKS: return False return True async def _is_phanes_and_not_leaderboard(msg, text: str) -> bool: try: if getattr(msg, "via_bot_id", None) and PHANES_BOT_ID is not None: if int(msg.via_bot_id) == int(PHANES_BOT_ID): return not _is_true_leaderboard(text or "") sender = await msg.get_sender() uname = (getattr(sender, "username", "") or "").lower() if uname == LEADERBOARD_BOT.lstrip("@").lower(): return not _is_true_leaderboard(text or "") except: pass return False # ========================= # Core processing # ========================= def _role_of(chat_id: int) -> str: return chat_roles.get(chat_id, "support") def _unique_counts_by_role(keyword: str) -> Tuple[int, int]: bucket = keyword_group_last_seen.get(keyword, {}) core_ids, sup_ids = set(), set() for gk in bucket.keys(): role = chat_roles.get(int(gk), "support") (core_ids if role == "core" else sup_ids).add(gk) return len(core_ids), len(sup_ids) async def process_message(msg, source_chat_id: int) -> None: orig_text = msg.message or (getattr(msg, "raw_text", None) or "") text_norm = normalize_for_filter(orig_text).lower() if await _is_phanes_and_not_leaderboard(msg, orig_text): debug_log("Skip: pesan Phanes non-leaderboard", orig_text) return for phrase in EXCLUDE_PHRASES: if phrase.lower() in text_norm: debug_log("Dilewati karena EXCLUDE_PHRASES", orig_text) return entity_key = extract_entity_key(orig_text) # STRICT: Only process messages that contain a Contract Address (CA) if not (entity_key and entity_key.startswith("ca:")): debug_log("Bukan pesan CA, dilewati", orig_text) return duplicate_entity = bool(entity_key and entity_key in recent_entity_keys) UPDATE_HINTS = [ r"\bupdate\b", r"\bupd\b", r"\bmcap\b", r"\bmarket\s*cap\b", r"\bhit\b", r"\btp\d?\b", r"\btarget\b", r"\bath\b", r"\bnew\s*high\b", r"\bliq(uidity)?\b", r"\bvolume\b", r"\bnow\b", r"⇒|->|→", r"\bfrom\b.*\bto\b", r"\b\d+(\.\d+)?\s*[km]?\b", ] UPDATE_RE = re.compile("|".join(UPDATE_HINTS), re.IGNORECASE) update_like = bool(UPDATE_RE.search(orig_text or "")) if duplicate_entity and not update_like: debug_log("Entity-duplicate (non-update), dilewati", orig_text) return ch = content_only_hash(orig_text) if ch in recent_content_hashes: debug_log("Content-duplicate (global), dilewati", orig_text) return recent_content_hashes.append(ch) h = hash_for_dedup(text_norm, msg) if h in recent_hashes: debug_log("Duplikat (pesan/media), dilewati", orig_text) return recent_hashes.append(h) score = score_relevance(text_norm, THEME_KEYWORDS) allow_by_ca = bool(entity_key and entity_key.startswith("ca:")) debug_log(f"Skor relevansi={score:.2f} | allow_by_ca={allow_by_ca}", orig_text) if score < RELEVANCE_THRESHOLD and not allow_by_ca: return # tentukan role role = _role_of(source_chat_id) # update bucket unik group_key = str(source_chat_id) now = datetime.now(timezone.utc) class_label, unique_groups, is_new_group = update_and_classify(entity_key, group_key, now) # RULE: Low harus >= LOW_MIN_UNIQUE unik if TIER_ORDER.get(class_label, 0) == TIER_ORDER["Low 🌱"] and unique_groups < LOW_MIN_UNIQUE: debug_log(f"Tahan Low: butuh >= {LOW_MIN_UNIQUE} call (unique_groups={unique_groups})", orig_text) return # Rule lama: tahan support bila belum ada core & support unik belum cukup if role != "core": core_u, sup_u = _unique_counts_by_role(entity_key) if core_u < 
async def process_message(msg, source_chat_id: int) -> None:
    orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
    text_norm = normalize_for_filter(orig_text).lower()

    if await _is_phanes_and_not_leaderboard(msg, orig_text):
        debug_log("Skip: Phanes message that is not a leaderboard", orig_text)
        return

    for phrase in EXCLUDE_PHRASES:
        if phrase.lower() in text_norm:
            debug_log("Skipped due to EXCLUDE_PHRASES", orig_text)
            return

    entity_key = extract_entity_key(orig_text)
    # STRICT: only process messages that contain a Contract Address (CA)
    if not (entity_key and entity_key.startswith("ca:")):
        debug_log("Not a CA message, skipped", orig_text)
        return

    duplicate_entity = bool(entity_key and entity_key in recent_entity_keys)

    UPDATE_HINTS = [
        r"\bupdate\b", r"\bupd\b", r"\bmcap\b", r"\bmarket\s*cap\b", r"\bhit\b",
        r"\btp\d?\b", r"\btarget\b", r"\bath\b", r"\bnew\s*high\b",
        r"\bliq(uidity)?\b", r"\bvolume\b", r"\bnow\b", r"⇒|->|→",
        r"\bfrom\b.*\bto\b", r"\b\d+(\.\d+)?\s*[km]?\b",
    ]
    UPDATE_RE = re.compile("|".join(UPDATE_HINTS), re.IGNORECASE)
    update_like = bool(UPDATE_RE.search(orig_text or ""))

    if duplicate_entity and not update_like:
        debug_log("Entity duplicate (non-update), skipped", orig_text)
        return

    ch = content_only_hash(orig_text)
    if ch in recent_content_hashes:
        debug_log("Content duplicate (global), skipped", orig_text)
        return
    recent_content_hashes.append(ch)

    h = hash_for_dedup(text_norm, msg)
    if h in recent_hashes:
        debug_log("Duplicate (message/media), skipped", orig_text)
        return
    recent_hashes.append(h)

    score = score_relevance(text_norm, THEME_KEYWORDS)
    allow_by_ca = bool(entity_key and entity_key.startswith("ca:"))
    debug_log(f"Relevance score={score:.2f} | allow_by_ca={allow_by_ca}", orig_text)
    if score < RELEVANCE_THRESHOLD and not allow_by_ca:
        return

    # determine the role
    role = _role_of(source_chat_id)

    # update the unique-group bucket
    group_key = str(source_chat_id)
    now = datetime.now(timezone.utc)
    class_label, unique_groups, is_new_group = update_and_classify(entity_key, group_key, now)

    # RULE: Low must have >= LOW_MIN_UNIQUE unique groups
    if TIER_ORDER.get(class_label, 0) == TIER_ORDER["Low 🌱"] and unique_groups < LOW_MIN_UNIQUE:
        debug_log(f"Holding Low: needs >= {LOW_MIN_UNIQUE} calls (unique_groups={unique_groups})", orig_text)
        return

    # Legacy rule: hold support while there is no core call and support uniques are insufficient
    if role != "core":
        core_u, sup_u = _unique_counts_by_role(entity_key)
        if core_u < 1 and sup_u < SUPPORT_MIN_UNIQUE:
            debug_log(f"Support held (core_u={core_u}, sup_u<{SUPPORT_MIN_UNIQUE})", orig_text)
            return

    # NEW RULE (support only): support posts require at least Medium
    if role == "support" and TIER_ORDER.get(class_label, 0) < TIER_ORDER["Medium ⚡"]:
        debug_log("Support Low blocked (Medium minimum)", orig_text)
        return

    # strip invites/ads
    cleaned_body = filter_invite_sentences(orig_text)
    if not cleaned_body.strip():
        debug_log("All sentences filtered out (empty), skipped", orig_text)
        return

    # backfill cutoff
    cutoff = startup_time_utc - timedelta(minutes=CLASS_WINDOW_MINUTES + BACKFILL_BUFFER_MINUTES)
    if getattr(msg, "date", None):
        msg_dt = msg.date
        if isinstance(msg_dt, datetime) and msg_dt.replace(tzinfo=timezone.utc) < cutoff:
            debug_log("Stale (past the backfill safety cutoff), skipped", orig_text)
            return

    if entity_key:
        recent_entity_keys.append(entity_key)

    await post_or_update(
        entity_key, cleaned_body, class_label, msg,
        update_like=update_like, allow_tier_upgrade=is_new_group,
    )
    debug_log(
        f"Posted/Edited (role={role}, unique_groups={unique_groups}, new_group={is_new_group}, "
        f"key={entity_key}, tier={class_label}, update_like={update_like})",
        orig_text,
    )

# =========================
# Event handlers
# =========================
@client.on(events.NewMessage(chats=SOURCE_CHATS))
async def on_new_message(event):
    try:
        # map the chat id to a role the first time it is seen
        cid = abs(event.chat_id)
        if cid not in chat_roles:
            chat_roles[cid] = "support"  # default to "support"
        await process_message(event.message, source_chat_id=cid)
    except Exception as e:
        print(f"Process error in chat {event.chat_id}: {e}")

@client.on(events.NewMessage(chats=(LEADERBOARD_GROUP,)))
async def on_leaderboard_reply(event):
    try:
        msg = event.message
        text = msg.message or (getattr(msg, "raw_text", "") or "")
        if not text:
            return
        ok_source = False
        if getattr(msg, "via_bot_id", None) and PHANES_BOT_ID is not None:
            if int(msg.via_bot_id) == int(PHANES_BOT_ID):
                ok_source = True
        if not ok_source:
            try:
                sender = await event.get_sender()
                uname = (getattr(sender, "username", "") or "").lower()
                if uname == LEADERBOARD_BOT.lstrip("@").lower():
                    ok_source = True
            except Exception:
                pass
        if not ok_source:
            return
        if not _is_true_leaderboard(text):
            return
        # anti-spam cooldown
        global _last_lb_hash, _last_lb_ts
        try:
            _last_lb_hash
        except NameError:
            _last_lb_hash = None
            _last_lb_ts = None
        h = _hash_text_1line(_normalize_lb_for_hash(text))
        nowt = asyncio.get_event_loop().time()
        if _last_lb_hash == h and _last_lb_ts is not None and (nowt - _last_lb_ts) < LEADERBOARD_COOLDOWN_SEC:
            return
        _last_lb_hash = h
        _last_lb_ts = nowt
        await send_as_is(msg)
        debug_log("Forwarded leaderboard", text[:120])
    except Exception as e:
        debug_log("LB forward error", str(e))

# =========================
# Scheduler: randomized /lb
# =========================
async def periodic_lb_trigger():
    try:
        lb_ent = await client.get_entity(LEADERBOARD_GROUP)
    except Exception as e:
        print(f"[LB-SCHED] Failed to resolve LEADERBOARD_GROUP: {e} — retrying in 5m")
        await asyncio.sleep(300)
        return asyncio.create_task(periodic_lb_trigger())
    while True:
        try:
            wait_min = max(1, LB_INTERVAL_MIN_HOURS)
            wait_max = max(wait_min + 1, LB_INTERVAL_MAX_HOURS)
            delta_hr = random.randint(wait_min, wait_max)
            jitter_min = random.randint(1, 10)
            sleep_s = delta_hr * 3600 + jitter_min * 60
            print(f"[LB-SCHED] Next /lb in ~{delta_hr}h {jitter_min}m")
            await asyncio.sleep(sleep_s)
            await client.send_message(lb_ent, LB_TRIGGER)
            print("[LB-SCHED] /lb sent")
        except Exception as e:
            print("[LB-SCHED] error:", e)
            await asyncio.sleep(60)
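# Interval sketch: with the defaults LB_INTERVAL_MIN_HOURS=3 / LB_INTERVAL_MAX_HOURS=20,
# each cycle sleeps a uniform 3-20 whole hours plus 1-10 minutes of jitter before
# sending LB_TRIGGER, so /lb lands at irregular, human-looking times.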
# =========================
# Main
# =========================
async def main():
    await client.start()
    _late_init_state()  # <<< load DB state after all functions are defined & the client has started
    # (optional) start the leaderboard scheduler
    try:
        asyncio.create_task(periodic_lb_trigger())
    except Exception as e:
        print("[LB-SCHED] not started:", e)
    print("INFO: Application startup complete.")
    print("INFO: Uvicorn running on http://0.0.0.0:7860 (Press CTRL+C to quit)")
    await client.run_until_disconnected()

if __name__ == "__main__":
    asyncio.run(main())

# ===== Background runner hooks for uvicorn/server.py =====
_AUTOTRACK_ATTACHED_GUARD = True  # autotrack is already attached at import time
_BOT_STARTED = False
_BG_TASKS = []

def db_load_state():
    conn = _db()
    last = {}
    for kw, mid, tier in conn.execute("SELECT keyword, msg_id, tier FROM last_posted"):
        last[kw] = {"msg_id": mid, "tier": tier}
    kw_map: defaultdict[str, dict[str, datetime]] = defaultdict(dict)
    for kw, gk, ts in conn.execute("SELECT keyword, group_key, last_ts FROM kw_group_seen"):
        kw_map[kw][gk] = datetime.fromtimestamp(ts, tz=timezone.utc)
    conn.close()
    return last, kw_map

def db_save_last_posted(keyword: str, msg_id: int, tier: str):
    conn = _db()
    conn.execute(
        "INSERT INTO last_posted(keyword, msg_id, tier) VALUES(?,?,?) "
        "ON CONFLICT(keyword) DO UPDATE SET msg_id=excluded.msg_id, tier=excluded.tier",
        (keyword, msg_id, tier),
    )
    conn.commit()
    conn.close()

def db_upsert_kw_seen(keyword: str, group_key: str, ts: datetime):
    conn = _db()
    conn.execute(
        "INSERT INTO kw_group_seen(keyword, group_key, last_ts) VALUES(?,?,?) "
        "ON CONFLICT(keyword, group_key) DO UPDATE SET last_ts=excluded.last_ts",
        (keyword, group_key, int(ts.timestamp())),
    )
    conn.commit()
    conn.close()

def db_prune_expired(cutoff: datetime):
    conn = _db()
    conn.execute("DELETE FROM kw_group_seen WHERE last_ts < ?", (int(cutoff.timestamp()),))
    conn.commit()
    conn.close()

async def start_bot_background():
    """
    Called once at ASGI server startup.
    Runs the Telethon client + scheduler in the background.
    """
    global _BOT_STARTED, _BG_TASKS
    if _BOT_STARTED:
        print("[BOOT] background bot already started; skip")
        return
    await client.start()
    _late_init_state()  # <<< make sure state is loaded when the server starts in the background
    try:
        _BG_TASKS.append(asyncio.create_task(periodic_lb_trigger()))
    except Exception as e:
        print("[LB-SCHED] not started in background:", e)
    _BG_TASKS.append(asyncio.create_task(client.run_until_disconnected()))
    _BOT_STARTED = True
    print("[BOOT] background bot started ✓")

async def stop_bot_background():
    """
    (Optional) call on shutdown for a clean teardown.
    """
    global _BOT_STARTED
    try:
        for t in list(_BG_TASKS):
            t.cancel()
    except Exception:
        pass
    try:
        await client.disconnect()
    except Exception:
        pass
    _BOT_STARTED = False
    print("[BOOT] background bot stopped ✓")
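# Minimal wiring sketch for the hooks above, assuming a hypothetical server.py built
# on FastAPI and this module importable as `botsignal` (both names are assumptions):
#
#   from contextlib import asynccontextmanager
#   from fastapi import FastAPI
#   import botsignal
#
#   @asynccontextmanager
#   async def lifespan(app: FastAPI):
#       await botsignal.start_bot_background()
#       yield
#       await botsignal.stop_bot_background()
#
#   app = FastAPI(lifespan=lifespan)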