File size: 10,302 Bytes
df6d484 98cae5e 8e507c9 98cae5e 756731e 97a4601 ff8850f 97a4601 ff8850f 0c6421d d9a70da 0c6421d d9a70da 0c6421d d9a70da 07099dd d9a70da 0c6421d d9a70da 0e18b8b d9a70da 0e18b8b d9a70da 0e18b8b d9a70da 0e18b8b d9a70da 0e18b8b d9a70da 0e18b8b d9a70da 34a3311 d9a70da 34a3311 d9a70da 07099dd d9a70da 0c6421d d9a70da 0c6421d d9a70da 0c6421d d9a70da 0c6421d d9a70da 0c6421d a192521 0c6421d d9a70da 0c6421d 9a3e33b 0c6421d d9a70da 9a3e33b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
import os
from dotenv import load_dotenv
import streamlit as st
import pandas as pd
import plotly.express as px
import cloudscraper
import warnings
import logging
# Load environment variables (from a local .env file, if one is present).
load_dotenv()
API_KEY = os.environ.get("API_KEY")

# Headers for the startup probe below: bearer token plus a browser-like
# User-Agent string.
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/115.0.0.0 Safari/537.36"
    ),
}
url = "https://archeanvision.com/api/signals/available"

# Create a cloudscraper instance (it handles Cloudflare challenges).
scraper = cloudscraper.create_scraper()

# Startup probe: request the "available signals" endpoint once and print the
# outcome to stdout. This runs at module level, i.e. every time the script is
# executed. The request is bounded by a timeout and any failure is reported
# instead of raised, so a network or Cloudflare problem cannot abort the
# script before the page is rendered.
try:
    response = scraper.get(url, headers=headers, timeout=30)
except Exception as exc:  # boundary: the probe is diagnostic only, never fatal
    response = None
    print(f"Startup probe failed: {exc}")
else:
    print(response.status_code)
    print(response.text)
# Environment variables are already loaded by the load_dotenv() call at the
# top of the file, so the call is not repeated here.

# Suppress the deprecation warnings about the experimental query-params
# functions. Each message is passed to warnings.filterwarnings() as the
# `message` pattern of an "ignore" filter.
for _deprecation_message in (
    "Please replace `st.experimental_get_query_params` with `st.query_params`",
    "Please replace `st.experimental_set_query_params` with `st.query_params`",
):
    warnings.filterwarnings("ignore", message=_deprecation_message)

# Additionally ignore every DeprecationWarning, whatever its message.
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Adjust the Streamlit loggers so that only errors (and above) are emitted.
for _logger_name in (
    "streamlit.deprecation",
    "streamlit.runtime.scriptrunner",
):
    logging.getLogger(_logger_name).setLevel(logging.ERROR)
# ---------------------------- #
#   PAGE SETUP / AUTO-REFRESH   #
# ---------------------------- #
st.set_page_config(layout="wide", page_title="Dashboard Auto-Refresh")

# The page reloads itself through an HTML <meta http-equiv='refresh'> tag
# injected via st.markdown; the interval is expressed in seconds.
REFRESH_INTERVAL = 260  # seconds
_refresh_meta_tag = f"<meta http-equiv='refresh' content='{REFRESH_INTERVAL}'>"
st.markdown(_refresh_meta_tag, unsafe_allow_html=True)
# ---------------------------- #

# Sidebar logo.
# NOTE(review): the query string carries `ex`/`is`/`hm` parameters, which
# looks like a signed, time-limited CDN link -- confirm it does not expire.
LOGO_IMAGE_URL = "https://cdn.discordapp.com/attachments/1276553391748812800/1374489683769163827/image.png?ex=682e3cc5&is=682ceb45&hm=ca258b6323ea40faafe307c00e48a3841450ff34b05de452e3a0fb544909615f&"
st.sidebar.image(LOGO_IMAGE_URL, caption="FrameWorx", use_container_width=True)

# API_KEY was read from the environment earlier in the file; without it the
# script shows an error and stops here.
if not API_KEY:
    st.error("API_KEY is not set. Please add it to your environment (e.g. .env file or Hugging Face Secrets).")
    st.stop()
