# autotrack.py — Unlimited milestones + auto-delete on drawdown + video ≥3× + daily summary hooks
import asyncio
import os
import re
import sqlite3
import time
from contextlib import closing
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Tuple

import aiohttp
from telethon import TelegramClient, events
# =========================
# ENV / CONFIG
# =========================
# Telegram credentials. They are read here but NOT used directly in this file:
# the client is injected from botsignal via setup_autotrack().
API_ID = int(os.environ.get("API_ID", "0"))
API_HASH = os.environ.get("API_HASH", "")
STRING_SESSION = os.environ.get("STRING_SESSION", "")

# Chat to announce into (also used as the source of messages to track).
TARGET_CHAT = os.environ.get("TARGET_CHAT", "https://t.me/MidasTouchsignalll")

# Price polling interval, in seconds.
TRACK_POLL_SECS = int(os.environ.get("TRACK_POLL_SECS", "20"))

# Multiple at which the first milestone reply is sent (default 1.5x).
REPLY_FROM_MULTIPLE = float(os.environ.get("REPLY_FROM_MULTIPLE", "1.5"))

# Unlimited mode: always keep going to 2x, 3x, 4x, ...
STOP_WHEN_HIT = False

# Ignore messages older than (startup time - this buffer), in minutes.
BACKFILL_BUFFER_MINUTES = int(os.environ.get("BACKFILL_BUFFER_MINUTES", "3"))

# Anti-recursion marker appended to every milestone announcement so the bot
# can recognise (and skip) its own messages.
# NOTE(review): the bracket characters look mis-encoded in this copy of the
# file; the literal is kept byte-for-byte because it is matched at runtime.
BOT_MARKER = "γMT-AUTOTRACKγ"

# Location of botsignal's SQLite DB (used to look up the msg_id of the
# original call and for the summary hooks).
BOTSIGNAL_DB = os.environ.get("BOTSIGNAL_DB", "/tmp/botsignal.db")

# Auto-delete the original call when current/entry falls to this ratio or
# below. Example: 0.5 means -50%. Set to 0 to disable.
DROP_DELETE_RATIO = float(os.environ.get("DROP_DELETE_RATIO", "0.5"))

# Optional: also delete the bot's replies (those carrying BOT_MARKER) in the
# thread of the deleted call. Enabled only when the env var is exactly "1".
DELETE_THREAD_REPLIES = os.environ.get("DELETE_THREAD_REPLIES", "0") == "1"

# Video attached to milestones at or above VIDEO_MIN_MULTIPLE.
MILESTONE_VIDEO = os.environ.get("MILESTONE_VIDEO", "Generating_To_The_Moon_Animation.mp4")
# Resolve to an absolute path so it is safe in Docker / HF Spaces.
VIDEO_PATH = os.path.abspath(MILESTONE_VIDEO)
VIDEO_MIN_MULTIPLE = float(os.environ.get("VIDEO_MIN_MULTIPLE", "3.0"))
# =========================
# HTTP endpoints
# =========================
# Both are URL prefixes: the contract address is appended directly.
DEXSCREENER_TOKEN_URL = "https://api.dexscreener.com/latest/dex/tokens/"
JUPITER_URL = "https://price.jup.ag/v6/price?ids="  # free (Solana only)
# =========================
# Helpers
# =========================
def _fmt_money(x: Optional[float]) -> str:
    """Format a USD price for display.

    Returns "?" for None, 4 decimals with thousands separators for values
    >= 1, and 8 decimals for anything smaller.
    """
    if x is None:
        return "?"
    if x >= 1:
        return f"${x:,.4f}"
    # Sub-dollar prices need more precision to be meaningful.
    return f"${x:.8f}"
def _fmt_big(x: Optional[float]) -> str:
    """Format a large USD amount with a K/M/B suffix.

    Returns "?" for None. Values below 1,000 are printed as a whole number.
    """
    if x is None:
        return "?"
    if x >= 1_000_000_000:
        return f"${x/1_000_000_000:.2f}B"
    if x >= 1_000_000:
        return f"${x/1_000_000:.2f}M"
    if x >= 1_000:
        return f"${x/1_000:.2f}K"
    return f"${x:.0f}"
