bot-signal-telegram / autotrack.py
agus1111's picture
Update autotrack.py
c501fa1 verified
# autotrack.py — Unlimited milestones + auto-delete on drawdown + video ≥3× + daily summary hooks
import os
import re
import time
import asyncio
import aiohttp
import sqlite3
from dataclasses import dataclass, field
from typing import Optional, Dict, Tuple
from datetime import datetime, timezone, timedelta
from telethon import TelegramClient, events
# =========================
# ENV / CONFIG
# =========================
# Telegram credentials are read from the environment for completeness only.
API_ID = int(os.environ.get("API_ID", "0"))  # NOT used directly in this file
API_HASH = os.environ.get("API_HASH", "")  # — the client is injected from botsignal
STRING_SESSION = os.environ.get("STRING_SESSION", "")  # — (same as above)
# Chat to announce into (also used as the source of messages to track)
TARGET_CHAT = os.environ.get("TARGET_CHAT", "https://t.me/MidasTouchsignalll")
# Price polling interval, in seconds
TRACK_POLL_SECS = int(os.environ.get("TRACK_POLL_SECS", "20"))
# First multiple that triggers a reply (default 1.5×)
REPLY_FROM_MULTIPLE = float(os.environ.get("REPLY_FROM_MULTIPLE", "1.5"))
# Unlimited mode: always continue to 2×, 3×, 4×, ...
# NOTE(review): this flag is not referenced by any code visible in this file.
STOP_WHEN_HIT = False
# Ignore messages older than (startup - buffer); the buffer is in minutes
BACKFILL_BUFFER_MINUTES = int(os.environ.get("BACKFILL_BUFFER_MINUTES", "3"))
# Anti-recursion marker: appended to every bot announcement (milestone)
BOT_MARKER = "【MT-AUTOTRACK】"
# Location of botsignal's DB (used to fetch the original message's msg_id + summary hooks)
BOTSIGNAL_DB = os.environ.get("BOTSIGNAL_DB", "/tmp/botsignal.db")
# NEW: auto-delete when the drawdown reaches the threshold (ratio relative to entry)
# Example: 0.5 means -50%. Set to 0 to disable.
DROP_DELETE_RATIO = float(os.environ.get("DROP_DELETE_RATIO", "0.5"))
# (optional) also delete the bot's thread replies that carry BOT_MARKER
DELETE_THREAD_REPLIES = os.environ.get("DELETE_THREAD_REPLIES", "0") == "1"
# >>> VIDEO attached to milestones ≥ 3×
MILESTONE_VIDEO = os.environ.get("MILESTONE_VIDEO", "Generating_To_The_Moon_Animation.mp4")
# FIX: use an absolute path so it is safe on Docker/HF
VIDEO_PATH = os.path.abspath(MILESTONE_VIDEO)
VIDEO_MIN_MULTIPLE = float(os.environ.get("VIDEO_MIN_MULTIPLE", "3.0"))
# =========================
# HTTP endpoints
# =========================
# The contract address is appended directly to each of these URLs.
DEXSCREENER_TOKEN_URL = "https://api.dexscreener.com/latest/dex/tokens/"
JUPITER_URL = "https://price.jup.ag/v6/price?ids=" # free (Solana only)
# =========================
# Helpers
# =========================
def _fmt_money(x: Optional[float]) -> str:
if x is None:
return "?"
if x >= 1:
return f"${x:,.4f}"
return f"${x:.8f}"
def _fmt_big(x: Optional[float]) -> str:
if x is None:
return "?"
if x >= 1_000_000_000:
return f"${x/1_000_000_000:.2f}B"
if x >= 1_000_000:
return f"${x/1_000_000:.2f}M"
if x >= 1_000:
return f"${x/1_000:.2f}K"
return f"${x:.0f}"
def _milestone_badge(m: float, basis: str) -> str:
unit = "Market Cap" if basis == "mcap" else "Price"
if m < 2.0 - 1e-9: return f"🟨 **ARMED β€” {m:.1f}Γ— {unit}**"
elif abs(m - 2.0) < 1e-9:return f"🟩 **DOUBLE UP β€” 2Γ— {unit}!**"
elif abs(m - 3.0) < 1e-9:return f"πŸ”· **TRIPLE β€” 3Γ— {unit}!**"
elif 3.0 < m < 6.0: return f"πŸ”Ά **RALLY β€” {m:.0f}Γ— {unit}!**"
elif 6.0 <= m < 10.0: return f"πŸŸ₯ **OVERDRIVE β€” {m:.0f}Γ— {unit}!**"
else: return f"🟣 **GOD CANDLE β€” {m:.0f}Γ— {unit}!!**"
# =========================
# CA extraction
# =========================
# Base58 word of 32-48 characters (Solana-style address).
CA_SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,48}\b")
# "0x" followed by exactly 40 hex digits (EVM-style address).
CA_EVM_RE = re.compile(r"\b0x[a-fA-F0-9]{40}\b")
# Explicit "CA: <token>" / "CA=<token>" label, case-insensitive.
CA_LABEL_RE = re.compile(r"\bCA\s*[:=]\s*(\S+)", re.IGNORECASE)


