# Page header captured with the file: samsonleegh's picture
# Commit message: "Update app.py"
# Commit 370010e (verified)
# app.py
import os
import re
import asyncio
from datetime import datetime
from zoneinfo import ZoneInfo
import gradio as gr
import pandas as pd
from dotenv import load_dotenv
from agents import Agent, Runner, trace, Tool
from agents.mcp import MCPServerStdio
# Your local helper modules
import hype_accounts_server
from memory_utils import load_memories, save_memory, load_memories_df
load_dotenv(override=True)
# === Time / Locale ===
SGT = ZoneInfo("Asia/Singapore")
def now_sgt():
    """Return the current timezone-aware datetime in the module's SGT zone."""
    return datetime.now(tz=SGT)
# === MCP Server Factories ===
def make_hyperliquid_trader_mcp_servers():
    """Return a one-element list with the stdio MCP server that exposes trading.

    The server is started as ``python3 -u hype_accounts_server.py`` and
    receives the Hyperliquid credentials from this process's environment.

    Only variables that are actually set are forwarded: ``os.getenv`` returns
    ``None`` for a missing variable, and a ``None`` value is not a valid entry
    in the string-to-string ``env`` mapping of a stdio server.
    """
    forwarded_names = (
        "HYPERLIQUID_API_KEY",
        "HYPERLIQUID_PRIVATE_KEY",
        "HYPERLIQUID_ACCOUNT_ADDRESS",
    )
    child_env = {
        name: value
        for name in forwarded_names
        if (value := os.getenv(name)) is not None
    }
    params = {
        "command": "python3",
        "args": ["-u", "hype_accounts_server.py"],
        "env": child_env,
    }
    return [MCPServerStdio(params, client_session_timeout_seconds=30)]
def make_crypto_news_mcp_servers():
    """Return a one-element list with the stdio MCP server for crypto news.

    The server is started as ``python3 -u crypto_news_scraper_server.py``.
    """
    # Scraper-based news MCP, used to avoid API plan limits.
    launch_params = {
        "command": "python3",
        "args": ["-u", "crypto_news_scraper_server.py"],
    }
    news_server = MCPServerStdio(launch_params, client_session_timeout_seconds=30)
    return [news_server]
def make_technical_analyst_mcp_servers():
    """Return a one-element list with the stdio MCP server for indicators.

    The server is started as ``python3 -u hl_indicators_server.py``.
    """
    launch_params = {
        "command": "python3",
        "args": ["-u", "hl_indicators_server.py"],
    }
    indicator_server = MCPServerStdio(launch_params, client_session_timeout_seconds=30)
    return [indicator_server]
# === Utils for MCP lifecycle ===
async def connect_all(servers):
    """Connect every server in *servers*, one after another, in order.

    The first failing ``connect()`` propagates to the caller; servers later
    in the iterable are then not attempted.
    """
    for server in servers:
        await server.connect()
async def close_all(servers):
    """Best-effort shutdown of every server in *servers*.

    Each server is shut down through its ``cleanup()`` coroutine when it has
    one, otherwise through ``close()``. Calling only ``close()`` fails with
    ``AttributeError`` on servers that expose ``cleanup()`` instead; combined
    with a silent ``except`` that would leave every server running.

    A failure on one server never prevents the remaining ones from being shut
    down and is never raised to the caller; it is logged as a warning.
    """
    import logging  # local import so this function does not depend on file-level imports

    log = logging.getLogger(__name__)
    for server in servers:
        shutdown = getattr(server, "cleanup", None) or getattr(server, "close", None)
        if shutdown is None:
            log.warning("MCP server %r has neither cleanup() nor close(); skipping", server)
            continue
        try:
            await shutdown()
        except Exception:
            # Deliberately best-effort: record the problem and keep going.
            log.warning("Ignoring error while shutting down MCP server %r", server, exc_info=True)