def _milestone_badge(m: float, basis: str) -> str:
    """Return the headline line for milestone multiple *m*.

    *basis* selects the unit label: "mcap" gives "Market Cap", anything else
    gives "Price".

    Tiers: below 2x, 2x up to (not including) 3x, exactly 3x, above 3x and
    below 6x, 6x up to 10x, and 10x or more. Multiples strictly between 2x
    and 3x (possible when the first-reply multiple is configured to e.g. 2.5)
    used to fall through every branch and be labelled as the top tier; they
    are now reported in the 2x tier with one decimal.

    NOTE(review): the emoji / dash / multiplication-sign characters in these
    literals look mis-encoded in this copy of the file; they are reproduced
    byte-for-byte because they are emitted at runtime.
    """
    unit = "Market Cap" if basis == "mcap" else "Price"
    eps = 1e-9  # tolerance for float comparison against whole multiples
    if m < 2.0 - eps:
        return f"π¨ **ARMED β {m:.1f}Γ {unit}**"
    if abs(m - 2.0) < eps:
        return f"π© **DOUBLE UP β 2Γ {unit}!**"
    if m < 3.0 - eps:
        # Fractional multiple between 2x and 3x: same tier, real value shown.
        return f"π© **DOUBLE UP β {m:.1f}Γ {unit}!**"
    if abs(m - 3.0) < eps:
        return f"π· **TRIPLE β 3Γ {unit}!**"
    if m < 6.0:
        return f"πΆ **RALLY β {m:.0f}Γ {unit}!**"
    if m < 10.0:
        return f"π₯ **OVERDRIVE β {m:.0f}Γ {unit}!**"
    return f"π£ **GOD CANDLE β {m:.0f}Γ {unit}!!**"
# =========================
# CA extraction
# =========================
# Base58-alphabet word of 32-48 characters (used here for Solana addresses).
CA_SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,48}\b")
# 0x-prefixed 40-hex-digit address (EVM chains).
CA_EVM_RE = re.compile(r"\b0x[a-fA-F0-9]{40}\b")
# Explicit "CA: <value>" / "CA = <value>" label, case-insensitive.
CA_LABEL_RE = re.compile(r"\bCA\s*[:=]\s*(\S+)", re.IGNORECASE)

# Markdown / punctuation that may wrap a labelled address, e.g. CA: `addr`.
_CA_WRAPPER_CHARS = "`*_~.,;:!?()[]{}<>\"'"


def extract_ca(text: str) -> Optional[str]:
    """Extract the first contract address found in *text*.

    An explicitly labelled address ("CA: ...") wins when it is a valid EVM or
    Solana-style address; surrounding markdown/punctuation such as backticks
    is stripped first so that ``CA: `addr` `` is recognised. Otherwise the
    text is searched for an EVM address, then for a Solana-style address.

    Returns the address (EVM addresses lower-cased), or None when *text* is
    empty or contains no address.
    """
    if not text:
        return None

    labelled = CA_LABEL_RE.search(text)
    if labelled:
        cand = labelled.group(1).strip(_CA_WRAPPER_CHARS)
        if CA_EVM_RE.fullmatch(cand):
            return cand.lower()
        if CA_SOL_RE.fullmatch(cand):
            return cand

    evm = CA_EVM_RE.search(text)
    if evm:
        return evm.group(0).lower()

    sol = CA_SOL_RE.search(text)
    if sol:
        return sol.group(0)
    return None


def ca_key_for_db(ca: str) -> Optional[str]:
    """Build the ``last_posted.keyword`` key for contract address *ca*.

    Returns "ca:evm:<lowercased address>" or "ca:sol:<address>", or None when
    *ca* is empty or matches neither address pattern.
    """
    if not ca:
        return None
    if CA_EVM_RE.fullmatch(ca):
        return f"ca:evm:{ca.lower()}"
    if CA_SOL_RE.fullmatch(ca):
        return f"ca:sol:{ca}"
    return None
