# autotrack.py — Unlimited milestones + auto-delete on drawdown + video ≥3× + daily summary hooks
"""Milestone tracker that rides on the Telegram client owned by botsignal.

For every contract address (CA) seen in ``TARGET_CHAT`` a polling task is
started.  The task compares the current market cap (or price) against the
value captured at the first snapshot and:

* replies with a milestone announcement at ``REPLY_FROM_MULTIPLE``×, 2×, 3×, ...
  (unbounded), attaching ``VIDEO_PATH`` from ``VIDEO_MIN_MULTIPLE``× upwards
  when that file exists;
* deletes the original call once the ratio drops to ``DROP_DELETE_RATIO``;
* records milestones / deletions in the ``summary_*`` tables of
  ``BOTSIGNAL_DB``.
"""
import os
import re
import time
import asyncio
import aiohttp
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from typing import Optional, Dict, Tuple
from datetime import datetime, timezone, timedelta

from telethon import TelegramClient, events

# =========================
# ENV / CONFIG
# =========================
# NOT used directly in this file — the client is injected by botsignal.
API_ID = int(os.environ.get("API_ID", "0"))
API_HASH = os.environ.get("API_HASH", "")
STRING_SESSION = os.environ.get("STRING_SESSION", "")

# Chat that announcements go to (also used as the source of messages).
TARGET_CHAT = os.environ.get("TARGET_CHAT", "https://t.me/MidasTouchsignalll")

# Price polling interval, in seconds.
TRACK_POLL_SECS = int(os.environ.get("TRACK_POLL_SECS", "20"))

# Threshold of the first reply (default 1.5×).
REPLY_FROM_MULTIPLE = float(os.environ.get("REPLY_FROM_MULTIPLE", "1.5"))

# Unlimited mode: always continue with 2×, 3×, 4×, ...
STOP_WHEN_HIT = False

# Ignore messages older than (startup - buffer).
BACKFILL_BUFFER_MINUTES = int(os.environ.get("BACKFILL_BUFFER_MINUTES", "3"))

# Anti-recursion marker: appended to every milestone announcement of the bot.
BOT_MARKER = "【MT-AUTOTRACK】"

# Location of botsignal's DB (origin msg_id lookup + summary hooks).
BOTSIGNAL_DB = os.environ.get("BOTSIGNAL_DB", "/tmp/botsignal.db")

# Auto-delete when the ratio relative to entry falls to this value or below.
# Example: 0.5 means -50%.  Set to 0 to disable.
DROP_DELETE_RATIO = float(os.environ.get("DROP_DELETE_RATIO", "0.5"))
# (optional) also delete the bot's own thread replies carrying BOT_MARKER.
DELETE_THREAD_REPLIES = os.environ.get("DELETE_THREAD_REPLIES", "0") == "1"

# Video attached to milestones ≥ VIDEO_MIN_MULTIPLE.
MILESTONE_VIDEO = os.environ.get("MILESTONE_VIDEO", "Generating_To_The_Moon_Animation.mp4")
# Use an absolute path so the file resolves reliably in Docker/HF.
VIDEO_PATH = os.path.abspath(MILESTONE_VIDEO)
VIDEO_MIN_MULTIPLE = float(os.environ.get("VIDEO_MIN_MULTIPLE", "3.0"))

# =========================
# HTTP endpoints
# =========================
DEXSCREENER_TOKEN_URL = "https://api.dexscreener.com/latest/dex/tokens/"
JUPITER_URL = "https://price.jup.ag/v6/price?ids="  # free (Solana only)


# =========================
# Helpers
# =========================
def _fmt_money(x: Optional[float]) -> str:
    """Format a USD price: 4 decimals from $1 upwards, 8 below; ``?`` for None."""
    if x is None:
        return "?"
    if x >= 1:
        return f"${x:,.4f}"
    return f"${x:.8f}"