# === Agent Builders ===
async def build_news_tool(news_servers) -> Tool:
    """Wrap a news-research agent backed by *news_servers* as a tool.

    The agent's instructions embed the current SGT timestamp, so the returned
    tool carries the time at which it was built.
    """
    stamp = now_sgt().strftime("%Y-%m-%d %H:%M:%S")
    instruction_parts = [
        "You are a cryptocurrency researcher. You can search and summarise the most relevant, ",
        "recent crypto news. If the user asks about a specific coin (e.g., HYPE, BTC, ETH, XRP), ",
        "focus on that. Otherwise, highlight notable events and potential long/short opportunities. ",
        f"Current datetime (SGT): {stamp}.",
    ]
    news_agent = Agent(
        name="Crypto news researcher",
        instructions="".join(instruction_parts),
        model="gpt-4.1-mini",
        mcp_servers=news_servers,
    )
    return news_agent.as_tool(
        tool_name="crypto_news_researcher",
        tool_description="Research crypto news and opportunities for a coin or broad scan.",
    )
async def build_ta_tool(ta_servers) -> Tool:
    """Wrap a technical-analysis agent backed by *ta_servers* as a tool.

    The agent's instructions embed the current SGT timestamp, so the returned
    tool carries the time at which it was built.
    """
    stamp = now_sgt().strftime("%Y-%m-%d %H:%M:%S")
    instruction_parts = [
        "You are a cryptocurrency perpetuals technical trading researcher.\n",
        "Default interval: 1h; default lookback: 36.\n",
        "Indicators: EMA(20,200), MACD(12,26,9), StochRSI(14,14,3,3), ADL, Volume.\n",
        "Given a coin/interval/lookback, compute indicator state, infer trend, and propose entries, ",
        "exits, and stop-loss/take-profit with reasoning.\n",
        f"Current datetime (SGT): {stamp}.",
    ]
    ta_agent = Agent(
        name="Crypto technical researcher",
        instructions="".join(instruction_parts),
        model="gpt-4.1-mini",
        mcp_servers=ta_servers,
    )
    return ta_agent.as_tool(
        tool_name="crypto_technical_researcher",
        tool_description="Run TA (EMA, MACD, StochRSI, ADL, Volume).",
    )
async def build_trader(hyper_servers, tools: list[Tool]) -> Agent:
    """Build the trading agent with recent memory and account state in its prompt.

    Args:
        hyper_servers: MCP servers attached to the agent (these expose the
            trading actions).
        tools: Extra tools handed to the agent.

    Returns:
        The configured ``crypto_trader`` agent. If the account lookup raises,
        the error text is placed in the prompt instead of the account state.
    """
    # Short memory so the agent has context from earlier requests.
    recent = load_memories(5)
    if recent:
        memory_text = "\n".join(recent)
    else:
        memory_text = "No prior memories."

    try:
        account_details = await hype_accounts_server.get_account_details()
    except Exception as e:
        account_details = f"(Could not fetch account details: {e})"

    # Leading and trailing empty entries reproduce the newline at both ends.
    prompt_lines = [
        "",
        "You are a cryptocurrency perpetuals trader that can:",
        "- Query account balances/positions (via MCP servers on Hyperliquid).",
        "- Do market/news research and TA using attached tools.",
        "- Place long/short orders when the setup has clear edge. Transaction cost: 0.04%.",
        "- If signals are unclear, do NOT trade.",
        "Recent notes:",
        f"{memory_text}",
        "Account state:",
        f"{account_details}",
        "General rules:",
        "- Prefer confluence: trend + momentum + volume/ADL agreement.",
        "- Always suggest stop-loss and take-profit levels.",
        "- Keep risk per trade modest. Avoid overtrading.",
        "",
    ]
    return Agent(
        name="crypto_trader",
        instructions="\n".join(prompt_lines),
        tools=tools,
        mcp_servers=hyper_servers,  # these expose trading actions
        model="gpt-4.1-mini",
    )