def ensure_db():
    """Create the ``last_posted`` table in the botsignal DB if it is missing.

    Best-effort: any failure is printed and swallowed. The connection is
    always closed, including when table creation raises.
    """
    try:
        with closing(sqlite3.connect(BOTSIGNAL_DB)) as conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS last_posted (
                    keyword TEXT PRIMARY KEY,
                    msg_id INTEGER NOT NULL,
                    tier TEXT
                );
                """
            )
            conn.commit()
    except Exception as e:
        print(f"[AUTOTRACK] DB init error: {e}")
def lookup_origin_msg_id(keyword: str) -> Optional[int]:
    """Return the ``msg_id`` stored in ``last_posted`` for *keyword*.

    Returns None when there is no row, when the stored value is not an int,
    or when the lookup fails (the error is printed). The connection is always
    closed, including on error.
    """
    try:
        with closing(sqlite3.connect(BOTSIGNAL_DB)) as conn:
            row = conn.execute(
                "SELECT msg_id FROM last_posted WHERE keyword = ?", (keyword,)
            ).fetchone()
    except Exception as e:
        print(f"[TRACK][DB] lookup error: {e}")
        return None
    if row and isinstance(row[0], int):
        return row[0]
    return None
# =========================
# SUMMARY HOOKS (daily)
# =========================
def _summary_db():
    """Open a connection to the botsignal DB with WAL journaling requested.

    The caller owns the returned connection and must close it. If the PRAGMA
    fails, the connection is closed before the error propagates.
    """
    conn = sqlite3.connect(BOTSIGNAL_DB)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
    except Exception:
        conn.close()
        raise
    return conn
def summary_init_tables():
    """Create the ``summary_milestones`` and ``summary_outcomes`` tables.

    Best-effort: failures are printed and swallowed. The connection is always
    closed, including when table creation raises.
    """
    try:
        with closing(_summary_db()) as conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS summary_milestones (
                    keyword TEXT NOT NULL,
                    reply_msg_id INTEGER,
                    multiple REAL,
                    replied_at INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS summary_outcomes (
                    keyword TEXT PRIMARY KEY,
                    is_deleted INTEGER DEFAULT 0
                );
                """
            )
            conn.commit()
    except Exception as e:
        print(f"[SUMMARY] init tables failed: {e}")
def _summary_log_milestone(keyword: str, reply_msg_id: Optional[int], multiple: float, ts_utc: int):
    """Insert one row into ``summary_milestones`` for a milestone reply.

    *reply_msg_id* may be None when no message id is available. Best-effort:
    failures are printed and swallowed; the connection is always closed.
    """
    try:
        with closing(_summary_db()) as conn:
            conn.execute(
                "INSERT INTO summary_milestones(keyword, reply_msg_id, multiple, replied_at) VALUES(?,?,?,?)",
                (keyword, reply_msg_id, float(multiple), int(ts_utc)),
            )
            conn.commit()
    except Exception as e:
        print(f"[SUMMARY] log milestone failed: {e}")
def _summary_mark_deleted(keyword: str):
    """Upsert ``summary_outcomes`` so that *keyword* has ``is_deleted = 1``.

    Best-effort: failures are printed and swallowed; the connection is always
    closed.
    """
    try:
        with closing(_summary_db()) as conn:
            conn.execute(
                "INSERT INTO summary_outcomes(keyword, is_deleted) VALUES(?,1) "
                "ON CONFLICT(keyword) DO UPDATE SET is_deleted=1",
                (keyword,),
            )
            conn.commit()
    except Exception as e:
        print(f"[SUMMARY] mark deleted failed: {e}")
# =========================
# Tracker core (UNLIMITED)
# =========================
@dataclass
class TrackItem:
    """Mutable state for one tracked contract address.

    Declared as a dataclass: the fields use ``field(default_factory=...)`` and
    instances are built with keyword arguments, neither of which works on a
    plain class.
    """

    ca: str
    basis: str = "mcap"  # "mcap" or "price"
    entry_price: Optional[float] = None
    entry_mcap: Optional[float] = None
    # Token symbol used for display (a string, e.g. taken from the price API).
    symbol_hint: Optional[str] = None
    source_link: Optional[str] = None
    poll_secs: int = 20
    # Next multiple of the entry value that triggers an announcement.
    next_milestone: float = field(default_factory=lambda: REPLY_FROM_MULTIPLE)
    # time.time() when the item was created; used for "since call" elapsed time.
    started_at: float = field(default_factory=time.time)