# --- Helper Functions Using cloudscraper ---
def get_active_markets_cloudscraper(api_key, timeout=30):
    """Retrieve the list of active markets through a cloudscraper session.

    Args:
        api_key: Token sent as ``Authorization: Bearer <api_key>``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the underlying request may block indefinitely and freeze
            the whole page.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever the scraper raises on connection problems or timeouts, the
        error raised by ``raise_for_status()`` for HTTP error codes, and the
        decoding error raised by ``response.json()`` for a non-JSON body.
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/115.0.0.0 Safari/537.36"
        ),
    }
    endpoint = "https://archeanvision.com/api/signals/available"
    scraper = cloudscraper.create_scraper()
    response = scraper.get(endpoint, headers=request_headers, timeout=timeout)
    response.raise_for_status()  # turn HTTP error statuses into exceptions
    return response.json()
def get_market_data_cloudscraper(api_key, market, timeout=30):
    """Retrieve the data points of one market through a cloudscraper session.

    Args:
        api_key: Token sent as ``Authorization: Bearer <api_key>``.
        market: Market identifier inserted into the endpoint path.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the underlying request may block indefinitely and freeze
            the whole page.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever the scraper raises on connection problems or timeouts, the
        error raised by ``raise_for_status()`` for HTTP error codes, and the
        decoding error raised by ``response.json()`` for a non-JSON body.
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/115.0.0.0 Safari/537.36"
        ),
    }
    # Endpoint for market data (1,440 points ~ 24h); adjust as per API docs.
    endpoint = f"https://archeanvision.com/api/signals/{market}/data"
    scraper = cloudscraper.create_scraper()
    response = scraper.get(endpoint, headers=request_headers, timeout=timeout)
    response.raise_for_status()  # turn HTTP error statuses into exceptions
    return response.json()
def get_market_signals_cloudscraper(api_key, market, timeout=30):
    """Retrieve the signals of one market through a cloudscraper session.

    Args:
        api_key: Token sent as ``Authorization: Bearer <api_key>``.
        market: Market identifier inserted into the endpoint path.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the underlying request may block indefinitely and freeze
            the whole page.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever the scraper raises on connection problems or timeouts, the
        error raised by ``raise_for_status()`` for HTTP error codes, and the
        decoding error raised by ``response.json()`` for a non-JSON body.
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/115.0.0.0 Safari/537.36"
        ),
    }
    endpoint = f"https://archeanvision.com/api/signals/{market}/signals"
    scraper = cloudscraper.create_scraper()
    response = scraper.get(endpoint, headers=request_headers, timeout=timeout)
    response.raise_for_status()  # turn HTTP error statuses into exceptions
    return response.json()
# --- End Helper Functions ---
def get_selected_market(market_list):
    """Let the user pick a market, keeping the choice in the URL query string.

    The initial selection is the ``market`` query parameter when it names an
    entry of ``market_list``; otherwise the first entry is pre-selected. When
    the value returned by the select box differs from the query-parameter
    value, the ``market`` query parameter is updated.

    Args:
        market_list: Sequence of market names offered in the select box.

    Returns:
        The selected market, or ``None`` when ``market_list`` is empty
        (there is nothing to select and ``market_list[0]`` would raise).
    """
    # Guard first: the default below indexes market_list[0] eagerly.
    if not market_list:
        return None

    # Dictionary-like view of the URL query parameters.
    params = st.query_params

    # Read the "market" parameter, falling back to the first market.
    default_market = params.get("market", market_list[0])

    # If the value came back as a list (repeated key), keep its first element.
    if isinstance(default_market, list):
        default_market = default_market[0]

    # Unknown values fall back to index 0 so the select box stays valid.
    default_index = market_list.index(default_market) if default_market in market_list else 0

    # Drop-down used to choose the market.
    selected = st.selectbox("Select a market:", market_list, index=default_index)

    # Keep the URL in sync when the selection differs from the URL value
    # (equivalent to: st.query_params["market"] = selected).
    if selected != default_market:
        st.query_params.market = selected
    return selected
def _extract_market_names(active_markets):
    """Collect market names from the API payload, warning on unusable items.

    Each item may be a dict holding a "market" key or a plain string; any
    other item triggers a Streamlit warning and is skipped.
    """
    names = []
    for item in active_markets:
        if isinstance(item, dict) and "market" in item:
            names.append(item["market"])
        elif isinstance(item, str):
            names.append(item)
        else:
            st.warning(f"Item missing 'market' key: {item}")
    return names


def _render_market_data(selected_market):
    """Fetch and display the data table and price chart for one market.

    Returns True when the chart was rendered, False when an error message was
    shown instead (fetch failure, empty payload, or missing columns).
    """
    try:
        market_data = get_market_data_cloudscraper(API_KEY, selected_market)
    except Exception as e:  # UI boundary: report the failure on the page
        st.error(f"Error fetching market data for {selected_market}: {e}")
        return False
    if not market_data:
        st.error(f"No data found for market {selected_market}.")
        return False

    df = pd.DataFrame(market_data)
    if "close_time" not in df.columns:
        st.error("The 'close_time' column is missing from the retrieved data.")
        return False
    # The values are parsed as millisecond timestamps; unparsable entries
    # become NaT because of errors='coerce'.
    df['close_time'] = pd.to_datetime(df['close_time'], unit='ms', errors='coerce')

    st.write("### Market Data Overview")
    st.dataframe(df.head())

    required_cols = {"close", "last_predict_15m", "last_predict_1h"}
    if not required_cols.issubset(df.columns):
        st.error(
            f"The required columns {required_cols} are not all present. "
            f"Available columns: {list(df.columns)}"
        )
        return False

    fig = px.line(
        df,
        x='close_time',
        y=['close', 'last_predict_15m', 'last_predict_1h'],
        title=f"{selected_market} : Close Price & Predictions",
        labels={
            'close_time': 'Time',
            'value': 'Price',
            'variable': 'Metric'
        }
    )
    st.plotly_chart(fig, use_container_width=True)
    return True


def _render_signals(selected_market):
    """Fetch the signals of one market and display a count plus a preview."""
    st.write(f"### Signals for {selected_market}")
    try:
        signals = get_market_signals_cloudscraper(API_KEY, selected_market)
    except Exception as e:  # UI boundary: report the failure on the page
        st.error(f"Error fetching signals for {selected_market}: {e}")
        return
    if not signals:
        st.warning(f"No signals found for market {selected_market}.")
        return

    df_signals = pd.DataFrame(signals)
    if 'date' in df_signals.columns:
        # The values are parsed as second timestamps; bad entries become NaT.
        df_signals['date'] = pd.to_datetime(df_signals['date'], unit='s', errors='coerce')

    # Columns that contain dict values are converted to their string form
    # before the frame is handed to st.dataframe.
    for col in df_signals.columns:
        if df_signals[col].apply(lambda x: isinstance(x, dict)).any():
            df_signals[col] = df_signals[col].apply(lambda x: str(x) if isinstance(x, dict) else x)

    # Newest first, so head(4) below shows the most recent signals.
    if 'date' in df_signals.columns:
        df_signals = df_signals.sort_values('date', ascending=False)

    st.write("Total number of signals:", len(df_signals))
    st.write("Preview of the last 4 signals:")
    st.dataframe(df_signals.head(4))


def main():
    """Render the dashboard: intro text, market selector, chart and signals.

    Every failure (API error, empty or malformed payload, missing columns) is
    reported with a Streamlit message and ends the run early; the signals
    section is only rendered after the market data section succeeded.
    """
    st.title("Active AI Crypto Markets - frameWorxVision")
    # NOTE(review): in the last markdown line the link text says
    # frameworx.site while the link target is frameworx.fun -- confirm which
    # one is intended.
    st.markdown("""