# === Intent Routing ===
# Help text returned for "help" and appended to the fallback reply.
# The bullets and dashes were previously stored as mojibake (UTF-8 bytes decoded
# with the wrong code page); they are restored to the intended characters.
COMMAND_HELP = """\
You can ask in natural language, e.g.:
• "Balance" / "portfolio" — show Hyperliquid balances/positions
• "News on BTC and ETH" — market research
• "TA HYPE 1h lookback 48" — technical analysis
• "Long HYPE 500 at market, SL 2% TP 4%" — execute trade
• "Short BTC 0.01 at 68000, SL 69000 TP 66000" — limit order example
• "Summarize opportunities today" — broad scan (news + TA)
"""
# Intent patterns. All are case-insensitive and used with .search(), so they
# may match anywhere in the user's message.

# "TA <coin> [<interval>] [lookback <n>]"
#   group 1: coin symbol; group 2: optional interval (digits + m/h/d);
#   group 3: optional lookback count.
RE_TA = re.compile(r"\bTA\s+([A-Za-z0-9_\-]+)(?:\s+(\d+[mhHdD]))?(?:\s+lookback\s+(\d+))?", re.IGNORECASE)
# "LONG <coin> <size> [at market|mkt|<price>] [... SL <x>] [... TP <y>]"
#   group 1: coin; group 2: size; group 3: optional "market"/"mkt" or price;
#   group 4: optional stop-loss (number or percent); group 5: optional take-profit.
#   SL is only captured when it appears before TP.
RE_LONG = re.compile(r"\bLONG\s+([A-Za-z0-9_\-]+)\s+([\d.]+)(?:\s+at\s+(market|mkt|[\d.]+))?(?:.*?\bSL\s+([\d.%]+))?(?:.*?\bTP\s+([\d.%]+))?", re.IGNORECASE)
# Same shape and groups as RE_LONG, keyed on the word SHORT.
RE_SHORT = re.compile(r"\bSHORT\s+([A-Za-z0-9_\-]+)\s+([\d.]+)(?:\s+at\s+(market|mkt|[\d.]+))?(?:.*?\bSL\s+([\d.%]+))?(?:.*?\bTP\s+([\d.%]+))?", re.IGNORECASE)
# "close|exit|flatten <all|coin> [<n>% | <qty>]"
#   group 1: verb; group 2: "all" or coin; group 3: percent digits (without
#   the % sign); group 4: absolute quantity. Groups 3 and 4 are alternatives.
RE_CLOSE = re.compile(r"\b(close|exit|flatten)\s+(all|[A-Za-z0-9_\-]+)(?:\s+(\d+)%|\s+([\d.]+))?", re.IGNORECASE)
def _close_desc(coin_or_all: str, pct: str | None, qty: str | None) -> str:
coin_or_all = coin_or_all.upper()
if coin_or_all == "ALL":
return "Close ALL open positions at market"
if pct:
return f"Close {pct}% of {coin_or_all} position at market"
if qty:
return f"Reduce {coin_or_all} position by {qty} units at market"
return f"Close {coin_or_all} position at market"
def pct_or_price(s):
    """Parse a stop-loss / take-profit token into a tagged value.

    Args:
        s: Text such as ``"2%"`` or ``"68000"``; ``None`` or empty is allowed.

    Returns:
        ``{"type": "percent", "value": float}`` when the token ends in ``%``,
        ``{"type": "price", "value": float}`` for a plain number, or ``None``
        when the input is empty or is not a number.
    """
    if not s:
        return None
    token = s.strip().lower()
    if token.endswith("%"):
        kind, number = "percent", token[:-1]
    else:
        kind, number = "price", token
    try:
        value = float(number)
    except ValueError:
        # Only a malformed number is expected here; a bare ``except`` would
        # also swallow KeyboardInterrupt and SystemExit.
        return None
    return {"type": kind, "value": value}