def extract_ca(text: str) -> Optional[str]:
    """Pull a contract address out of free-form message text.

    Lookup order:
      1. A token following an explicit ``CA:`` / ``CA=`` label, if that token
         is itself a complete EVM or Solana address.
      2. The first EVM address anywhere in the text.
      3. The first Solana-style address anywhere in the text.

    EVM addresses are returned lower-cased; Solana addresses are returned
    verbatim. Returns None for empty text or when nothing matches.
    """
    if not text:
        return None

    labelled = CA_LABEL_RE.search(text)
    if labelled is not None:
        candidate = labelled.group(1)
        if CA_EVM_RE.fullmatch(candidate) is not None:
            return candidate.lower()
        if CA_SOL_RE.fullmatch(candidate) is not None:
            return candidate

    evm_hit = CA_EVM_RE.search(text)
    if evm_hit is not None:
        return evm_hit.group(0).lower()

    sol_hit = CA_SOL_RE.search(text)
    return sol_hit.group(0) if sol_hit is not None else None
def ca_key_for_db(ca: str) -> Optional[str]:
    """Build the `keyword` used for this contract address in the botsignal DB.

    EVM addresses map to ``ca:evm:<lower-cased address>`` and Solana-style
    addresses map to ``ca:sol:<address>``. Returns None for an empty value or
    a string that is neither kind of address.
    """
    if not ca:
        return None
    if CA_EVM_RE.fullmatch(ca) is not None:
        return "ca:evm:" + ca.lower()
    if CA_SOL_RE.fullmatch(ca) is not None:
        return "ca:sol:" + ca
    return None
def ensure_db():
    """Create the ``last_posted`` table in the botsignal DB if it is missing.

    Best-effort: any failure is logged and swallowed so that a DB problem
    does not prevent the tracker from attaching. The connection is always
    closed, including when table creation fails.
    """
    conn = None
    try:
        conn = sqlite3.connect(BOTSIGNAL_DB)
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS last_posted (
                keyword TEXT PRIMARY KEY,
                msg_id INTEGER NOT NULL,
                tier TEXT
            );
        """)
        conn.commit()
    except Exception as e:
        print(f"[AUTOTRACK] DB init error: {e}")
    finally:
        # Previously the connection leaked whenever executescript/commit raised.
        if conn is not None:
            conn.close()
def lookup_origin_msg_id(keyword: str) -> Optional[int]:
    """Return the ``msg_id`` stored in ``last_posted`` for ``keyword``.

    Args:
        keyword: The DB key, as produced by ``ca_key_for_db``.

    Returns:
        The integer message id, or None when there is no row, the stored
        value is not an integer, or the lookup fails (the error is logged).
    """
    conn = None
    try:
        conn = sqlite3.connect(BOTSIGNAL_DB)
        row = conn.execute(
            "SELECT msg_id FROM last_posted WHERE keyword = ?", (keyword,)
        ).fetchone()
        if row and isinstance(row[0], int):
            return row[0]
        return None
    except Exception as e:
        print(f"[TRACK][DB] lookup error: {e}")
        return None
    finally:
        # Close on every path; the old code skipped close() when the query raised.
        if conn is not None:
            conn.close()
# =========================
# SUMMARY HOOKS (daily)
# =========================
def _summary_db():
    """Open a connection to the botsignal DB and switch it to WAL journaling.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for closing it.

    Raises:
        Whatever ``sqlite3.connect`` or the PRAGMA raises. If the PRAGMA
        fails, the freshly opened connection is closed before re-raising so
        it does not leak.
    """
    conn = sqlite3.connect(BOTSIGNAL_DB)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
    except Exception:
        conn.close()
        raise
    return conn
def summary_init_tables():
    """Create the summary tables used by the daily summary hooks.

    Creates ``summary_milestones`` (one row per milestone reply) and
    ``summary_outcomes`` (one row per keyword, with an ``is_deleted`` flag)
    if they do not exist yet. Best-effort: failures are logged and swallowed.
    The connection is always closed, including on failure.
    """
    conn = None
    try:
        conn = _summary_db()
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS summary_milestones (
                keyword TEXT NOT NULL,
                reply_msg_id INTEGER,
                multiple REAL,
                replied_at INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS summary_outcomes (
                keyword TEXT PRIMARY KEY,
                is_deleted INTEGER DEFAULT 0
            );
        """)
        conn.commit()
    except Exception as e:
        print(f"[SUMMARY] init tables failed: {e}")
    finally:
        if conn is not None:
            conn.close()