### What is frameWorxVision?
**frameWorx** is an autonomous multi-market trading agent.
It operates simultaneously on multiple crypto assets, monitoring price movements
in real time and delivering **data** as well as **signals** (BUY, SELL, etc.)
to automate and optimize decision-making.
- **AI Agent**: Continuously analyzes crypto markets.
- **Multi-Market**: Manages multiple assets at once.
- **Live Data**: Access to streaming data feeds (SSE).
- **Buy/Sell Signals**: Generated in real-time to seize market opportunities.
Below is a dashboard showcasing the active markets, their 24h data
(1,440 most recent data points), and their associated signals.
---
**Join our Platform as a beta tester** to help improve the agent and the system.
- Official platform: [https://frameworx.site](https://frameworx.fun)
""")

    # Retrieve active markets using cloudscraper.
    try:
        active_markets = get_active_markets_cloudscraper(API_KEY)
    except Exception as e:  # UI boundary: report the failure on the page
        st.error(f"Error fetching active markets: {e}")
        return
    if not active_markets:
        st.error("No active markets found through the API.")
        return

    # Expecting a list of market names or of dicts with a "market" key.
    if not isinstance(active_markets, list):
        st.error("The structure of 'active_markets' is not a list as expected.")
        return
    market_list = _extract_market_names(active_markets)
    if not market_list:
        st.error("The market list is empty or 'market' keys not found.")
        return

    selected_market = get_selected_market(market_list)
    if not selected_market:
        st.error("No market selected.")
        return

    st.subheader(f"Selected Market: {selected_market}")
    st.write(f"Fetching data for **{selected_market}** ...")

    # The signals section is skipped when the data section reported an error.
    if not _render_market_data(selected_market):
        return
    _render_signals(selected_market)


if __name__ == "__main__":
    main()
|