# === Core Chatbot Handler ===
# Filler words that the 2-6 letter ticker pattern would otherwise report as coins.
_NEWS_STOPWORDS = frozenset({
    "NEWS", "HELP", "ON", "AND", "OR", "THE", "FOR", "OF", "IN", "TO", "IS",
    "ARE", "ME", "MY", "ANY", "ABOUT", "WITH", "WHAT", "WHATS", "WHY", "HOW",
    "SHOW", "GIVE", "GET", "LATEST", "TODAY", "NOW", "PLEASE", "CRYPTO",
    "MARKET", "COIN", "COINS", "TOKEN", "TOKENS", "PRICE", "UPDATE",
})

# Broad-scan trigger words. The stems are spelled out in full because a bare
# stem followed by a word boundary ("opportunit\b") can never match a real word.
_RE_SUMMARY = re.compile(
    r"\b(opportunit(?:y|ies)|ideas?|setups?|summar(?:y|ise|ize)|today)\b",
    re.IGNORECASE,
)


def _extract_news_coins(text: str) -> list[str]:
    """Return the coin symbols mentioned in *text*, in order, without duplicates.

    Tokens the user typed in capitals are preferred; only when none survive
    the stop-word filter is the whole message upper-cased and scanned again.
    """
    def pick(source: str) -> list[str]:
        tokens = re.findall(r"\b[A-Z]{2,6}\b", source)
        return list(dict.fromkeys(t for t in tokens if t not in _NEWS_STOPWORDS))

    return pick(text) or pick(text.upper())


async def _run_trader_session(prompt: str, trace_name: str, max_turns: int) -> str:
    """Start all MCP servers, run the trader agent on *prompt*, then shut down.

    Servers are connected and closed sequentially in the calling task rather
    than through ``asyncio.gather``, because the stdio transport must be torn
    down in the same task that opened it. Exceptions propagate to the caller;
    shutdown always runs.
    """
    servers = []
    try:
        news_servers = make_crypto_news_mcp_servers()
        servers.extend(news_servers)
        ta_servers = make_technical_analyst_mcp_servers()
        servers.extend(ta_servers)
        hyper_servers = make_hyperliquid_trader_mcp_servers()
        servers.extend(hyper_servers)
        await connect_all(servers)

        news_tool = await build_news_tool(news_servers)
        ta_tool = await build_ta_tool(ta_servers)
        trader = await build_trader(hyper_servers, [news_tool, ta_tool])
        with trace(trace_name):
            result = await Runner.run(trader, prompt, max_turns=max_turns)
        return result.final_output
    finally:
        await close_all(servers)


async def _handle_balance() -> str:
    """Fetch the account details and render them as markdown."""
    try:
        acct = await hype_accounts_server.get_account_details()
        save_memory(f"[{now_sgt():%Y-%m-%d %H:%M:%S %Z}] User checked balance.")
        return format_account_for_chat(acct)
    except Exception as e:
        return f"❌ Error fetching account details: `{e}`"


async def _handle_ta(match, ts: str) -> str:
    """Run technical analysis for the coin/interval/lookback in an RE_TA match."""
    coin = match.group(1).upper()
    interval = (match.group(2) or "1h").lower()
    lookback = int(match.group(3) or 36)
    ta_servers = []
    try:
        ta_servers = make_technical_analyst_mcp_servers()
        await connect_all(ta_servers)
        ta_tool = await build_ta_tool(ta_servers)
        # TA-only agent, so the trading MCP is never attached for analysis requests.
        researcher = Agent(
            name="crypto_ta_agent",
            instructions=f"Focus on TA for {coin} at interval {interval}, lookback {lookback}. Output indicator values and strategy.",
            tools=[ta_tool],
            model="gpt-4.1-mini",
        )
        prompt = f"Run TA for {coin} on {interval}, lookback {lookback}. Return indicators and actionable plan."
        with trace("crypto_ta"):
            result = await Runner.run(researcher, prompt, max_turns=12)
        save_memory(f"[{ts}] TA {coin} {interval} lookback {lookback}")
        return f"### 🔬 TA — {coin} ({interval}, lookback {lookback})\n\n{result.final_output}"
    except Exception as e:
        return f"❌ TA error: `{e}`"
    finally:
        await close_all(ta_servers)