def _summary_log_milestone(keyword: str, reply_msg_id: Optional[int], multiple: float, ts_utc: int):
    """Record one milestone reply in ``summary_milestones``.

    Args:
        keyword: DB key of the tracked contract address.
        reply_msg_id: Id of the announcement message, or None if unknown.
        multiple: The milestone multiple that was announced.
        ts_utc: Unix timestamp of the reply.

    Best-effort: failures are logged and swallowed. The connection is always
    closed, including when the insert fails.
    """
    conn = None
    try:
        conn = _summary_db()
        conn.execute(
            "INSERT INTO summary_milestones(keyword, reply_msg_id, multiple, replied_at) VALUES(?,?,?,?)",
            (keyword, reply_msg_id, float(multiple), int(ts_utc))
        )
        conn.commit()
    except Exception as e:
        print(f"[SUMMARY] log milestone failed: {e}")
    finally:
        if conn is not None:
            conn.close()
def _summary_mark_deleted(keyword: str):
    """Flag ``keyword`` as deleted in ``summary_outcomes`` (insert or update).

    Best-effort: failures are logged and swallowed. The connection is always
    closed, including when the upsert fails.
    """
    conn = None
    try:
        conn = _summary_db()
        conn.execute(
            "INSERT INTO summary_outcomes(keyword, is_deleted) VALUES(?,1) "
            "ON CONFLICT(keyword) DO UPDATE SET is_deleted=1",
            (keyword,)
        )
        conn.commit()
    except Exception as e:
        print(f"[SUMMARY] mark deleted failed: {e}")
    finally:
        if conn is not None:
            conn.close()
# =========================
# Tracker core (UNLIMITED)
# =========================
@dataclass
class TrackItem:
    """Mutable state for one tracked contract address."""

    # Contract address being tracked (also the key in the tracker's task map).
    ca: str
    # Which quantity the multiple is measured on: "mcap" or "price".
    basis: str = "mcap"
    # Entry values; filled from the first snapshot when left as None.
    entry_price: Optional[float] = None
    entry_mcap: Optional[float] = None
    # Token symbol used for display. This is text (it is interpolated into the
    # announcement name), so it is annotated as str rather than float.
    symbol_hint: Optional[str] = None
    # Link to the message that triggered tracking, when it could be built.
    source_link: Optional[str] = None
    # Seconds to sleep between polls.
    poll_secs: int = 20
    # Next multiple to announce; starts at the configured first-reply multiple.
    next_milestone: float = field(default_factory=lambda: REPLY_FROM_MULTIPLE)
    # Creation time (time.time()), used for the "since call" elapsed display.
    started_at: float = field(default_factory=time.time)
