Spaces:
Running
Running
| import requests | |
| from datetime import datetime, date, timedelta | |
| import sys | |
| from pathlib import Path | |
| # Add src to path for database import | |
| sys.path.append(str(Path(__file__).parent.parent.parent)) | |
| from calendar_scraper.models.earnings_calendar import EarningsEvent | |
| from calendar_scraper.models.economic_calendar import EconomicEvent | |
| from calendar_scraper.models.ipo_calendar import IPOEvent | |
| from calendar_scraper.models.stock_split_calendar import StockSplitEvent | |
| from calendar_scraper.models.dividend_calendar import DividendEvent | |
| from db.adapters import CalendarAdapter | |
| class NasdaqAdapter(): | |
| def __init__(self): | |
| self.base_earnings_url = "https://api.nasdaq.com/api/calendar/earnings" | |
| self.base_ipo_url = "https://api.nasdaq.com/api/ipo/calendar" | |
| self.base_splits_url = "https://api.nasdaq.com/api/calendar/splits" | |
| self.base_economic_url = "https://api.nasdaq.com/api/calendar/economicevents" | |
| self.base_dividends_url = "https://api.nasdaq.com/api/calendar/dividends" | |
| self.headers = { | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
| "Accept": "application/json, text/plain, */*", | |
| "Referer": "https://www.nasdaq.com/market-activity/earnings", | |
| "Origin": "https://www.nasdaq.com" | |
| } | |
| # Initialize database adapter | |
| self.db_adapter = CalendarAdapter() | |
| def get_earning_events(self): | |
| """Get earnings calendar data from NASDAQ API""" | |
| try: | |
| # Make request to NASDAQ API | |
| response = requests.get(self.base_earnings_url, headers=self.headers) | |
| response.raise_for_status() | |
| # Parse JSON response | |
| data = response.json() | |
| # Verify response structure | |
| if not data or 'data' not in data or 'rows' not in data['data']: | |
| print("Invalid response format from NASDAQ API") | |
| return [] | |
| # Extract date from response | |
| as_of_date_str = data['data'].get('asOf', '') | |
| try: | |
| as_of_date = datetime.strptime(as_of_date_str, '%a, %b %d, %Y').date() | |
| except ValueError: | |
| as_of_date = date.today() | |
| # Parse rows into earnings events | |
| events = [] | |
| for row in data['data']['rows']: | |
| try: | |
| event = self._parse_row_to_event(row, as_of_date) | |
| if event: | |
| events.append(event) | |
| except Exception as e: | |
| print(f"Error parsing row {row['symbol'] if 'symbol' in row else 'unknown'}: {e}") | |
| print(f"Found {len(events)} earnings events from NASDAQ") | |
| return events | |
| except requests.RequestException as e: | |
| print(f"Error fetching data from NASDAQ API: {e}") | |
| return [] | |
| except Exception as e: | |
| print(f"Unexpected error: {e}") | |
| return [] | |
| def _parse_row_to_event(self, row, report_date): | |
| """Convert a row from the NASDAQ API response to an EarningsEvent object""" | |
| # Extract basic information | |
| ticker = row.get('symbol', '') | |
| company = row.get('name', '') | |
| # Skip if missing essential data | |
| if not ticker or not company: | |
| return None | |
| # Parse time of day | |
| time_str = row.get('time', '').lower() | |
| if 'after' in time_str or 'post' in time_str: | |
| time_of_day = 'After Close' | |
| elif 'pre' in time_str or 'before' in time_str: | |
| time_of_day = 'Before Open' | |
| elif 'not-supplied' in time_str: | |
| time_of_day = 'Unknown' | |
| else: | |
| time_of_day = 'During Market' | |
| # Extract EPS information | |
| eps_forecast = row.get('epsForecast', '') | |
| last_year_eps = row.get('lastYearEPS', '') | |
| # Combine EPS data | |
| eps_info = [] | |
| if eps_forecast: | |
| eps_info.append(f"Forecast: {eps_forecast}") | |
| if last_year_eps: | |
| eps_info.append(f"Last Year: {last_year_eps}") | |
| eps = " | ".join(eps_info) | |
| # Clean market cap value - remove $ and commas | |
| market_cap = row.get('marketCap', '') | |
| if market_cap and isinstance(market_cap, str): | |
| # Remove $ and commas, then convert to float | |
| market_cap = market_cap.replace('$', '').replace(',', '') | |
| if market_cap.strip(): # Check if it's not just whitespace | |
| try: | |
| market_cap = float(market_cap) | |
| except ValueError: | |
| market_cap = None | |
| # Create earnings event object | |
| event = EarningsEvent( | |
| date=report_date, | |
| time=time_of_day, | |
| company=company, | |
| ticker=ticker, | |
| eps=eps, | |
| revenue="", | |
| market_cap=market_cap | |
| ) | |
| try: | |
| self.db_adapter.save_earnings_event( | |
| date=report_date.strftime("%Y-%m-%d"), | |
| ticker=ticker, | |
| event_data={ | |
| "company": company, | |
| "time": time_of_day, | |
| "eps": eps, | |
| "revenue": "", | |
| "market_cap": market_cap, | |
| "eps_forecast": eps_forecast, | |
| "last_year_eps": last_year_eps | |
| } | |
| ) | |
| # print(f"✅ Saved {ticker} earnings event to database for {report_date.strftime('%Y-%m-%d')}") | |
| except Exception as e: | |
| print(f"⚠️ Failed to save {ticker} to database: {e}") | |
| return event | |
| # ==================== IPO EVENTS ==================== | |
| def get_ipo_events(self, date_str: str = None) -> list: | |
| """ | |
| Get IPO calendar data from NASDAQ API | |
| Args: | |
| date_str: Date in format YYYY-MM (e.g., "2025-11") | |
| Returns: | |
| List of IPOEvent objects | |
| """ | |
| if not date_str: | |
| date_str = datetime.now().strftime("%Y-%m") | |
| try: | |
| url = f"{self.base_ipo_url}?date={date_str}" | |
| response = requests.get(url, headers=self.headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| if not data or 'data' not in data: | |
| print("Invalid response format from NASDAQ IPO API") | |
| return [] | |
| events = [] | |
| # Parse priced IPOs | |
| if 'priced' in data['data'] and 'rows' in data['data']['priced']: | |
| for row in data['data']['priced']['rows']: | |
| event = self._parse_ipo_row(row, 'priced') | |
| if event: | |
| events.append(event) | |
| # Parse upcoming IPOs (nested under upcomingTable) | |
| if 'upcoming' in data['data'] and data['data']['upcoming']: | |
| upcoming_data = data['data']['upcoming'] | |
| if 'upcomingTable' in upcoming_data and upcoming_data['upcomingTable']: | |
| upcoming_table = upcoming_data['upcomingTable'] | |
| if 'rows' in upcoming_table and upcoming_table['rows']: | |
| for row in upcoming_table['rows']: | |
| event = self._parse_ipo_row(row, 'upcoming') | |
| if event: | |
| events.append(event) | |
| # Parse filed IPOs | |
| if 'filed' in data['data'] and data['data']['filed']: | |
| filed_data = data['data']['filed'] | |
| # Check if rows are directly under filed or nested | |
| if 'rows' in filed_data and filed_data['rows']: | |
| for row in filed_data['rows']: | |
| event = self._parse_ipo_row(row, 'filed') | |
| if event: | |
| events.append(event) | |
| print(f"Found {len(events)} IPO events from NASDAQ") | |
| return events | |
| except requests.RequestException as e: | |
| print(f"Error fetching IPO data from NASDAQ API: {e}") | |
| return [] | |
| except Exception as e: | |
| print(f"Unexpected error in get_ipo_events: {e}") | |
| return [] | |
| def _parse_ipo_row(self, row, status): | |
| """Parse IPO row data""" | |
| try: | |
| company = row.get('companyName', '') | |
| ticker = row.get('proposedTickerSymbol', '') | |
| exchange = row.get('proposedExchange', '') | |
| if not company or not ticker: | |
| return None | |
| # Parse date | |
| date_str = row.get('expectedPriceDate', '') or row.get('pricedDate', '') | |
| if date_str: | |
| try: | |
| event_date = datetime.strptime(date_str, '%m/%d/%Y').date() | |
| except: | |
| event_date = datetime.now().date() | |
| else: | |
| event_date = datetime.now().date() | |
| # Get shares and clean if string (remove commas) | |
| shares_raw = row.get('sharesOffered', '') | |
| shares = 0 | |
| if shares_raw: | |
| try: | |
| # Remove commas and convert to int | |
| shares = int(str(shares_raw).replace(',', '')) | |
| except: | |
| shares = 0 | |
| price_range = row.get('proposedSharePrice', '') | |
| offer_amount = row.get('dollarValueOfSharesOffered', '') | |
| deal_status = row.get('dealStatus', status) | |
| # Calculate market cap from offer amount or price range | |
| market_cap = None | |
| try: | |
| # Try to get from offer amount first | |
| if offer_amount: | |
| # Remove $, commas, and convert (e.g., "$1,014,999,994") | |
| offer_clean = str(offer_amount).replace('$', '').replace(',', '') | |
| market_cap = float(offer_clean) | |
| elif shares and price_range: | |
| # Extract average price from range (e.g., "4.00-6.00") | |
| if '-' in str(price_range): | |
| prices = str(price_range).replace('$', '').split('-') | |
| avg_price = (float(prices[0].strip()) + float(prices[1].strip())) / 2 | |
| market_cap = shares * avg_price | |
| elif price_range: | |
| # Single price | |
| price = float(str(price_range).replace('$', '')) | |
| market_cap = shares * price | |
| except: | |
| pass | |
| event = IPOEvent( | |
| date=event_date, | |
| company=company, | |
| ticker=ticker, | |
| exchange=exchange, | |
| shares=str(shares) if shares else "", | |
| price_range=str(price_range) if price_range else "", | |
| market_cap=market_cap, | |
| expected_to_trade=deal_status | |
| ) | |
| # Save to database | |
| try: | |
| from db.local_database import DatabaseEntry, DataType | |
| entry = DatabaseEntry( | |
| date=event_date.strftime("%Y-%m-%d"), | |
| data_type=DataType.IPO.value, | |
| ticker=ticker, | |
| data={ | |
| "event_type": "ipo", | |
| "status": deal_status, | |
| "company": company, | |
| "exchange": exchange, | |
| "shares": str(shares) if shares else "", | |
| "price_range": str(price_range) if price_range else "", | |
| "market_cap": market_cap, | |
| "offer_amount": offer_amount | |
| }, | |
| metadata={ | |
| "source": "calendar_scraper", | |
| "scraper": "nasdaq_ipo" | |
| } | |
| ) | |
| self.db_adapter.db.save(entry, expiry_days=90) | |
| # print(f"✅ Saved {ticker} IPO event to database") | |
| except Exception as e: | |
| print(f"⚠️ Failed to save {ticker} IPO to database: {e}") | |
| return event | |
| except Exception as e: | |
| print(f"Error parsing IPO row: {e}") | |
| return None | |
| # ==================== STOCK SPLIT EVENTS ==================== | |
| def get_stock_split_events(self, date_str: str = None) -> list: | |
| """ | |
| Get stock split calendar data from NASDAQ API | |
| Args: | |
| date_str: Date in format YYYY-MM-DD (optional) | |
| Returns: | |
| List of StockSplitEvent objects | |
| """ | |
| try: | |
| url = self.base_splits_url | |
| if date_str: | |
| url = f"{url}?date={date_str}" | |
| response = requests.get(url, headers=self.headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| if not data or 'data' not in data or 'rows' not in data['data']: | |
| print("Invalid response format from NASDAQ Stock Split API") | |
| return [] | |
| events = [] | |
| for row in data['data']['rows']: | |
| event = self._parse_stock_split_row(row) | |
| if event: | |
| events.append(event) | |
| print(f"Found {len(events)} stock split events from NASDAQ") | |
| return events | |
| except requests.RequestException as e: | |
| print(f"Error fetching stock split data from NASDAQ API: {e}") | |
| return [] | |
| except Exception as e: | |
| print(f"Unexpected error in get_stock_split_events: {e}") | |
| return [] | |
| def _parse_stock_split_row(self, row): | |
| """Parse stock split row data""" | |
| try: | |
| # API returns: symbol, name, ratio, executionDate | |
| company = row.get('name', '') | |
| ticker = row.get('symbol', '') | |
| ratio = row.get('ratio', '') | |
| if not ticker or not ratio: | |
| return None | |
| # Parse execution date (effective date) | |
| ex_date_str = row.get('executionDate', '') | |
| if ex_date_str: | |
| try: | |
| ex_date = datetime.strptime(ex_date_str, '%m/%d/%Y').date() | |
| except: | |
| ex_date = datetime.now().date() | |
| else: | |
| ex_date = datetime.now().date() | |
| event = StockSplitEvent( | |
| date=ex_date, | |
| company=company, | |
| ticker=ticker, | |
| ratio=ratio, | |
| option_symbol="", | |
| announcement_date=None, | |
| ex_date=ex_date | |
| ) | |
| # Save to database | |
| try: | |
| from db.local_database import DatabaseEntry, DataType | |
| entry = DatabaseEntry( | |
| date=ex_date.strftime("%Y-%m-%d"), | |
| data_type=DataType.STOCK_SPLIT.value, | |
| ticker=ticker, | |
| data={ | |
| "event_type": "stock_split", | |
| "company": company, | |
| "ratio": ratio, | |
| "execution_date": ex_date.strftime("%Y-%m-%d") | |
| }, | |
| metadata={ | |
| "source": "calendar_scraper", | |
| "scraper": "nasdaq_splits" | |
| } | |
| ) | |
| self.db_adapter.db.save(entry, expiry_days=90) | |
| # print(f"✅ Saved {ticker} stock split event to database") | |
| except Exception as e: | |
| print(f"⚠️ Failed to save {ticker} stock split to database: {e}") | |
| return event | |
| except Exception as e: | |
| print(f"Error parsing stock split row: {e}") | |
| return None | |
| # ==================== ECONOMIC EVENTS ==================== | |
| def get_economic_events(self, date_str: str = None, country_list:list[str]=['UNITED_STATES','United States']) -> list: | |
| """ | |
| Get economic events calendar data from NASDAQ API | |
| Args: | |
| date_str: Date in format YYYY-MM-DD (e.g., "2025-11-06") | |
| Returns: | |
| List of EconomicEvent objects | |
| """ | |
| if not date_str: | |
| date_str = datetime.now().strftime("%Y-%m-%d") | |
| try: | |
| url = f"{self.base_economic_url}?date={date_str}" | |
| response = requests.get(url, headers=self.headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| if not data or 'data' not in data or 'rows' not in data['data']: | |
| print("Invalid response format from NASDAQ Economic Events API") | |
| return [] | |
| events = [] | |
| for row in data['data']['rows']: | |
| event = self._parse_economic_event_row(row, date_str, country_list) | |
| if event: | |
| events.append(event) | |
| print(f"Found {len(events)} economic events from NASDAQ") | |
| return events | |
| except requests.RequestException as e: | |
| print(f"Error fetching economic events data from NASDAQ API: {e}") | |
| return [] | |
| except Exception as e: | |
| print(f"Unexpected error in get_economic_events: {e}") | |
| return [] | |
| def _parse_economic_event_row(self, row, date_str, country_list) : | |
| """Parse economic event row data""" | |
| try: | |
| event_name = row.get('eventName', '') | |
| country = row.get('country', 'US') | |
| if not event_name: | |
| return None | |
| if event_name.lower() in ['n/a', 'not available']: | |
| return None | |
| if country not in country_list: | |
| return None | |
| # Parse event date | |
| try: | |
| event_date = datetime.strptime(date_str, '%Y-%m-%d').date() | |
| except: | |
| event_date = datetime.now().date() | |
| time = row.get('time', '') | |
| actual = row.get('actual', '') | |
| consensus = row.get('consensus', '') | |
| previous = row.get('previous', '') | |
| event = EconomicEvent( | |
| date=event_date, | |
| time=time, | |
| country=country, | |
| importance=row.get('importance', 0), | |
| event=event_name, | |
| actual=actual, | |
| forecast=consensus, | |
| previous=previous | |
| ) | |
| # Save to database | |
| try: | |
| self.db_adapter.save_economic_event( | |
| date=event_date.strftime("%Y-%m-%d"), | |
| event_data={ | |
| "country": country, | |
| "time": time, | |
| "event": event_name, | |
| "actual": actual, | |
| "forecast": consensus, | |
| "previous": previous | |
| } | |
| ) | |
| # print(f"✅ Saved {event_name} economic event to database") | |
| except Exception as e: | |
| print(f"⚠️ Failed to save {event_name} economic event to database: {e}") | |
| return event | |
| except Exception as e: | |
| print(f"Error parsing economic event row: {e}") | |
| return None | |
| # ==================== DIVIDEND EVENTS ==================== | |
| def get_dividend_events(self, date_str: str = None) -> list: | |
| """ | |
| Get dividend calendar data from NASDAQ API | |
| Args: | |
| date_str: Date in format YYYY-MM-DD (e.g., "2025-11-06") | |
| Returns: | |
| List of DividendEvent objects | |
| """ | |
| if not date_str: | |
| date_str = datetime.now().strftime("%Y-%m-%d") | |
| try: | |
| url = f"{self.base_dividends_url}?date={date_str}" | |
| response = requests.get(url, headers=self.headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| if not data or 'data' not in data or 'calendar' not in data['data']: | |
| print("Invalid response format from NASDAQ Dividends API") | |
| return [] | |
| calendar_data = data['data']['calendar'] | |
| if 'rows' not in calendar_data: | |
| print("No rows in calendar data") | |
| return [] | |
| events = [] | |
| for row in calendar_data['rows']: | |
| event = self._parse_dividend_row(row, date_str) | |
| if event: | |
| events.append(event) | |
| print(f"Found {len(events)} dividend events from NASDAQ") | |
| return events | |
| except requests.RequestException as e: | |
| print(f"Error fetching dividend data from NASDAQ API: {e}") | |
| return [] | |
| except Exception as e: | |
| print(f"Unexpected error in get_dividend_events: {e}") | |
| return [] | |
| def _parse_dividend_row(self, row, date_str): | |
| """Parse dividend row data""" | |
| try: | |
| # API returns: companyName, symbol, dividend_Ex_Date, payment_Date, record_Date, | |
| # dividend_Rate, indicated_Annual_Dividend, announcement_Date | |
| company = row.get('companyName', '') | |
| ticker = row.get('symbol', '') | |
| dividend_rate = row.get('dividend_Rate', 0) | |
| if not company or not ticker: | |
| return None | |
| # Use ex-dividend date as the event date | |
| ex_date_str = row.get('dividend_Ex_Date', '') | |
| if ex_date_str: | |
| try: | |
| event_date = datetime.strptime(ex_date_str, '%m/%d/%Y').date() | |
| except: | |
| event_date = datetime.now().date() | |
| else: | |
| event_date = datetime.now().date() | |
| annual_dividend = row.get('indicated_Annual_Dividend', '') | |
| ex_date = row.get('dividend_Ex_Date', '') | |
| record_date = row.get('record_Date', '') | |
| payment_date = row.get('payment_Date', '') | |
| announcement_date = row.get('announcement_Date', '') | |
| event = DividendEvent( | |
| date=event_date, | |
| company=company, | |
| ticker=ticker, | |
| dividend_rate=str(dividend_rate), | |
| annual_dividend=str(annual_dividend) if annual_dividend else "", | |
| ex_date=ex_date, | |
| record_date=record_date, | |
| payment_date=payment_date, | |
| announcement_date=announcement_date | |
| ) | |
| # Save to database | |
| try: | |
| from db.local_database import DatabaseEntry, DataType | |
| entry = DatabaseEntry( | |
| date=event_date.strftime("%Y-%m-%d"), | |
| data_type=DataType.DIVIDENDS.value, | |
| ticker=ticker, | |
| data={ | |
| "event_type": "dividend", | |
| "company": company, | |
| "dividend_rate": str(dividend_rate), | |
| "annual_dividend": str(annual_dividend) if annual_dividend else "", | |
| "ex_date": ex_date, | |
| "record_date": record_date, | |
| "payment_date": payment_date, | |
| "announcement_date": announcement_date | |
| }, | |
| metadata={ | |
| "source": "calendar_scraper", | |
| "scraper": "nasdaq_dividends" | |
| } | |
| ) | |
| self.db_adapter.db.save(entry, expiry_days=90) | |
| # print(f"✅ Saved {ticker} dividend event to database") | |
| except Exception as e: | |
| print(f"⚠️ Failed to save {ticker} dividend to database: {e}") | |
| return event | |
| except Exception as e: | |
| print(f"Error parsing dividend row: {e}") | |
| return None | |