async def _handle_trade(side: str, match, ts: str) -> str:
    """Ask the trader agent to place the LONG/SHORT order described by *match*."""
    coin = match.group(1).upper()
    raw_qty = match.group(2)
    try:
        qty = float(raw_qty)
    except ValueError:
        # The size pattern also accepts strings such as "." or "1.2.3".
        return f"❌ Trade execution error: `invalid order size {raw_qty!r}`"
    if qty <= 0:
        return f"❌ Trade execution error: `order size must be positive, got {raw_qty}`"

    at = match.group(3)  # "market"/"mkt" or a price
    sl_raw = match.group(4)
    tp_raw = match.group(5)
    price_desc = "market" if (at is None or at.lower() in {"market", "mkt"}) else at
    order_desc = f"{side} {coin} {qty} at {price_desc}"
    if pct_or_price(sl_raw):
        order_desc += f", SL {sl_raw}"
    if pct_or_price(tp_raw):
        order_desc += f", TP {tp_raw}"

    trade_prompt = (
        f"\nUser requested: {order_desc}.\n"
        "If safe and reasonable given risk rules, place the order via Hyperliquid MCP.\n"
        "- If price specified (numeric), treat as limit; otherwise market.\n"
        "- Always include stop-loss and take-profit (convert % to prices).\n"
        "- Confirm the exact order(s) you placed and rationale in the output.\n"
    )
    try:
        output = await _run_trader_session(trade_prompt, "trade_execution", 20)
        save_memory(f"[{ts}] Executed: {order_desc}")
        return f"### 🧾 Execution — {order_desc}\n\n{output}"
    except Exception as e:
        return f"❌ Trade execution error: `{e}`"


async def _handle_close(match) -> str:
    """Ask the trader agent to close or reduce positions per an RE_CLOSE match."""
    # groups: verb, coin_or_all, pct (digits, meaning percent), qty (absolute size)
    desc = _close_desc(match.group(2).strip(), match.group(3), match.group(4))
    trade_prompt = (
        f"\nUser request: {desc}.\n"
        "Instructions:\n"
        "- If 'ALL', close every open position at market.\n"
        "- If a coin is specified:\n"
        "- If a percent is provided, close that % of the CURRENT open position size.\n"
        "- If a qty is provided, reduce by that absolute base-asset amount.\n"
        "- If neither provided, fully close that coin position.\n"
        "- Include SL/TP cleanup if needed (cancel/replace any attached orders).\n"
        "- If the coin has no open position, report that clearly.\n"
        "- Return a concise execution summary listing each order (coin, side, size, order type, price if applicable) and rationale.\n"
    )
    try:
        output = await _run_trader_session(trade_prompt, "close_positions", 20)
        save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] Close: {desc}")
        return f"### 🧹 Close — {desc}\n\n{output}"
    except Exception as e:
        return f"❌ Close error: `{e}`"


async def _handle_news(text: str, ts: str) -> str:
    """Summarise news for the coins mentioned in *text*, or the broad market."""
    coins = _extract_news_coins(text)
    topic = ", ".join(coins) if coins else "broad market"
    news_servers = []
    try:
        news_servers = make_crypto_news_mcp_servers()
        await connect_all(news_servers)
        news_tool = await build_news_tool(news_servers)
        researcher = Agent(
            name="crypto_news_agent",
            instructions=f"Focus news on: {topic}. Be concise and actionable.",
            tools=[news_tool],
            model="gpt-4.1-mini",
        )
        prompt = f"Summarize the most relevant crypto news for {topic}. Include potential trade angles."
        with trace("crypto_news"):
            result = await Runner.run(researcher, prompt, max_turns=12)
        save_memory(f"[{ts}] News requested: {topic}")
        return f"### 🗞️ News — {topic}\n\n{result.final_output}"
    except Exception as e:
        return f"❌ News error: `{e}`"
    finally:
        await close_all(news_servers)