class PriceTracker:
    """Poll token prices and announce multiple-of-entry milestones in a chat.

    One asyncio task is run per contract address. Each task takes an entry
    snapshot, then polls on an interval; it posts a milestone message every
    time the current/entry ratio reaches the next target, and deletes the
    origin message (and stops) when the ratio falls to DROP_DELETE_RATIO.
    """

    def __init__(self, client: TelegramClient, announce_chat):
        """Store the shared Telegram client and the chat to announce into."""
        self.client = client
        self.announce_chat = announce_chat
        # Both maps are keyed by contract address and cleaned up when a task ends.
        self._tasks: Dict[str, asyncio.Task] = {}
        self._items: Dict[str, TrackItem] = {}

    # -------- HTTP helpers --------
    async def _get(self, url: str, headers: Optional[Dict[str, str]] = None, timeout: int = 8):
        """GET ``url`` and return the decoded JSON body.

        Returns None on a non-200 status, and also on network errors,
        timeouts and undecodable bodies (these are logged). Returning None
        instead of raising lets callers fall through to their next source.
        """
        tout = aiohttp.ClientTimeout(total=timeout)
        try:
            async with aiohttp.ClientSession(timeout=tout, headers=headers) as sess:
                async with sess.get(url) as r:
                    if r.status != 200:
                        return None
                    return await r.json()
        except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
            print(f"[TRACK] HTTP error for {url}: {e}")
            return None

    async def _dexscreener_price(self, ca: str) -> Optional[Tuple[Optional[float], Optional[str], Optional[float]]]:
        """Fetch ``(price_usd, symbol, market_cap)`` for ``ca`` from Dexscreener.

        The pair with the highest USD liquidity is used. Market cap prefers
        the ``marketCap`` field and falls back to ``fdv``. Returns None when
        there is no usable response or no pairs.
        """
        data = await self._get(DEXSCREENER_TOKEN_URL + ca)
        if not isinstance(data, dict):
            return None
        pairs = data.get("pairs") or []
        if not pairs:
            return None

        def _liquidity_usd(pair) -> float:
            # A missing or null liquidity value counts as 0 so max() never
            # has to compare None against a number.
            usd = (pair.get("liquidity") or {}).get("usd")
            return usd if isinstance(usd, (int, float)) else 0

        best = max(pairs, key=_liquidity_usd)

        raw_price = best.get("priceUsd")
        try:
            price = float(raw_price) if raw_price else None
        except (TypeError, ValueError):
            price = None

        symbol = ((best.get("baseToken") or {}).get("symbol")) or None

        mcap = None
        fdv = best.get("fdv")
        if isinstance(fdv, (int, float)) and fdv > 0:
            mcap = float(fdv)
        mc = best.get("marketCap")
        if isinstance(mc, (int, float)) and mc > 0:
            mcap = float(mc)  # an explicit market cap wins over FDV
        return (price, symbol, mcap)

    async def _jupiter_price(self, ca: str) -> Optional[float]:
        """Fetch the price of ``ca`` from Jupiter; None if unavailable."""
        try:
            data = await self._get(JUPITER_URL + ca)
            if not data:
                return None
            d = (data.get("data") or {}).get(ca)
            if not d:
                return None
            p = d.get("price")
            return float(p) if p is not None else None
        except Exception:
            # Deliberately best-effort, but not a bare `except:` — that would
            # also swallow asyncio.CancelledError and block task cancellation.
            return None

    async def _get_snapshot(self, ca: str) -> Optional[dict]:
        """Return ``{"price", "mcap", "symbol_hint"}`` for ``ca``, or None.

        Dexscreener is tried first; Jupiter supplies the price when
        Dexscreener has none, and is the sole source when Dexscreener
        returns nothing at all.
        """
        res = await self._dexscreener_price(ca)
        if res:
            price, sym, mcap = res
            if price is None:
                price = await self._jupiter_price(ca)
            return {"price": price, "mcap": mcap, "symbol_hint": sym}
        jp = await self._jupiter_price(ca)
        if jp is not None:
            return {"price": jp, "mcap": None, "symbol_hint": None}
        return None

    # -------- Announce helpers --------
    def _name(self, item: TrackItem) -> str:
        """Display name: "SYMBOL (abcd…wxyz)" when a symbol is known, else the CA."""
        s = item.symbol_hint or ""
        return f"{s} ({item.ca[:4]}…{item.ca[-4:]})" if s else item.ca

    def _milestone_text(self, item: TrackItem, m: float, ratio: float,
                        cur_price: Optional[float], cur_mcap: Optional[float]) -> str:
        """Build the Markdown announcement body for milestone ``m``."""
        # NOTE: the Dexscreener link uses the Solana pattern for Solana CAs;
        # every other address is linked as Ethereum — extend if more chains
        # are needed.
        if CA_SOL_RE.fullmatch(item.ca):
            dexs_link = f"https://dexscreener.com/solana/{item.ca}"
        else:
            dexs_link = f"https://dexscreener.com/ethereum/{item.ca.lower()}"
        axiom_link = "https://axiom.trade/@1144321"

        if item.basis == "mcap":
            change = f"{_fmt_big(item.entry_mcap)} → {_fmt_big(cur_mcap)} (~{ratio:.2f}×)"
        else:
            change = f"{_fmt_money(item.entry_price)} → {_fmt_money(cur_price)} (~{ratio:.2f}×)"

        milestone_line = _milestone_badge(m, item.basis)
        elapsed = int(time.time() - item.started_at)
        mm, ss = divmod(elapsed, 60)
        header = "💎 [MidasTouch Signal] 💎\n━━━━━━━━━━━━━━━━"
        footer = "━━━━━━━━━━━━━━━━"
        body = (
            f"{header}\n"
            f"{milestone_line}\n"
            f"{self._name(item)}\n"
            f"{change}\n"
            f"⏱️ {mm}m {ss}s since call\n"
            f"🔎 [Dexscreener]({dexs_link})\n"
            f"🛒 [Trade on Axiom]({axiom_link})\n"
            f"CA: `{item.ca}`\n"
            f"{footer}"
        )
        return body

    def _next_target_after(self, current_target: float) -> float:
        """Next milestone: jump to 2× from anything below it, then step by 1×."""
        return 2.0 if current_target < 2.0 else current_target + 1.0

    async def _delete_origin_and_replies(self, ca: str):
        """Delete the origin message for ``ca`` and mark it deleted in the summary.

        When DELETE_THREAD_REPLIES is set, replies to the origin message that
        carry BOT_MARKER are deleted as well. All failures are logged only.
        """
        try:
            key = ca_key_for_db(ca)
            msg_id = lookup_origin_msg_id(key) if key else None
            if not msg_id:
                return
            await self.client.delete_messages(self.announce_chat, msg_id)
            print(f"[TRACK] Deleted origin message for {ca}")
            # === Summary: mark as a losing call
            try:
                if key:
                    _summary_mark_deleted(key)
            except Exception as e:
                print(f"[SUMMARY] mark deleted hook err: {e}")
            if DELETE_THREAD_REPLIES:
                try:
                    async for m in self.client.iter_messages(self.announce_chat, limit=100, reply_to=msg_id):
                        txt = (m.message or "") if hasattr(m, "message") else ""
                        if BOT_MARKER in (txt or ""):
                            await self.client.delete_messages(self.announce_chat, m.id)
                    print(f"[TRACK] Deleted thread replies for {ca}")
                except Exception as e:
                    print(f"[TRACK] delete thread replies failed: {e}")
        except Exception as e:
            print(f"[TRACK] delete origin failed: {e}")

    async def _announce_milestone(self, item: TrackItem, key: Optional[str],
                                  reply_to_id: Optional[int], ratio: float,
                                  cur_price: Optional[float], cur_mcap: Optional[float]) -> None:
        """Post the announcement for ``item.next_milestone`` and log it.

        From VIDEO_MIN_MULTIPLE upwards the video is attached when the file
        exists. If sending the video fails, the announcement is sent as a
        plain message instead of being dropped.
        """
        text = self._milestone_text(item, item.next_milestone, ratio, cur_price, cur_mcap)
        body = f"{text}\n\n{BOT_MARKER}"
        reply_to = reply_to_id if reply_to_id else None

        sent_msg = None
        if item.next_milestone >= VIDEO_MIN_MULTIPLE and os.path.isfile(VIDEO_PATH):
            try:
                sent_msg = await self.client.send_file(
                    self.announce_chat,
                    VIDEO_PATH,
                    caption=body,
                    reply_to=reply_to,
                    force_document=False
                )
            except Exception as e:
                print(f"[TRACK] gagal kirim video: {e}")
        if sent_msg is None:
            sent_msg = await self.client.send_message(
                self.announce_chat,
                body,
                reply_to=reply_to,
                link_preview=False
            )

        # === Summary: log milestone (winning signal)
        try:
            if key:
                rep_id = getattr(sent_msg, "id", None) if sent_msg else None
                _summary_log_milestone(key, rep_id, item.next_milestone, int(time.time()))
        except Exception as e:
            print(f"[SUMMARY] milestone hook err: {e}")

    async def _poll_once(self, item: TrackItem, key: Optional[str]) -> bool:
        """Run one poll cycle; return True when tracking should stop."""
        snap = await self._get_snapshot(item.ca)
        if not snap:
            return False
        cur_price = snap.get("price")
        cur_mcap = snap.get("mcap")
        if not item.symbol_hint and snap.get("symbol_hint"):
            item.symbol_hint = snap.get("symbol_hint")

        if item.basis == "mcap":
            entry, current = item.entry_mcap, cur_mcap
        else:
            entry, current = item.entry_price, cur_price
        if not (entry and current):
            return False  # nothing to compare yet; try again next cycle
        ratio = (current / entry) if entry > 0 else 0.0

        # Auto-delete on drawdown
        if DROP_DELETE_RATIO > 0 and ratio <= DROP_DELETE_RATIO:
            await self._delete_origin_and_replies(item.ca)
            return True  # stop tracking once the origin has been deleted

        # Milestones: announce every target reached by this snapshot.
        if ratio >= item.next_milestone:
            reply_to_id = lookup_origin_msg_id(key) if key else None
            while ratio >= item.next_milestone:
                await self._announce_milestone(item, key, reply_to_id, ratio, cur_price, cur_mcap)
                item.next_milestone = self._next_target_after(item.next_milestone)
        return False

    # -------- Loop --------
    async def _loop(self, item: TrackItem):
        """Task body: take the entry snapshot, then poll until tracking stops."""
        try:
            snap = await self._get_snapshot(item.ca)
            if not snap:
                print(f"[TRACK] init snapshot failed for {item.ca}")
                return
            if item.entry_price is None:
                item.entry_price = snap.get("price")
            if item.entry_mcap is None:
                item.entry_mcap = snap.get("mcap")
            if not item.symbol_hint and snap.get("symbol_hint"):
                item.symbol_hint = snap.get("symbol_hint")
            # Without an entry market cap the multiple is measured on price.
            if item.basis == "mcap" and not item.entry_mcap:
                item.basis = "price"

            key = ca_key_for_db(item.ca)
            while True:
                try:
                    if await self._poll_once(item, key):
                        return
                except Exception as e:
                    print(f"[TRACK] loop error: {e}")
                await asyncio.sleep(item.poll_secs)
        except Exception as e:
            print(f"[TRACK] fatal loop init error for {item.ca}: {e}")

    def _forget(self, ca: str, task) -> None:
        """Drop bookkeeping for ``ca`` once its task has finished."""
        # Only remove the entry if it still belongs to this task; a newer
        # task may already have been started for the same address.
        if self._tasks.get(ca) is task:
            self._tasks.pop(ca, None)
            self._items.pop(ca, None)

    # -------- Public --------
    def is_tracking(self, ca: str) -> bool:
        """Return True while a tracking task for ``ca`` is still running."""
        t = self._tasks.get(ca)
        return t is not None and not t.done()

    async def start(self, ca: str, *, basis: str = "mcap",
                    entry_price: Optional[float] = None,
                    entry_mcap: Optional[float] = None,
                    poll_secs: int = 20,
                    source_link: Optional[str] = None,
                    symbol_hint: Optional[str] = None):
        """Start tracking ``ca`` in a background task; no-op if already tracked.

        Args:
            ca: Contract address (surrounding whitespace is stripped).
            basis: "mcap" or "price" (case-insensitive).
            entry_price: Entry price; taken from the first snapshot if None.
            entry_mcap: Entry market cap; taken from the first snapshot if None.
            poll_secs: Seconds between polls.
            source_link: Optional link to the triggering message.
            symbol_hint: Optional token symbol for display.
        """
        ca = ca.strip()
        if self.is_tracking(ca):
            return
        item = TrackItem(
            ca=ca,
            basis=basis.lower(),
            entry_price=entry_price,
            entry_mcap=entry_mcap,
            poll_secs=poll_secs,
            source_link=source_link,
            symbol_hint=symbol_hint,
            next_milestone=REPLY_FROM_MULTIPLE,
        )
        task = asyncio.create_task(self._loop(item))
        self._items[ca] = item
        self._tasks[ca] = task
        # Release the entry when the task ends so the maps do not grow forever.
        task.add_done_callback(lambda t, _ca=ca: self._forget(_ca, t))
