Spaces:
Running
Running
| import os | |
| import threading | |
| import websocket | |
| import json | |
| import time | |
| import logging | |
| from news_scraper.models.news import News | |
| from news_scraper.helpers.news_db_logger import NewsDBLogger | |
| logger = logging.getLogger(__name__) | |
class AlpacaNewsFeedAdapter:
    """
    Singleton adapter for the Alpaca real-time news WebSocket stream.

    The first construction reads API credentials from the environment, opens
    the WebSocket on a daemon thread and, once the server confirms
    authentication, subscribes to every news channel. Each news message is
    converted into a ``News`` object and passed to all registered callbacks.

    Credentials are taken from the ``APCA_API_KEY_ID`` and
    ``APCA_API_SECRET_KEY`` environment variables; they must never be
    committed to source control.
    """

    WEBSOCKET_URL = "wss://stream.data.alpaca.markets/v1beta1/news"
    KEY_ENV_VAR = "APCA_API_KEY_ID"
    SECRET_ENV_VAR = "APCA_API_SECRET_KEY"
    RECONNECT_DELAY_SECONDS = 5

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag *before* publishing the instance so that
                    # no other thread can observe it without the attribute.
                    instance.__initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        """
        Set up state and start the connection (first construction only).

        Raises:
            RuntimeError: if the credential environment variables are unset.
        """
        # Python calls __init__ on every ``AlpacaNewsFeedAdapter()``; the lock
        # plus flag make sure the body runs exactly once.
        with self._lock:
            if self.__initialized:
                return
            self.websocket_url = self.WEBSOCKET_URL
            self.headers = self._load_credentials()
            self.ws = None
            self.ws_thread = None
            self.authenticated = False
            self.connected = False
            self.callbacks = []
            # Set by close() so that on_close() does not reconnect.
            self._closing = threading.Event()
            self.db_logger = NewsDBLogger()
            self.connect()
            # Only mark as initialized once setup succeeded, so a failed
            # first attempt can be retried.
            self.__initialized = True

    @classmethod
    def _load_credentials(cls):
        """
        Build the authentication headers from environment variables.

        Returns:
            dict with the ``APCA-API-KEY-ID`` / ``APCA-API-SECRET-KEY`` headers.

        Raises:
            RuntimeError: if either variable is missing or empty.
        """
        api_key = os.environ.get(cls.KEY_ENV_VAR)
        api_secret = os.environ.get(cls.SECRET_ENV_VAR)
        if not api_key or not api_secret:
            raise RuntimeError(
                f"Alpaca credentials not configured: set {cls.KEY_ENV_VAR} "
                f"and {cls.SECRET_ENV_VAR} in the environment"
            )
        return {
            "APCA-API-KEY-ID": api_key,
            "APCA-API-SECRET-KEY": api_secret,
        }

    def connect(self):
        """
        Connect to the Alpaca WebSocket API.

        Creates a new ``WebSocketApp`` and runs it on a daemon thread so the
        caller is not blocked.
        """
        # A fresh connection re-enables automatic reconnection.
        self._closing.clear()
        self.ws = websocket.WebSocketApp(
            self.websocket_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            header=self.headers,
        )
        self.ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        self.ws_thread.start()

    def on_open(self, ws):
        """Handle WebSocket open event."""
        logger.info("WebSocket connection established")

    def subscribe_to_news(self):
        """Subscribe to all news updates."""
        ws = self.ws
        if ws is None:
            # close() may have run between authentication and subscription.
            logger.warning("[ALPACA] Cannot subscribe: WebSocket is not open")
            return
        subscription_msg = {
            "action": "subscribe",
            "news": ["*"],
        }
        ws.send(json.dumps(subscription_msg))

    def on_message(self, ws, message):
        """
        Handle incoming messages from the WebSocket.

        A frame is expected to be a JSON array of message objects; every
        object in the array is processed, not only the first one. Malformed
        frames are logged and ignored.
        """
        logger.debug("[ALPACA] Raw message: %s", message)
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            logger.warning("[ALPACA] Ignoring non-JSON message: %r", message)
            return

        if not isinstance(data, list):
            return
        for item in data:
            if isinstance(item, dict):
                self._handle_item(item)

    def _handle_item(self, item):
        """Dispatch one decoded message object according to its ``T`` field."""
        msg_type = item.get("T", "")
        msg_text = item.get("msg")

        if msg_type == "success" and msg_text == "connected":
            self.connected = True
        elif msg_type == "success" and msg_text == "authenticated":
            self.authenticated = True
            # Subscriptions are only requested after authentication.
            self.subscribe_to_news()
        elif msg_type == "subscription":
            logger.info(
                "[ALPACA] Successfully subscribed to news channels: %s",
                item.get("news", []),
            )
        elif msg_type == "error":
            logger.error("[ALPACA] Server error: %s", item)
        elif msg_type == "n":
            self._dispatch_news(item)

    def _dispatch_news(self, item):
        """Convert a news message into ``News`` and notify every callback."""
        news_item = News(
            id=item.get("id"),
            headline=item.get("headline"),
            summary=item.get("summary"),
            author=item.get("author"),
            created_at=item.get("created_at"),
            updated_at=item.get("updated_at"),
            url=item.get("url"),
            content=item.get("content"),
            symbols=item.get("symbols", []),
            source=item.get("source"),
        )
        symbols_str = ", ".join(news_item.symbols) if news_item.symbols else ""
        logger.info(
            "[ALPACA] Received news | '%s' (%s)", news_item.headline, symbols_str
        )
        # Database logging is intentionally not done here to avoid duplicate
        # rows; persistence is left to the registered callbacks.

        # Iterate over a snapshot so callbacks may be registered concurrently.
        for callback in list(self.callbacks):
            try:
                callback(news_item)
            except Exception:
                # One faulty consumer must not starve the others.
                logger.exception("[ALPACA] News callback %r failed", callback)

    def on_error(self, ws, error):
        """Handle errors from the WebSocket."""
        logger.error("WebSocket error: %s", error)

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """
        Handle WebSocket closure.

        Reconnects after ``RECONNECT_DELAY_SECONDS`` unless the closure was
        requested through ``close()``.
        """
        self.connected = False
        self.authenticated = False
        logger.warning("WebSocket closed: %s - %s", close_status_code, close_msg)

        if self._closing.is_set():
            return
        logger.info(
            "Attempting to reconnect in %s seconds...", self.RECONNECT_DELAY_SECONDS
        )
        # wait() instead of sleep() so a close() during the delay cancels
        # the pending reconnect.
        if self._closing.wait(self.RECONNECT_DELAY_SECONDS):
            return
        self.connect()

    def register_callback(self, callback):
        """
        Register a callback function to be called when news is received.

        The callback is invoked with a single ``News`` object.

        Returns:
            True if the callback was registered, False if it is not callable.
        """
        if not callable(callback):
            return False
        self.callbacks.append(callback)
        return True

    def close(self):
        """Close the WebSocket connection and disable automatic reconnection."""
        self._closing.set()
        ws, self.ws = self.ws, None
        if ws is not None:
            ws.close()
if __name__ == "__main__":
    # Demo entry point: print the headline of every news item received.

    # Without a logging configuration the adapter's INFO-level messages
    # (e.g. received news) are dropped by the default WARNING threshold.
    logging.basicConfig(level=logging.INFO)

    def print_news(news_item):
        """Print the headline of a received news item."""
        print(f"[PROCESSOR] [QUEUE] News item | {news_item.headline}")

    # Create the adapter (this starts the WebSocket thread) and subscribe.
    alpaca_adapter = AlpacaNewsFeedAdapter()
    alpaca_adapter.register_callback(print_news)

    # The WebSocket runs on a daemon thread, so the main thread has to stay
    # alive for messages to keep arriving.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        # Always release the connection, whatever ended the loop.
        alpaca_adapter.close()