def _fmt_big(x: Optional[float]) -> str:
    """Format a large USD amount with a K/M/B suffix; ``?`` for None."""
    if x is None:
        return "?"
    if x >= 1_000_000_000:
        return f"${x/1_000_000_000:.2f}B"
    if x >= 1_000_000:
        return f"${x/1_000_000:.2f}M"
    if x >= 1_000:
        return f"${x/1_000:.2f}K"
    return f"${x:.0f}"


def _fmt_multiple(m: float) -> str:
    """Render a multiple without trailing zeros (4.0 -> "4", 2.5 -> "2.5")."""
    return f"{m:.2f}".rstrip("0").rstrip(".")


def _milestone_badge(m: float, basis: str) -> str:
    """Return the headline line for milestone ``m`` (a multiple of entry).

    ``basis`` selects the unit label: ``"mcap"`` -> "Market Cap", anything
    else -> "Price".  Tiers are contiguous: below 2× ARMED, exactly 2×/3×
    DOUBLE UP/TRIPLE, other values below 6× RALLY, below 10× OVERDRIVE,
    otherwise GOD CANDLE.
    """
    unit = "Market Cap" if basis == "mcap" else "Price"
    eps = 1e-9
    if m < 2.0 - eps:
        return f"🟨 **ARMED — {m:.1f}× {unit}**"
    if abs(m - 2.0) < eps:
        return f"🟩 **DOUBLE UP — 2× {unit}!**"
    if abs(m - 3.0) < eps:
        return f"🔷 **TRIPLE — 3× {unit}!**"
    # Fractional multiples (e.g. 2.5× when REPLY_FROM_MULTIPLE is 2.5) must
    # neither be rounded to a different integer nor fall into the top tier.
    label = _fmt_multiple(m)
    if m < 6.0:
        return f"🔶 **RALLY — {label}× {unit}!**"
    if m < 10.0:
        return f"🟥 **OVERDRIVE — {label}× {unit}!**"
    return f"🟣 **GOD CANDLE — {label}× {unit}!!**"


def _to_float(value) -> Optional[float]:
    """Convert ``value`` to float; None for falsy or unparsable input."""
    if not value:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


# =========================
# CA extraction
# =========================
CA_SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,48}\b")
CA_EVM_RE = re.compile(r"\b0x[a-fA-F0-9]{40}\b")
CA_LABEL_RE = re.compile(r"\bCA\s*[:=]\s*(\S+)", re.IGNORECASE)

# Markdown / punctuation that commonly wraps an address after a "CA:" label.
_CA_WRAP_CHARS = "`'\"*_()[]<>{}.,;:!"


def extract_ca(text: str) -> Optional[str]:
    """Extract a contract address from ``text``.

    An address following an explicit ``CA:`` / ``CA=`` label wins; otherwise
    the first EVM-looking address, then the first Solana-looking one, is
    used.  EVM addresses are returned lower-cased.  Returns None when nothing
    matches.
    """
    if not text:
        return None
    labelled = CA_LABEL_RE.search(text)
    if labelled:
        # Strip wrappers such as backticks so "CA: `addr`" is recognised.
        cand = labelled.group(1).strip(_CA_WRAP_CHARS)
        if CA_EVM_RE.fullmatch(cand):
            return cand.lower()
        if CA_SOL_RE.fullmatch(cand):
            return cand
    evm = CA_EVM_RE.search(text)
    if evm:
        return evm.group(0).lower()
    sol = CA_SOL_RE.search(text)
    if sol:
        return sol.group(0)
    return None


def ca_key_for_db(ca: str) -> Optional[str]:
    """Build the ``keyword`` used in the DB tables (``ca:evm:…`` / ``ca:sol:…``)."""
    if not ca:
        return None
    if CA_EVM_RE.fullmatch(ca):
        return f"ca:evm:{ca.lower()}"
    if CA_SOL_RE.fullmatch(ca):
        return f"ca:sol:{ca}"
    return None