async def _handle_summary(ts: str) -> str:
    """Run the broad news + TA scan; the prompt tells the agent not to trade."""
    prompt = (
        "Step 1: Broad news scan for major catalysts.\n"
        "Step 2: Pick 3–5 coins with potential edges; run compact TA summary (1h, lookback 36).\n"
        "Step 3: Recommend 1–2 best setups with entry, SL, TP and rationale. Do NOT place orders."
    )
    try:
        output = await _run_trader_session(prompt, "daily_opportunities", 24)
        save_memory(f"[{ts}] Opportunity summary requested.")
        return f"### 📌 Opportunities — {ts}\n\n{output}"
    except Exception as e:
        return f"❌ Summary error: `{e}`"


async def handle_message(message: str, history: list[tuple[str, str]]):
    """Route a chat message to the matching handler and return markdown text.

    Intents are tried in this order: help, balance, TA, LONG/SHORT trade,
    close/exit/flatten, news, broad summary, then a fallback with the command
    help. The balance check is skipped when the message is a close request,
    so "close all positions" is not answered with the portfolio view.

    Args:
        message: Raw user text; ``None`` or blank falls through to the fallback.
        history: Chat history supplied by the UI (currently unused).

    Returns:
        Markdown for the chat window. Handler failures are returned as an
        error line rather than raised.
    """
    text = (message or "").strip()
    ts = now_sgt().strftime("%Y-%m-%d %H:%M:%S %Z")

    # Quick help
    if text.lower() in {"help", "/help", "commands"}:
        return f"### Commands\n{COMMAND_HELP}"

    close_match = RE_CLOSE.search(text)

    # 1) Balance / portfolio
    if close_match is None and re.search(r"\b(balance|portfolio|positions?)\b", text, re.IGNORECASE):
        return await _handle_balance()

    # 2) TA intent
    ta_match = RE_TA.search(text)
    if ta_match:
        return await _handle_ta(ta_match, ts)

    # 3) Trade intent (LONG / SHORT)
    long_match = RE_LONG.search(text)
    trade_match = long_match or RE_SHORT.search(text)
    if trade_match:
        return await _handle_trade("LONG" if long_match else "SHORT", trade_match, ts)

    # 4) Close intent. This used to sit after the fallback return and could never run.
    if close_match:
        return await _handle_close(close_match)

    # 5) News intent (e.g., "news on BTC", "what's happening to HYPE")
    if re.search(r"\b(news|headline|what's happening|what is happening|happening)\b", text, re.IGNORECASE):
        return await _handle_news(text, ts)

    # 6) Summary scan (news + TA picks)
    if _RE_SUMMARY.search(text):
        return await _handle_summary(ts)

    # Fallback: clarify + brief help
    return (
        "I can help with balance, news, TA, and trade execution.\n\n"
        + COMMAND_HELP
    )
