# Papaflessas's picture
# Deploy Signal Generator app
# 3fe0726
import os
import threading
import websocket
import json
import time
import logging
from news_scraper.models.news import News
from news_scraper.helpers.news_db_logger import NewsDBLogger
logger = logging.getLogger(__name__)
class AlpacaNewsFeedAdapter:
    """
    Singleton adapter for the Alpaca real-time news WebSocket stream.

    The first instantiation opens the WebSocket on a daemon thread; every
    later instantiation returns that same object. Each news entry received
    is converted to a ``News`` object and handed to all registered callbacks.

    Credentials are read from the ``APCA_API_KEY_ID`` and
    ``APCA_API_SECRET_KEY`` environment variables and sent as handshake
    headers.
    """

    _instance = None
    _lock = threading.Lock()

    WEBSOCKET_URL = "wss://stream.data.alpaca.markets/v1beta1/news"
    KEY_ENV_VAR = "APCA_API_KEY_ID"
    SECRET_ENV_VAR = "APCA_API_SECRET_KEY"
    RECONNECT_DELAY_SECONDS = 5

    def __new__(cls):
        # Check twice: once without the lock for the common case, once with
        # it so only one thread ever creates the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(AlpacaNewsFeedAdapter, cls).__new__(cls)
                    cls._instance.__initialized = False
        return cls._instance

    def __init__(self):
        """
        Set up state and open the connection (first instantiation only).

        Raises:
            RuntimeError: if either credential environment variable is unset.
        """
        # Check-and-set under the class lock so two threads racing through
        # the first construction cannot both open a socket.
        with self._lock:
            if self.__initialized:
                return
            # Load credentials before marking the instance initialised, so a
            # failed first attempt can be retried once the environment is fixed.
            api_key, api_secret = self._load_credentials()
            self.__initialized = True

        self.websocket_url = self.WEBSOCKET_URL
        self.headers = {
            "APCA-API-KEY-ID": api_key,
            "APCA-API-SECRET-KEY": api_secret,
        }
        self.ws = None
        self.ws_thread = None
        self.authenticated = False
        self.connected = False
        self.callbacks = []
        # Set by close(); tells on_close() not to reconnect.
        self._closing = threading.Event()
        self.db_logger = NewsDBLogger()
        self.connect()

    @classmethod
    def _load_credentials(cls):
        """
        Return ``(api_key, api_secret)`` read from the environment.

        Raises:
            RuntimeError: if either variable is missing or empty.
        """
        # NOTE: credentials must never be committed to source control; any
        # key that was previously hard-coded here should be rotated.
        api_key = os.environ.get(cls.KEY_ENV_VAR)
        api_secret = os.environ.get(cls.SECRET_ENV_VAR)
        if not api_key or not api_secret:
            raise RuntimeError(
                "Alpaca credentials are not configured: set the "
                f"{cls.KEY_ENV_VAR} and {cls.SECRET_ENV_VAR} environment variables"
            )
        return api_key, api_secret

    def connect(self):
        """
        Open the WebSocket and run it on a background daemon thread.
        """
        # An explicit connect() re-enables automatic reconnection after close().
        self._closing.clear()
        self.ws = websocket.WebSocketApp(
            self.websocket_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            header=self.headers,
        )
        # Daemon thread: the socket must not keep the process alive on exit.
        self.ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        self.ws_thread.start()

    def on_open(self, ws):
        """
        Handle WebSocket open event.
        """
        logger.info("WebSocket connection established")

    def subscribe_to_news(self):
        """
        Subscribe to all news updates.
        """
        # Snapshot the socket: close() may clear self.ws from another thread.
        ws = self.ws
        if ws is None:
            logger.warning("[ALPACA] Cannot subscribe: WebSocket is not open")
            return
        subscription_msg = {
            "action": "subscribe",
            "news": ["*"],
        }
        ws.send(json.dumps(subscription_msg))

    def on_message(self, ws, message):
        """
        Handle one incoming WebSocket frame.

        A frame is a JSON array and every entry in it is processed. Frames
        that are not valid JSON or not an array are ignored.
        """
        logger.debug("[ALPACA] Raw message: %s", message)
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning("[ALPACA] Ignoring message that is not valid JSON: %r", message)
            return

        if not isinstance(data, list):
            return
        for entry in data:
            if isinstance(entry, dict):
                self._handle_entry(entry)

    def _handle_entry(self, entry):
        """Dispatch a single decoded stream entry on its ``T`` (type) field."""
        msg_type = entry.get("T", "")

        if msg_type == "success":
            status = entry.get("msg")
            if status == "connected":
                self.connected = True
            elif status == "authenticated":
                self.authenticated = True
                # Subscribe to news after authentication
                self.subscribe_to_news()
        elif msg_type == "subscription":
            logger.info(
                "[ALPACA] Successfully subscribed to news channels: %s",
                entry.get("news", []),
            )
        elif msg_type == "error":
            logger.error(
                "[ALPACA] Server reported error %s: %s",
                entry.get("code"),
                entry.get("msg"),
            )
        elif msg_type == "n":
            self._dispatch_news(entry)

    def _dispatch_news(self, entry):
        """Build a ``News`` object from a news entry and notify all callbacks."""
        news_item = News(
            id=entry.get("id"),
            headline=entry.get("headline"),
            summary=entry.get("summary"),
            author=entry.get("author"),
            created_at=entry.get("created_at"),
            updated_at=entry.get("updated_at"),
            url=entry.get("url"),
            content=entry.get("content"),
            symbols=entry.get("symbols", []),
            source=entry.get("source"),
        )
        symbols_str = ", ".join(news_item.symbols) if news_item.symbols else ""
        logger.info("[ALPACA] Received news | '%s' (%s)", news_item.headline, symbols_str)

        # The item is deliberately not written to the database here, to avoid
        # duplicate logging.

        # Iterate over a copy so a callback may register further callbacks,
        # and isolate failures so one bad callback cannot starve the others.
        for callback in list(self.callbacks):
            try:
                callback(news_item)
            except Exception:
                logger.exception("[ALPACA] News callback %r failed", callback)

    def on_error(self, ws, error):
        """
        Handle errors from the WebSocket.
        """
        logger.error("WebSocket error: %s", error)

    def on_close(self, ws, close_status_code, close_msg):
        """
        Handle WebSocket closure and reconnect unless close() was requested.
        """
        logger.info("WebSocket closed: %s - %s", close_status_code, close_msg)
        self.connected = False
        self.authenticated = False

        if self._closing.is_set():
            # Deliberate shutdown via close(): do not reconnect.
            return

        logger.info(
            "Attempting to reconnect in %s seconds...", self.RECONNECT_DELAY_SECONDS
        )
        # Event.wait doubles as an interruptible sleep: it returns True as
        # soon as close() is called, in which case the reconnect is abandoned.
        if self._closing.wait(self.RECONNECT_DELAY_SECONDS):
            return
        self.connect()

    def register_callback(self, callback):
        """
        Register a callback function to be called when news is received.

        The callback is called with a single ``News`` object. Returns True if
        the callback was registered, False if it is not callable.
        """
        if callable(callback):
            self.callbacks.append(callback)
            return True
        return False

    def close(self):
        """
        Close the WebSocket connection and disable automatic reconnection.
        """
        # Raise the flag first so the on_close triggered below sees it.
        self._closing.set()
        if self.ws:
            self.ws.close()
            self.ws = None
if __name__ == "__main__":
    # Demo: stream news to stdout until interrupted with Ctrl+C.

    def echo_headline(news_item):
        """Print the headline of each news item received."""
        print(f"[PROCESSOR] [QUEUE] News item | {news_item.headline}")

    # Instantiating the adapter opens the connection.
    feed = AlpacaNewsFeedAdapter()
    feed.register_callback(echo_headline)

    # The socket runs on a daemon thread, so the main thread has to stay
    # alive for messages to keep arriving.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Exiting...")
        feed.close()