# =========================
# Setup: attach to existing client from botsignal
# =========================
# Module-level state; client, tracker and startup_time_utc are assigned by
# setup_autotrack().
client: TelegramClient | None = None
# Reference time for the "old message" cutoff.
startup_time_utc: Optional[datetime] = None
# NOTE(review): never assigned in the code visible in this file.
me_user_id: Optional[int] = None
tracker: PriceTracker | None = None
# Messages older than (startup_time_utc - CUTOFF_BUFFER) are ignored.
CUTOFF_BUFFER = timedelta(minutes=BACKFILL_BUFFER_MINUTES)
def _is_old_message(msg_dt: Optional[datetime]) -> bool:
if not isinstance(msg_dt, datetime):
return False
return msg_dt.replace(tzinfo=timezone.utc) < (startup_time_utc - CUTOFF_BUFFER)
def _is_bot_own_message(event) -> bool:
    """Return True when the event's raw text carries BOT_MARKER.

    Used to skip the tracker's own announcements. An event without a
    ``raw_text`` attribute, or with an empty one, is treated as empty text.
    """
    raw = getattr(event, "raw_text", "") or ""
    return BOT_MARKER in raw
async def on_new_autotrack_message(event):
    """NewMessage handler: start tracking the contract address in a message.

    Skips the bot's own announcements (BOT_MARKER), messages older than the
    startup cutoff, and messages without a recognisable contract address.
    Any error is logged and swallowed so one bad message cannot break the
    handler.
    """
    try:
        if _is_bot_own_message(event):
            return
        if tracker is None:
            # setup_autotrack() has not run yet; there is nothing to start.
            print("[AUTOTRACK] tracker not initialised; message ignored")
            return
        msg = event.message
        if _is_old_message(getattr(msg, "date", None)):
            return
        text = msg.message or (getattr(msg, "raw_text", None) or "")
        ca = extract_ca(text)
        if not ca:
            return

        # Building the source link is optional; tracking proceeds without it.
        source_link = None
        try:
            chat = await event.get_chat()
            uname = getattr(chat, "username", None)
            mid = getattr(msg, "id", None)
            if uname and mid:
                source_link = f"https://t.me/{uname}/{mid}"
        except Exception:
            # Not a bare `except:` — that would also swallow
            # asyncio.CancelledError raised while awaiting get_chat().
            pass

        await tracker.start(
            ca=ca,
            basis="mcap",
            poll_secs=TRACK_POLL_SECS,
            source_link=source_link,
        )
    except Exception as e:
        print(f"[AUTOTRACK] error: {e}")