class PriceTracker:
    """Polls token price / market cap and announces milestone multiples.

    One asyncio task is run per contract address. Each task compares the
    current value with the entry value, deletes the original call when the
    drawdown threshold is reached, and otherwise posts an announcement for
    every milestone multiple that has been crossed.
    """

    def __init__(self, client: "TelegramClient", announce_chat):
        self.client = client
        self.announce_chat = announce_chat
        self._tasks: Dict[str, asyncio.Task] = {}
        self._items: Dict[str, "TrackItem"] = {}

    # -------- HTTP helpers --------
    async def _get(self, url: str, headers: Optional[Dict[str, str]] = None, timeout: int = 8):
        """GET *url* and return the decoded JSON body, or None on a non-200 status.

        Network and decoding errors propagate to the caller.
        """
        tout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(timeout=tout, headers=headers) as sess:
            async with sess.get(url) as r:
                if r.status != 200:
                    return None
                return await r.json()

    async def _dexscreener_price(self, ca: str) -> Optional[Tuple[Optional[float], Optional[str], Optional[float]]]:
        """Return ``(price_usd, symbol, market_cap)`` from Dexscreener, or None.

        The pair with the highest USD liquidity is used. ``marketCap`` is
        preferred over ``fdv`` when both are positive numbers. Network and
        decoding errors are printed and reported as None so that the caller
        can still fall back to another price source.
        """
        try:
            data = await self._get(DEXSCREENER_TOKEN_URL + ca)
        except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
            print(f"[TRACK] dexscreener fetch failed for {ca}: {e}")
            return None
        if not data or not isinstance(data, dict):
            return None
        pairs = data.get("pairs") or []
        if not pairs:
            return None

        def _liquidity_usd(pair) -> float:
            # "liquidity" or its "usd" entry may be missing or null.
            return ((pair.get("liquidity") or {}).get("usd")) or 0

        best = max(pairs, key=_liquidity_usd)

        raw_price = best.get("priceUsd")
        try:
            price = float(raw_price) if raw_price else None
        except (TypeError, ValueError):
            price = None
        symbol = ((best.get("baseToken") or {}).get("symbol")) or None

        mcap = None
        fdv = best.get("fdv")
        if isinstance(fdv, (int, float)) and fdv > 0:
            mcap = float(fdv)
        mc = best.get("marketCap")
        if isinstance(mc, (int, float)) and mc > 0:
            mcap = float(mc)  # overrides fdv when available
        return (price, symbol, mcap)

    async def _jupiter_price(self, ca: str) -> Optional[float]:
        """Return the Jupiter price for *ca*, or None when unavailable.

        Any ordinary error yields None. ``except Exception`` is used rather
        than a bare ``except`` so that task cancellation is not swallowed.
        """
        try:
            data = await self._get(JUPITER_URL + ca)
            if not data:
                return None
            d = (data.get("data") or {}).get(ca)
            if not d:
                return None
            p = d.get("price")
            return float(p) if p is not None else None
        except Exception:
            return None

    async def _get_snapshot(self, ca: str) -> Optional[Dict[str, object]]:
        """Return ``{"price", "mcap", "symbol_hint"}`` for *ca*, or None.

        Dexscreener is tried first; Jupiter fills in a missing price, or is
        the sole source (price only) when Dexscreener returns nothing.
        """
        res = await self._dexscreener_price(ca)
        if res:
            price, sym, mcap = res
            if price is None:
                price = await self._jupiter_price(ca)
            return {"price": price, "mcap": mcap, "symbol_hint": sym}
        jp = await self._jupiter_price(ca)
        if jp is not None:
            return {"price": jp, "mcap": None, "symbol_hint": None}
        return None

    # -------- Announce helpers --------
    def _name(self, item: "TrackItem") -> str:
        """Return "SYMBOL (abcd...wxyz)" when a symbol is known, else the raw CA."""
        s = item.symbol_hint or ""
        return f"{s} ({item.ca[:4]}β¦{item.ca[-4:]})" if s else item.ca

    def _milestone_text(self, item: "TrackItem", m: float, ratio: float,
                        cur_price: Optional[float], cur_mcap: Optional[float]) -> str:
        """Build the announcement body for milestone *m* at the current *ratio*.

        NOTE(review): several non-ASCII characters in these literals look
        mis-encoded in this copy of the file; they are reproduced
        byte-for-byte because they are emitted at runtime.
        """
        # The Dexscreener link uses the Solana path for Solana-style CAs; all
        # other CAs are linked under the Ethereum path.
        if CA_SOL_RE.fullmatch(item.ca):
            dexs_link = f"https://dexscreener.com/solana/{item.ca}"
        else:
            dexs_link = f"https://dexscreener.com/ethereum/{item.ca.lower()}"
        axiom_link = "https://axiom.trade/@1144321"

        if item.basis == "mcap":
            change = f"{_fmt_big(item.entry_mcap)} β {_fmt_big(cur_mcap)} (~{ratio:.2f}Γ)"
        else:
            change = f"{_fmt_money(item.entry_price)} β {_fmt_money(cur_price)} (~{ratio:.2f}Γ)"

        milestone_line = _milestone_badge(m, item.basis)
        elapsed = int(time.time() - item.started_at)
        mm, ss = divmod(elapsed, 60)
        header = "π [MidasTouch Signal] π\nββββββββββββββββ"
        footer = "ββββββββββββββββ"
        body = (
            f"{header}\n"
            f"{milestone_line}\n"
            f"{self._name(item)}\n"
            f"{change}\n"
            f"β±οΈ {mm}m {ss}s since call\n"
            f"π [Dexscreener]({dexs_link})\n"
            f"π [Trade on Axiom]({axiom_link})\n"
            f"CA: `{item.ca}`\n"
            f"{footer}"
        )
        return body

    def _next_target_after(self, current_target: float) -> float:
        """Return the milestone after *current_target*: 2.0 first, then +1 each time."""
        return 2.0 if current_target < 2.0 else current_target + 1.0

    async def _delete_origin_and_replies(self, ca: str):
        """Delete the original call for *ca* and, optionally, the bot's replies to it.

        Does nothing when no origin message id is recorded. All errors are
        printed and swallowed.
        """
        try:
            key = ca_key_for_db(ca)
            msg_id = lookup_origin_msg_id(key) if key else None
            if not msg_id:
                return
            await self.client.delete_messages(self.announce_chat, msg_id)
            print(f"[TRACK] Deleted origin message for {ca}")

            # === Summary: mark as a loss
            try:
                if key:
                    _summary_mark_deleted(key)
            except Exception as e:
                print(f"[SUMMARY] mark deleted hook err: {e}")

            if DELETE_THREAD_REPLIES:
                try:
                    async for m in self.client.iter_messages(self.announce_chat, limit=100, reply_to=msg_id):
                        txt = getattr(m, "message", None) or ""
                        # Only remove replies that carry the bot marker.
                        if BOT_MARKER in txt:
                            await self.client.delete_messages(self.announce_chat, m.id)
                    print(f"[TRACK] Deleted thread replies for {ca}")
                except Exception as e:
                    print(f"[TRACK] delete thread replies failed: {e}")
        except Exception as e:
            print(f"[TRACK] delete origin failed: {e}")

    # -------- Loop --------
    async def _loop(self, item: "TrackItem"):
        """Poll *item* until its call is deleted on drawdown (or the task is cancelled)."""
        try:
            snap = await self._get_snapshot(item.ca)
            if not snap:
                print(f"[TRACK] init snapshot failed for {item.ca}")
                return

            # Fill in entry values that the caller did not supply.
            if item.entry_price is None:
                item.entry_price = snap.get("price")
            if item.entry_mcap is None:
                item.entry_mcap = snap.get("mcap")
            if not item.symbol_hint and snap.get("symbol_hint"):
                item.symbol_hint = snap.get("symbol_hint")
            # Without an entry market cap, track by price instead.
            if item.basis == "mcap" and not item.entry_mcap:
                item.basis = "price"

            while True:
                try:
                    snap = await self._get_snapshot(item.ca)
                    if not snap:
                        await asyncio.sleep(item.poll_secs)
                        continue

                    cur_price = snap.get("price")
                    cur_mcap = snap.get("mcap")
                    if not item.symbol_hint and snap.get("symbol_hint"):
                        item.symbol_hint = snap.get("symbol_hint")

                    if item.basis == "mcap":
                        if not (item.entry_mcap and cur_mcap):
                            await asyncio.sleep(item.poll_secs)
                            continue
                        ratio = (cur_mcap / item.entry_mcap) if item.entry_mcap > 0 else 0.0
                    else:
                        if not (item.entry_price and cur_price):
                            await asyncio.sleep(item.poll_secs)
                            continue
                        ratio = (cur_price / item.entry_price) if item.entry_price > 0 else 0.0

                    # Auto-delete on drawdown
                    if DROP_DELETE_RATIO > 0 and ratio <= DROP_DELETE_RATIO:
                        await self._delete_origin_and_replies(item.ca)
                        return  # stop tracking once the call has been deleted

                    # Milestones: announce every multiple crossed since the last poll.
                    while ratio >= item.next_milestone:
                        text = self._milestone_text(item, item.next_milestone, ratio, cur_price, cur_mcap)
                        key = ca_key_for_db(item.ca)
                        reply_to_id = lookup_origin_msg_id(key) if key else None
                        payload = f"{text}\n\n{BOT_MARKER}"

                        sent_msg = None
                        use_video = item.next_milestone >= VIDEO_MIN_MULTIPLE and os.path.isfile(VIDEO_PATH)
                        if use_video:
                            try:
                                sent_msg = await self.client.send_file(
                                    self.announce_chat,
                                    VIDEO_PATH,
                                    caption=payload,
                                    reply_to=reply_to_id if reply_to_id else None,
                                    force_document=False,
                                )
                            except Exception as e:
                                print(f"[TRACK] gagal kirim video: {e}")
                                # Fall back to a plain text announcement so the
                                # milestone is not silently skipped.
                                use_video = False
                        if not use_video:
                            sent_msg = await self.client.send_message(
                                self.announce_chat,
                                payload,
                                reply_to=reply_to_id if reply_to_id else None,
                                link_preview=False,
                            )

                        # === Summary: log milestone (win signal)
                        try:
                            if key:
                                ts_utc = int(time.time())
                                rep_id = getattr(sent_msg, "id", None) if sent_msg else None
                                _summary_log_milestone(key, rep_id, item.next_milestone, ts_utc)
                        except Exception as e:
                            print(f"[SUMMARY] milestone hook err: {e}")

                        item.next_milestone = self._next_target_after(item.next_milestone)

                    await asyncio.sleep(item.poll_secs)
                except Exception as e:
                    # Keep the tracker alive across transient errors.
                    print(f"[TRACK] loop error: {e}")
                    await asyncio.sleep(item.poll_secs)
        except Exception as e:
            print(f"[TRACK] fatal loop init error for {item.ca}: {e}")

    # -------- Public --------
    def is_tracking(self, ca: str) -> bool:
        """Return True when a tracking task for *ca* exists and has not finished."""
        t = self._tasks.get(ca)
        return t is not None and not t.done()

    async def start(self, ca: str, *, basis: str = "mcap",
                    entry_price: Optional[float] = None,
                    entry_mcap: Optional[float] = None,
                    poll_secs: int = 20,
                    source_link: Optional[str] = None,
                    symbol_hint: Optional[str] = None):
        """Start tracking *ca* in a background task; no-op if already tracked.

        Entry values left as None are filled from the first snapshot.
        """
        ca = ca.strip()
        if self.is_tracking(ca):
            return
        item = TrackItem(
            ca=ca,
            basis=basis.lower(),
            entry_price=entry_price,
            entry_mcap=entry_mcap,
            poll_secs=poll_secs,
            source_link=source_link,
            symbol_hint=symbol_hint,
            next_milestone=REPLY_FROM_MULTIPLE,
        )
        task = asyncio.create_task(self._loop(item))
        self._tasks[ca] = task

        def _forget(done_task, key=ca):
            # Drop the finished task so the dict does not grow without bound;
            # leave a newer task registered under the same key untouched.
            if self._tasks.get(key) is done_task:
                self._tasks.pop(key, None)

        task.add_done_callback(_forget)