# ---------- Pretty printing for account/positions ----------
from math import isnan
def _fnum(x, decimals=2):
try:
v = float(x)
return f"{v:,.{decimals}f}"
except Exception:
return str(x)
def _fpct(x, decimals=2):
try:
v = float(x) * 100 # input is ROE like 0.0036 -> 0.36%
sign = "🟒" if v > 0 else ("πŸ”΄" if v < 0 else "βšͺ️")
return f"{sign} {v:.{decimals}f}%"
except Exception:
return "β€”"
def _pnl(x, decimals=2):
try:
v = float(x)
sign = "🟒" if v > 0 else ("πŸ”΄" if v < 0 else "βšͺ️")
return f"{sign} ${abs(v):,.{decimals}f}"
except Exception:
return "β€”"
def _side_and_abs_size(szi):
try:
v = float(szi)
side = "LONG" if v > 0 else ("SHORT" if v < 0 else "FLAT")
return side, abs(v)
except Exception:
return "β€”", szi
def _float_or_zero(value) -> float:
    """Return ``float(value)``, or 0.0 when the value is missing or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return 0.0


def format_account_for_chat(acct: dict) -> str:
    """Render the ``get_account_details()`` result as a Markdown summary.

    Args:
        acct: Mapping read for the keys ``holdings``, ``cash_balance`` and
            ``profit_and_loss``. Each holding is read for a ``position``
            mapping with ``coin``, ``szi``, ``entryPx``, ``positionValue``,
            ``unrealizedPnl``, ``returnOnEquity``, ``leverage``,
            ``liquidationPx``, ``marginUsed`` and ``cumFunding.sinceOpen``.
            Anything that is not a dict is echoed back in a code fence.

    Returns:
        A header with totals followed by a positions table, or by
        "_No open positions_" when there is nothing to list.
    """
    if not isinstance(acct, dict):
        return f"```\n{acct}\n```"

    def optional(value, render):
        # Missing values (including the literal string "None") become a dash.
        return "—" if value in (None, "None") else render(value)

    holdings = acct.get("holdings") or []
    cash = acct.get("cash_balance", "0")
    realized_pnl = acct.get("profit_and_loss")

    total_pos_value = 0.0
    total_margin_used = 0.0
    total_upnl = 0.0
    rows_md = []
    for h in holdings:
        # "or {}" also covers keys that are present but explicitly null.
        pos = h.get("position") or {}
        side, abs_size = _side_and_abs_size(pos.get("szi", 0))
        pval = pos.get("positionValue", 0)
        u = pos.get("unrealizedPnl", 0)
        m_used = pos.get("marginUsed", 0)
        lev = pos.get("leverage") or {}
        funding = (pos.get("cumFunding") or {}).get("sinceOpen")

        # Non-numeric values contribute nothing to the totals.
        total_pos_value += _float_or_zero(pval)
        total_margin_used += _float_or_zero(m_used)
        total_upnl += _float_or_zero(u)

        cells = [
            pos.get("coin", "—"),
            side,
            _fnum(abs_size, 6),
            optional(pos.get("entryPx"), lambda v: "$" + _fnum(v, 2)),
            f"${_fnum(pval, 2)}",
            _pnl(u, 2),
            _fpct(pos.get("returnOnEquity", 0), 2),
            f"{lev.get('type', '—')}×{lev.get('value', '—')}",
            optional(pos.get("liquidationPx"), lambda v: "$" + _fnum(v, 2)),
            f"${_fnum(m_used, 2)}",
            optional(funding, lambda v: _fnum(v, 6)),
        ]
        rows_md.append("| " + " | ".join(str(c) for c in cells) + " |")

    header = (
        "### 📊 Account / Positions\n"
        f"- **Cash balance:** ${_fnum(cash, 2)}\n"
        f"- **Total pos. value:** ${_fnum(total_pos_value, 2)}\n"
        f"- **Unrealized PnL:** {_pnl(total_upnl, 2)}\n"
        f"- **Margin used (total):** ${_fnum(total_margin_used, 2)}\n"
    )
    if realized_pnl is not None:
        header += f"- **Realized PnL (session/period):** {_pnl(realized_pnl, 2)}\n"

    if not rows_md:
        # A table header followed by a non-table line renders as a broken table.
        return header + "\n_No open positions_"

    table_head = (
        "\n| Coin | Side | Size | Entry Px | Pos. Value | uPnL | ROE | Leverage | Liq Px | Margin Used | Funding (since open) |\n"
        "|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|\n"
    )
    return header + table_head + "\n".join(rows_md)
# === Gradio UI ===
# Builds the chat UI: four quick-action buttons, the chat window, a free-text
# box with a Send button, and a collapsible table of the latest memory notes.
with gr.Blocks(fill_height=True) as demo:
    gr.Markdown("# 🤖 Crypto Trading Copilot")
    gr.Markdown(
        # Evaluated once, when the UI is built.
        f"Local time: **{now_sgt():%Y-%m-%d %H:%M:%S %Z}** \n"
        "[OpenAI Traces](https://platform.openai.com/logs?api=traces) · "
        "[Hyperliquid](https://app.hyperliquid.xyz/trade)"
    )
    with gr.Row():
        quick1 = gr.Button("📊 Balance")
        quick2 = gr.Button("🗞️ News: BTC, ETH")
        quick3 = gr.Button("🔬 TA: HYPE 1h")
        quick4 = gr.Button("🧾 Long HYPE 500 @ market (SL 2% TP 4%)")
    chatbot = gr.Chatbot(height=480, type="messages", show_copy_button=True)
    user_in = gr.Textbox(placeholder="Try: TA HYPE 1h lookback 48 • News on BTC • Long HYPE 500 at market, SL 2% TP 4%", scale=1)
    send_btn = gr.Button("Send", variant="primary")
    with gr.Accordion("Memory (last 10)", open=False):
        mem_table = gr.Dataframe(value=load_memories_df(10), interactive=False, wrap=True, show_label=False)

    def _exchange(user_msg, bot_md):
        """Return the two chat entries for one user/assistant turn."""
        return [
            {"role": "user", "content": user_msg},
            {"role": "assistant", "content": bot_md},
        ]

    async def _respond(user_msg, chat_state):
        """Handle a typed message: reply, log it, clear the box, refresh memory."""
        # The chat state can be None before the first message; normalise it once
        # so appending the new turn cannot fail.
        history = list(chat_state or [])
        user_msg = user_msg or ""
        bot_md = await handle_message(user_msg, history)
        # Log short memory line
        save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] {user_msg[:80]}")
        return history + _exchange(user_msg, bot_md), "", load_memories_df(10)

    send_btn.click(_respond, inputs=[user_in, chatbot], outputs=[chatbot, user_in, mem_table])
    user_in.submit(_respond, inputs=[user_in, chatbot], outputs=[chatbot, user_in, mem_table])

    # Quick actions
    def _make_quick_action(name, msg, note):
        """Build a button handler that sends the canned *msg* and logs *note*.

        A factory is used so each handler captures its own message.
        """
        async def _quick(chat_state):
            history = list(chat_state or [])
            bot_md = await handle_message(msg, history)
            save_memory(f"[{now_sgt():%Y-%m-%d %H:%M}] Quick: {note}")
            return history + _exchange(msg, bot_md), load_memories_df(10)

        _quick.__name__ = name  # keep the four handlers distinguishable by name
        return _quick

    _qa_balance = _make_quick_action("_qa_balance", "balance", "balance")
    _qa_news = _make_quick_action("_qa_news", "news on BTC and ETH", "news BTC ETH")
    _qa_ta = _make_quick_action("_qa_ta", "TA HYPE 1h lookback 48", "TA HYPE")
    _qa_long = _make_quick_action("_qa_long", "Long HYPE 500 at market, SL 2% TP 4%", "long HYPE")

    quick1.click(_qa_balance, inputs=[chatbot], outputs=[chatbot, mem_table])
    quick2.click(_qa_news, inputs=[chatbot], outputs=[chatbot, mem_table])
    quick3.click(_qa_ta, inputs=[chatbot], outputs=[chatbot, mem_table])
    quick4.click(_qa_long, inputs=[chatbot], outputs=[chatbot, mem_table])
if __name__ == "__main__":
    # Script entry point: enable the request queue, then start the web server.
    # No deprecated args; queue() OK without concurrency_count
    queued_app = demo.queue()
    queued_app.launch()