def setup_autotrack(shared_client: TelegramClient, announce_chat: str | None = None):
    """Attach the auto-tracker to an existing Telegram client.

    Called from botsignal.py after the client has been created and (ideally)
    before start().

    Args:
        shared_client: The client owned by botsignal; this module never
            builds its own.
        announce_chat: Chat to post milestone messages into. Defaults to
            TARGET_CHAT.

    Side effects: initialises the DB tables, sets the module-level
    ``client``, ``tracker`` and ``startup_time_utc``, and registers the
    NewMessage handler on TARGET_CHAT.
    """
    global client, tracker, startup_time_utc
    client = shared_client
    ensure_db()
    summary_init_tables()  # <<< summary tables
    ac = announce_chat or TARGET_CHAT
    tracker = PriceTracker(client, announce_chat=ac)
    startup_time_utc = datetime.now(timezone.utc)
    # Register the handler under its own name (fixes a clash with botsignal's handlers).
    client.add_event_handler(on_new_autotrack_message, events.NewMessage(chats=(TARGET_CHAT,)))
    # Report the chat that is actually listened on; it differs from the
    # announce chat whenever announce_chat overrides TARGET_CHAT.
    print("[AUTOTRACK] attached to shared client; listening on", TARGET_CHAT, "| announcing to", ac)
# There is deliberately no build_client() or main() here —
# autotrack ALWAYS piggybacks on the client owned by botsignal.