# =========================
# Setup: attach to existing client from botsignal
# =========================
# Module-level state. client, tracker and startup_time_utc are assigned by
# setup_autotrack(); me_user_id is not assigned anywhere in this file.
client: TelegramClient | None = None
startup_time_utc: Optional[datetime] = None
me_user_id: Optional[int] = None
tracker: PriceTracker | None = None
# Messages dated before (startup time - CUTOFF_BUFFER) are treated as old.
CUTOFF_BUFFER = timedelta(minutes=BACKFILL_BUFFER_MINUTES)
def _is_old_message(msg_dt: Optional[datetime]) -> bool:
    """Return True when *msg_dt* is earlier than (startup time - CUTOFF_BUFFER).

    Returns False when *msg_dt* is not a datetime, or when the startup time
    has not been recorded yet (setup_autotrack() has not run), so nothing is
    filtered on incomplete information. Naive datetimes are treated as UTC;
    timezone-aware ones are compared as-is rather than being relabelled.
    """
    if not isinstance(msg_dt, datetime) or startup_time_utc is None:
        return False
    if msg_dt.tzinfo is None:
        msg_dt = msg_dt.replace(tzinfo=timezone.utc)
    return msg_dt < (startup_time_utc - CUTOFF_BUFFER)
def _is_bot_own_message(event) -> bool:
    """Return True when the event text carries BOT_MARKER (i.e. this bot wrote it)."""
    txt = getattr(event, "raw_text", None) or ""
    return BOT_MARKER in txt