def ensure_db():
    """Create the ``last_posted`` table if it does not exist (best effort)."""
    try:
        # closing() guarantees the connection is released even on failure.
        with closing(sqlite3.connect(BOTSIGNAL_DB)) as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS last_posted (
                    keyword TEXT PRIMARY KEY,
                    msg_id INTEGER NOT NULL,
                    tier TEXT
                );
            """)
            conn.commit()
    except Exception as e:
        print(f"[AUTOTRACK] DB init error: {e}")


def lookup_origin_msg_id(keyword: str) -> Optional[int]:
    """Return the ``msg_id`` stored for ``keyword`` in ``last_posted``, or None."""
    try:
        with closing(sqlite3.connect(BOTSIGNAL_DB)) as conn:
            row = conn.execute(
                "SELECT msg_id FROM last_posted WHERE keyword = ?", (keyword,)
            ).fetchone()
        if row and isinstance(row[0], int):
            return row[0]
        return None
    except Exception as e:
        print(f"[TRACK][DB] lookup error: {e}")
        return None


# =========================
# SUMMARY HOOKS (daily)
# =========================
def _summary_db():
    """Open ``BOTSIGNAL_DB`` with WAL journaling; the caller closes it."""
    conn = sqlite3.connect(BOTSIGNAL_DB)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
    except Exception:
        # Do not leak the handle when the PRAGMA is rejected.
        conn.close()
        raise
    return conn


def summary_init_tables():
    """Create the ``summary_milestones`` / ``summary_outcomes`` tables (best effort)."""
    try:
        with closing(_summary_db()) as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS summary_milestones (
                    keyword TEXT NOT NULL,
                    reply_msg_id INTEGER,
                    multiple REAL,
                    replied_at INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS summary_outcomes (
                    keyword TEXT PRIMARY KEY,
                    is_deleted INTEGER DEFAULT 0
                );
            """)
            conn.commit()
    except Exception as e:
        print(f"[SUMMARY] init tables failed: {e}")


def _summary_log_milestone(keyword: str, reply_msg_id: Optional[int], multiple: float, ts_utc: int):
    """Insert one row into ``summary_milestones`` (best effort)."""
    try:
        with closing(_summary_db()) as conn:
            conn.execute(
                "INSERT INTO summary_milestones(keyword, reply_msg_id, multiple, replied_at) VALUES(?,?,?,?)",
                (keyword, reply_msg_id, float(multiple), int(ts_utc))
            )
            conn.commit()
    except Exception as e:
        print(f"[SUMMARY] log milestone failed: {e}")


def _summary_mark_deleted(keyword: str):
    """Upsert ``is_deleted=1`` for ``keyword`` in ``summary_outcomes`` (best effort)."""
    try:
        with closing(_summary_db()) as conn:
            conn.execute(
                "INSERT INTO summary_outcomes(keyword, is_deleted) VALUES(?,1) "
                "ON CONFLICT(keyword) DO UPDATE SET is_deleted=1",
                (keyword,)
            )
            conn.commit()
    except Exception as e:
        print(f"[SUMMARY] mark deleted failed: {e}")


# =========================
# Tracker core (UNLIMITED)
# =========================
@dataclass
class TrackItem:
    """State of one tracked contract address."""

    ca: str
    basis: str = "mcap"  # "mcap" or "price"
    entry_price: Optional[float] = None
    entry_mcap: Optional[float] = None
    symbol_hint: Optional[str] = None
    source_link: Optional[str] = None
    poll_secs: int = 20
    next_milestone: float = field(default_factory=lambda: REPLY_FROM_MULTIPLE)
    started_at: float = field(default_factory=time.time)