async def on_new_autotrack_message(event):
    """NewMessage handler: start tracking the contract address found in a new message.

    Skips the bot's own announcements, messages older than the startup cutoff,
    and messages without a recognisable contract address. All errors are
    printed and swallowed so the handler never raises into the client.
    """
    try:
        if tracker is None:
            # setup_autotrack() has not run yet; there is nothing to start.
            return
        if _is_bot_own_message(event):
            return
        msg = event.message
        if _is_old_message(getattr(msg, "date", None)):
            return

        text = msg.message or (getattr(msg, "raw_text", None) or "")
        ca = extract_ca(text)
        if not ca:
            return

        # Best-effort public link to the source message (needs a chat username).
        source_link = None
        try:
            chat = await event.get_chat()
            uname = getattr(chat, "username", None)
            mid = getattr(msg, "id", None)
            if uname and mid:
                source_link = f"https://t.me/{uname}/{mid}"
        except Exception:
            # The link is optional. ``Exception`` (not a bare ``except``) keeps
            # task cancellation from being swallowed here.
            pass

        await tracker.start(
            ca=ca,
            basis="mcap",
            poll_secs=TRACK_POLL_SECS,
            source_link=source_link,
        )
    except Exception as e:
        print(f"[AUTOTRACK] error: {e}")
def setup_autotrack(shared_client: TelegramClient, announce_chat: str | None = None):
    """Attach autotrack to an existing Telegram client.

    Called from botsignal.py after the client has been created and (ideally)
    before it is started. Ensures the DB tables exist, creates the tracker,
    records the startup time and registers the NewMessage handler.

    *announce_chat* defaults to TARGET_CHAT. The handler listens on the same
    chat the tracker announces into, so replies and deletions refer to
    messages in the chat being watched (and the log line below is accurate).
    """
    global client, tracker, startup_time_utc
    client = shared_client
    ensure_db()
    summary_init_tables()  # summary tables
    ac = announce_chat or TARGET_CHAT
    tracker = PriceTracker(client, announce_chat=ac)
    startup_time_utc = datetime.now(timezone.utc)
    # Register the handler under a distinct name to avoid clashing with
    # botsignal's own handlers.
    client.add_event_handler(on_new_autotrack_message, events.NewMessage(chats=(ac,)))
    print("[AUTOTRACK] attached to shared client; listening on", ac)


# There is deliberately no build_client() or main() here: autotrack ALWAYS
# rides on the client owned by botsignal.