class PriceTracker:
    """Runs one asyncio polling task per tracked contract address."""

    def __init__(self, client: TelegramClient, announce_chat):
        self.client = client
        self.announce_chat = announce_chat
        self._tasks: Dict[str, asyncio.Task] = {}
        self._items: Dict[str, TrackItem] = {}

    # -------- HTTP helpers --------
    async def _get(self, url: str, headers: Optional[Dict[str, str]] = None, timeout: int = 8):
        """GET ``url`` and return the decoded JSON, or None on any failure.

        Non-200 responses, network errors, timeouts and undecodable bodies
        all yield None so that callers can fall back to another source.
        """
        tout = aiohttp.ClientTimeout(total=timeout)
        try:
            async with aiohttp.ClientSession(timeout=tout, headers=headers) as sess:
                async with sess.get(url) as r:
                    if r.status != 200:
                        return None
                    return await r.json()
        except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
            print(f"[TRACK] http error for {url}: {e}")
            return None

    async def _dexscreener_price(self, ca: str) -> Optional[Tuple[Optional[float], Optional[str], Optional[float]]]:
        """Return ``(price_usd, symbol, mcap)`` from the most liquid pair, or None.

        ``mcap`` prefers ``marketCap`` and falls back to ``fdv``.
        """
        data = await self._get(DEXSCREENER_TOKEN_URL + ca)
        if not isinstance(data, dict):
            return None
        pairs = [p for p in (data.get("pairs") or []) if isinstance(p, dict)]
        if not pairs:
            return None

        def liquidity_usd(pair) -> float:
            # "liquidity" or its "usd" field may be missing or null.
            return _to_float((pair.get("liquidity") or {}).get("usd")) or 0.0

        best = max(pairs, key=liquidity_usd)
        price = _to_float(best.get("priceUsd"))
        symbol = ((best.get("baseToken") or {}).get("symbol")) or None
        mcap = None
        fdv = best.get("fdv")
        if isinstance(fdv, (int, float)) and fdv > 0:
            mcap = float(fdv)
        mc = best.get("marketCap")
        if isinstance(mc, (int, float)) and mc > 0:
            mcap = float(mc)
        return (price, symbol, mcap)

    async def _jupiter_price(self, ca: str) -> Optional[float]:
        """Return the Jupiter price for ``ca``, or None when unavailable."""
        try:
            data = await self._get(JUPITER_URL + ca)
            if not data:
                return None
            d = (data.get("data") or {}).get(ca)
            if not d:
                return None
            p = d.get("price")
            return float(p) if p is not None else None
        except (AttributeError, TypeError, ValueError):
            # Unexpected payload shape or non-numeric price.
            return None

    async def _get_snapshot(self, ca: str) -> Optional[Dict[str, Optional[float]]]:
        """Return ``{"price", "mcap", "symbol_hint"}`` for ``ca``, or None.

        Dexscreener is the primary source; Jupiter fills in a missing price
        or serves as the sole source when Dexscreener returns nothing.
        """
        res = await self._dexscreener_price(ca)
        if res:
            price, sym, mcap = res
            if price is None:
                price = await self._jupiter_price(ca)
            return {"price": price, "mcap": mcap, "symbol_hint": sym}
        jp = await self._jupiter_price(ca)
        if jp is not None:
            return {"price": jp, "mcap": None, "symbol_hint": None}
        return None

    # -------- Announce helpers --------
    def _name(self, item: TrackItem) -> str:
        """Display name: ``SYMBOL (abcd…wxyz)`` or the bare CA without a symbol."""
        s = item.symbol_hint or ""
        return f"{s} ({item.ca[:4]}…{item.ca[-4:]})" if s else item.ca

    def _milestone_text(self, item: TrackItem, m: float, ratio: float,
                        cur_price: Optional[float], cur_mcap: Optional[float]) -> str:
        """Compose the announcement body for milestone ``m``."""
        # NOTE: the Dexscreener link uses the Solana path for Solana CAs and
        # the Ethereum path for everything else; other EVM chains are not
        # distinguished here.
        dexs_link = (
            f"https://dexscreener.com/solana/{item.ca}"
            if CA_SOL_RE.fullmatch(item.ca)
            else f"https://dexscreener.com/ethereum/{item.ca.lower()}"
        )
        axiom_link = "https://axiom.trade/@1144321"
        if item.basis == "mcap":
            change = f"{_fmt_big(item.entry_mcap)} → {_fmt_big(cur_mcap)} (~{ratio:.2f}×)"
        else:
            change = f"{_fmt_money(item.entry_price)} → {_fmt_money(cur_price)} (~{ratio:.2f}×)"
        milestone_line = _milestone_badge(m, item.basis)
        elapsed = int(time.time() - item.started_at)
        mm, ss = divmod(elapsed, 60)
        header = "💎 [MidasTouch Signal] 💎\n━━━━━━━━━━━━━━━━"
        footer = "━━━━━━━━━━━━━━━━"
        body = (
            f"{header}\n"
            f"{milestone_line}\n"
            f"{self._name(item)}\n"
            f"{change}\n"
            f"⏱️ {mm}m {ss}s since call\n"
            f"🔎 [Dexscreener]({dexs_link})\n"
            f"🛒 [Trade on Axiom]({axiom_link})\n"
            f"CA: `{item.ca}`\n"
            f"{footer}"
        )
        return body

    def _next_target_after(self, current_target: float) -> float:
        """Next milestone: 2× after any sub-2× target, otherwise +1×."""
        return 2.0 if current_target < 2.0 else current_target + 1.0

    @staticmethod
    def _ratio(item: TrackItem, cur_price: Optional[float], cur_mcap: Optional[float]) -> Optional[float]:
        """Current/entry ratio on the item's basis; None when either side is missing."""
        if item.basis == "mcap":
            entry, current = item.entry_mcap, cur_mcap
        else:
            entry, current = item.entry_price, cur_price
        if not (entry and current):
            return None
        return (current / entry) if entry > 0 else 0.0

    async def _send_milestone(self, item: TrackItem, text: str, reply_to_id: Optional[int]):
        """Send one milestone announcement and return the sent message.

        The video is attached from ``VIDEO_MIN_MULTIPLE``× upwards when the
        file exists; if sending the video fails the announcement is sent as
        plain text instead of being dropped.
        """
        payload = f"{text}\n\n{BOT_MARKER}"
        reply_to = reply_to_id if reply_to_id else None
        if item.next_milestone >= VIDEO_MIN_MULTIPLE and os.path.isfile(VIDEO_PATH):
            try:
                return await self.client.send_file(
                    self.announce_chat,
                    VIDEO_PATH,
                    caption=payload,
                    reply_to=reply_to,
                    force_document=False
                )
            except Exception as e:
                print(f"[TRACK] gagal kirim video: {e}")
        return await self.client.send_message(
            self.announce_chat,
            payload,
            reply_to=reply_to,
            link_preview=False
        )

    async def _delete_origin_and_replies(self, ca: str):
        """Delete the original call for ``ca`` and, optionally, the bot's replies to it."""
        try:
            key = ca_key_for_db(ca)
            msg_id = lookup_origin_msg_id(key) if key else None
            if not msg_id:
                return
            await self.client.delete_messages(self.announce_chat, msg_id)
            print(f"[TRACK] Deleted origin message for {ca}")

            # === Summary: mark as a losing call
            try:
                if key:
                    _summary_mark_deleted(key)
            except Exception as e:
                print(f"[SUMMARY] mark deleted hook err: {e}")

            if DELETE_THREAD_REPLIES:
                try:
                    async for m in self.client.iter_messages(self.announce_chat, limit=100, reply_to=msg_id):
                        txt = (m.message or "") if hasattr(m, "message") else ""
                        # Only remove replies this bot produced.
                        if BOT_MARKER in (txt or ""):
                            await self.client.delete_messages(self.announce_chat, m.id)
                    print(f"[TRACK] Deleted thread replies for {ca}")
                except Exception as e:
                    print(f"[TRACK] delete thread replies failed: {e}")
        except Exception as e:
            print(f"[TRACK] delete origin failed: {e}")

    # -------- Loop --------
    async def _loop(self, item: TrackItem):
        """Poll ``item.ca`` until its origin message is deleted on drawdown.

        The first snapshot fills missing entry values.  Tracking falls back
        from market cap to price when no entry market cap is available, and
        stops immediately when no entry value exists at all.
        """
        try:
            snap = await self._get_snapshot(item.ca)
            if not snap:
                print(f"[TRACK] init snapshot failed for {item.ca}")
                return
            if item.entry_price is None:
                item.entry_price = snap.get("price")
            if item.entry_mcap is None:
                item.entry_mcap = snap.get("mcap")
            if not item.symbol_hint and snap.get("symbol_hint"):
                item.symbol_hint = snap.get("symbol_hint")
            if item.basis == "mcap" and not item.entry_mcap:
                item.basis = "price"
            if item.basis != "mcap" and not item.entry_price:
                # Without an entry value no ratio can ever be computed, so
                # polling would spin forever without effect.
                print(f"[TRACK] no entry price/mcap for {item.ca}; tracking stopped")
                return

            while True:
                try:
                    snap = await self._get_snapshot(item.ca)
                    ratio = None
                    cur_price = cur_mcap = None
                    if snap:
                        cur_price = snap.get("price")
                        cur_mcap = snap.get("mcap")
                        if not item.symbol_hint and snap.get("symbol_hint"):
                            item.symbol_hint = snap.get("symbol_hint")
                        ratio = self._ratio(item, cur_price, cur_mcap)

                    if ratio is not None:
                        # Auto-delete on drawdown
                        if DROP_DELETE_RATIO > 0 and ratio <= DROP_DELETE_RATIO:
                            await self._delete_origin_and_replies(item.ca)
                            return  # stop tracking once deleted

                        # Milestones: announce every target crossed since the last poll
                        while ratio >= item.next_milestone:
                            text = self._milestone_text(item, item.next_milestone, ratio, cur_price, cur_mcap)
                            key = ca_key_for_db(item.ca)
                            reply_to_id = lookup_origin_msg_id(key) if key else None
                            sent_msg = await self._send_milestone(item, text, reply_to_id)

                            # === Summary: log milestone (win signal)
                            try:
                                if key:
                                    ts_utc = int(time.time())
                                    rep_id = getattr(sent_msg, "id", None) if sent_msg else None
                                    _summary_log_milestone(key, rep_id, item.next_milestone, ts_utc)
                            except Exception as e:
                                print(f"[SUMMARY] milestone hook err: {e}")

                            item.next_milestone = self._next_target_after(item.next_milestone)

                    await asyncio.sleep(item.poll_secs)
                except Exception as e:
                    print(f"[TRACK] loop error: {e}")
                    await asyncio.sleep(item.poll_secs)
        except Exception as e:
            print(f"[TRACK] fatal loop init error for {item.ca}: {e}")

    # -------- Public --------
    def is_tracking(self, ca: str) -> bool:
        """True while a polling task for ``ca`` exists and has not finished."""
        t = self._tasks.get(ca)
        return bool(t) and not t.done()

    def _forget(self, ca: str, task: asyncio.Task) -> None:
        """Drop bookkeeping for a finished task unless ``ca`` was restarted meanwhile."""
        if self._tasks.get(ca) is task:
            self._tasks.pop(ca, None)
            self._items.pop(ca, None)

    async def start(self, ca: str, *, basis: str = "mcap",
                    entry_price: Optional[float] = None, entry_mcap: Optional[float] = None,
                    poll_secs: int = 20, source_link: Optional[str] = None,
                    symbol_hint: Optional[str] = None):
        """Start tracking ``ca`` unless it is already being tracked."""
        ca = ca.strip()
        if self.is_tracking(ca):
            return
        item = TrackItem(
            ca=ca,
            basis=basis.lower(),
            entry_price=entry_price,
            entry_mcap=entry_mcap,
            poll_secs=poll_secs,
            source_link=source_link,
            symbol_hint=symbol_hint,
            next_milestone=REPLY_FROM_MULTIPLE,
        )
        task = asyncio.create_task(self._loop(item))
        self._items[ca] = item
        self._tasks[ca] = task
        # Release the entries once the task ends so the dicts do not grow forever.
        task.add_done_callback(lambda t: self._forget(ca, t))


# =========================
# Setup: attach to existing client from botsignal
# =========================
client: TelegramClient | None = None
startup_time_utc: Optional[datetime] = None
me_user_id: Optional[int] = None
tracker: PriceTracker | None = None

CUTOFF_BUFFER = timedelta(minutes=BACKFILL_BUFFER_MINUTES)


def _is_old_message(msg_dt: Optional[datetime]) -> bool:
    """True when ``msg_dt`` predates ``startup_time_utc - CUTOFF_BUFFER``.

    Returns False for non-datetime input or before setup has recorded the
    startup time.  Naive datetimes are interpreted as UTC; aware ones are
    compared as they are.
    """
    if not isinstance(msg_dt, datetime) or startup_time_utc is None:
        return False
    if msg_dt.tzinfo is None:
        msg_dt = msg_dt.replace(tzinfo=timezone.utc)
    return msg_dt < (startup_time_utc - CUTOFF_BUFFER)


def _is_bot_own_message(event) -> bool:
    """True when the event text carries ``BOT_MARKER`` (i.e. our own announcement)."""
    txt = (event.raw_text or "") if hasattr(event, "raw_text") else ""
    return BOT_MARKER in (txt or "")


async def on_new_autotrack_message(event):
    """NewMessage handler: extract a CA from the message and start tracking it."""
    try:
        if tracker is None:
            # Handler fired before setup_autotrack() created the tracker.
            return
        if _is_bot_own_message(event):
            return
        msg = event.message
        if _is_old_message(getattr(msg, "date", None)):
            return

        text = msg.message or (getattr(msg, "raw_text", None) or "")
        ca = extract_ca(text)
        if not ca:
            return

        source_link = None
        try:
            chat = await event.get_chat()
            uname = getattr(chat, "username", None)
            mid = getattr(msg, "id", None)
            if uname and mid:
                source_link = f"https://t.me/{uname}/{mid}"
        except Exception:
            # The link is optional metadata; tracking proceeds without it.
            pass

        await tracker.start(
            ca=ca,
            basis="mcap",
            poll_secs=TRACK_POLL_SECS,
            source_link=source_link,
        )
    except Exception as e:
        print(f"[AUTOTRACK] error: {e}")


def setup_autotrack(shared_client: TelegramClient, announce_chat: str | None = None):
    """
    Called from botsignal.py after the client has been created and (ideally)
    before start().

    Prepares the DB tables, builds the tracker announcing to ``announce_chat``
    (default ``TARGET_CHAT``) and registers the NewMessage handler on
    ``TARGET_CHAT``.
    """
    global client, tracker, startup_time_utc
    client = shared_client
    ensure_db()
    summary_init_tables()  # summary tables
    ac = announce_chat or TARGET_CHAT
    tracker = PriceTracker(client, announce_chat=ac)
    startup_time_utc = datetime.now(timezone.utc)

    # Register the handler under a distinct name (presumably to avoid a clash
    # with botsignal's own handler — confirm against botsignal.py).
    client.add_event_handler(on_new_autotrack_message, events.NewMessage(chats=(TARGET_CHAT,)))
    print("[AUTOTRACK] attached to shared client; listening on", TARGET_CHAT, "| announcing to", ac)


# There is deliberately no build_client() or main() here, so that autotrack
# ALWAYS rides on the client owned by botsignal.