Papaflessas's picture
Deploy Signal Generator app
3fe0726
import requests
from datetime import datetime, date, timedelta
import sys
from pathlib import Path
# Add src to path for database import
sys.path.append(str(Path(__file__).parent.parent.parent))
from calendar_scraper.models.earnings_calendar import EarningsEvent
from calendar_scraper.models.economic_calendar import EconomicEvent
from calendar_scraper.models.ipo_calendar import IPOEvent
from calendar_scraper.models.stock_split_calendar import StockSplitEvent
from calendar_scraper.models.dividend_calendar import DividendEvent
from db.adapters import CalendarAdapter
class NasdaqAdapter():
def __init__(self):
self.base_earnings_url = "https://api.nasdaq.com/api/calendar/earnings"
self.base_ipo_url = "https://api.nasdaq.com/api/ipo/calendar"
self.base_splits_url = "https://api.nasdaq.com/api/calendar/splits"
self.base_economic_url = "https://api.nasdaq.com/api/calendar/economicevents"
self.base_dividends_url = "https://api.nasdaq.com/api/calendar/dividends"
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Referer": "https://www.nasdaq.com/market-activity/earnings",
"Origin": "https://www.nasdaq.com"
}
# Initialize database adapter
self.db_adapter = CalendarAdapter()
def get_earning_events(self):
"""Get earnings calendar data from NASDAQ API"""
try:
# Make request to NASDAQ API
response = requests.get(self.base_earnings_url, headers=self.headers)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Verify response structure
if not data or 'data' not in data or 'rows' not in data['data']:
print("Invalid response format from NASDAQ API")
return []
# Extract date from response
as_of_date_str = data['data'].get('asOf', '')
try:
as_of_date = datetime.strptime(as_of_date_str, '%a, %b %d, %Y').date()
except ValueError:
as_of_date = date.today()
# Parse rows into earnings events
events = []
for row in data['data']['rows']:
try:
event = self._parse_row_to_event(row, as_of_date)
if event:
events.append(event)
except Exception as e:
print(f"Error parsing row {row['symbol'] if 'symbol' in row else 'unknown'}: {e}")
print(f"Found {len(events)} earnings events from NASDAQ")
return events
except requests.RequestException as e:
print(f"Error fetching data from NASDAQ API: {e}")
return []
except Exception as e:
print(f"Unexpected error: {e}")
return []
def _parse_row_to_event(self, row, report_date):
"""Convert a row from the NASDAQ API response to an EarningsEvent object"""
# Extract basic information
ticker = row.get('symbol', '')
company = row.get('name', '')
# Skip if missing essential data
if not ticker or not company:
return None
# Parse time of day
time_str = row.get('time', '').lower()
if 'after' in time_str or 'post' in time_str:
time_of_day = 'After Close'
elif 'pre' in time_str or 'before' in time_str:
time_of_day = 'Before Open'
elif 'not-supplied' in time_str:
time_of_day = 'Unknown'
else:
time_of_day = 'During Market'
# Extract EPS information
eps_forecast = row.get('epsForecast', '')
last_year_eps = row.get('lastYearEPS', '')
# Combine EPS data
eps_info = []
if eps_forecast:
eps_info.append(f"Forecast: {eps_forecast}")
if last_year_eps:
eps_info.append(f"Last Year: {last_year_eps}")
eps = " | ".join(eps_info)
# Clean market cap value - remove $ and commas
market_cap = row.get('marketCap', '')
if market_cap and isinstance(market_cap, str):
# Remove $ and commas, then convert to float
market_cap = market_cap.replace('$', '').replace(',', '')
if market_cap.strip(): # Check if it's not just whitespace
try:
market_cap = float(market_cap)
except ValueError:
market_cap = None
# Create earnings event object
event = EarningsEvent(
date=report_date,
time=time_of_day,
company=company,
ticker=ticker,
eps=eps,
revenue="",
market_cap=market_cap
)
try:
self.db_adapter.save_earnings_event(
date=report_date.strftime("%Y-%m-%d"),
ticker=ticker,
event_data={
"company": company,
"time": time_of_day,
"eps": eps,
"revenue": "",
"market_cap": market_cap,
"eps_forecast": eps_forecast,
"last_year_eps": last_year_eps
}
)
# print(f"✅ Saved {ticker} earnings event to database for {report_date.strftime('%Y-%m-%d')}")
except Exception as e:
print(f"⚠️ Failed to save {ticker} to database: {e}")
return event
# ==================== IPO EVENTS ====================
def get_ipo_events(self, date_str: str = None) -> list:
"""
Get IPO calendar data from NASDAQ API
Args:
date_str: Date in format YYYY-MM (e.g., "2025-11")
Returns:
List of IPOEvent objects
"""
if not date_str:
date_str = datetime.now().strftime("%Y-%m")
try:
url = f"{self.base_ipo_url}?date={date_str}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
data = response.json()
if not data or 'data' not in data:
print("Invalid response format from NASDAQ IPO API")
return []
events = []
# Parse priced IPOs
if 'priced' in data['data'] and 'rows' in data['data']['priced']:
for row in data['data']['priced']['rows']:
event = self._parse_ipo_row(row, 'priced')
if event:
events.append(event)
# Parse upcoming IPOs (nested under upcomingTable)
if 'upcoming' in data['data'] and data['data']['upcoming']:
upcoming_data = data['data']['upcoming']
if 'upcomingTable' in upcoming_data and upcoming_data['upcomingTable']:
upcoming_table = upcoming_data['upcomingTable']
if 'rows' in upcoming_table and upcoming_table['rows']:
for row in upcoming_table['rows']:
event = self._parse_ipo_row(row, 'upcoming')
if event:
events.append(event)
# Parse filed IPOs
if 'filed' in data['data'] and data['data']['filed']:
filed_data = data['data']['filed']
# Check if rows are directly under filed or nested
if 'rows' in filed_data and filed_data['rows']:
for row in filed_data['rows']:
event = self._parse_ipo_row(row, 'filed')
if event:
events.append(event)
print(f"Found {len(events)} IPO events from NASDAQ")
return events
except requests.RequestException as e:
print(f"Error fetching IPO data from NASDAQ API: {e}")
return []
except Exception as e:
print(f"Unexpected error in get_ipo_events: {e}")
return []
def _parse_ipo_row(self, row, status):
"""Parse IPO row data"""
try:
company = row.get('companyName', '')
ticker = row.get('proposedTickerSymbol', '')
exchange = row.get('proposedExchange', '')
if not company or not ticker:
return None
# Parse date
date_str = row.get('expectedPriceDate', '') or row.get('pricedDate', '')
if date_str:
try:
event_date = datetime.strptime(date_str, '%m/%d/%Y').date()
except:
event_date = datetime.now().date()
else:
event_date = datetime.now().date()
# Get shares and clean if string (remove commas)
shares_raw = row.get('sharesOffered', '')
shares = 0
if shares_raw:
try:
# Remove commas and convert to int
shares = int(str(shares_raw).replace(',', ''))
except:
shares = 0
price_range = row.get('proposedSharePrice', '')
offer_amount = row.get('dollarValueOfSharesOffered', '')
deal_status = row.get('dealStatus', status)
# Calculate market cap from offer amount or price range
market_cap = None
try:
# Try to get from offer amount first
if offer_amount:
# Remove $, commas, and convert (e.g., "$1,014,999,994")
offer_clean = str(offer_amount).replace('$', '').replace(',', '')
market_cap = float(offer_clean)
elif shares and price_range:
# Extract average price from range (e.g., "4.00-6.00")
if '-' in str(price_range):
prices = str(price_range).replace('$', '').split('-')
avg_price = (float(prices[0].strip()) + float(prices[1].strip())) / 2
market_cap = shares * avg_price
elif price_range:
# Single price
price = float(str(price_range).replace('$', ''))
market_cap = shares * price
except:
pass
event = IPOEvent(
date=event_date,
company=company,
ticker=ticker,
exchange=exchange,
shares=str(shares) if shares else "",
price_range=str(price_range) if price_range else "",
market_cap=market_cap,
expected_to_trade=deal_status
)
# Save to database
try:
from db.local_database import DatabaseEntry, DataType
entry = DatabaseEntry(
date=event_date.strftime("%Y-%m-%d"),
data_type=DataType.IPO.value,
ticker=ticker,
data={
"event_type": "ipo",
"status": deal_status,
"company": company,
"exchange": exchange,
"shares": str(shares) if shares else "",
"price_range": str(price_range) if price_range else "",
"market_cap": market_cap,
"offer_amount": offer_amount
},
metadata={
"source": "calendar_scraper",
"scraper": "nasdaq_ipo"
}
)
self.db_adapter.db.save(entry, expiry_days=90)
# print(f"✅ Saved {ticker} IPO event to database")
except Exception as e:
print(f"⚠️ Failed to save {ticker} IPO to database: {e}")
return event
except Exception as e:
print(f"Error parsing IPO row: {e}")
return None
# ==================== STOCK SPLIT EVENTS ====================
def get_stock_split_events(self, date_str: str = None) -> list:
"""
Get stock split calendar data from NASDAQ API
Args:
date_str: Date in format YYYY-MM-DD (optional)
Returns:
List of StockSplitEvent objects
"""
try:
url = self.base_splits_url
if date_str:
url = f"{url}?date={date_str}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
data = response.json()
if not data or 'data' not in data or 'rows' not in data['data']:
print("Invalid response format from NASDAQ Stock Split API")
return []
events = []
for row in data['data']['rows']:
event = self._parse_stock_split_row(row)
if event:
events.append(event)
print(f"Found {len(events)} stock split events from NASDAQ")
return events
except requests.RequestException as e:
print(f"Error fetching stock split data from NASDAQ API: {e}")
return []
except Exception as e:
print(f"Unexpected error in get_stock_split_events: {e}")
return []
def _parse_stock_split_row(self, row):
"""Parse stock split row data"""
try:
# API returns: symbol, name, ratio, executionDate
company = row.get('name', '')
ticker = row.get('symbol', '')
ratio = row.get('ratio', '')
if not ticker or not ratio:
return None
# Parse execution date (effective date)
ex_date_str = row.get('executionDate', '')
if ex_date_str:
try:
ex_date = datetime.strptime(ex_date_str, '%m/%d/%Y').date()
except:
ex_date = datetime.now().date()
else:
ex_date = datetime.now().date()
event = StockSplitEvent(
date=ex_date,
company=company,
ticker=ticker,
ratio=ratio,
option_symbol="",
announcement_date=None,
ex_date=ex_date
)
# Save to database
try:
from db.local_database import DatabaseEntry, DataType
entry = DatabaseEntry(
date=ex_date.strftime("%Y-%m-%d"),
data_type=DataType.STOCK_SPLIT.value,
ticker=ticker,
data={
"event_type": "stock_split",
"company": company,
"ratio": ratio,
"execution_date": ex_date.strftime("%Y-%m-%d")
},
metadata={
"source": "calendar_scraper",
"scraper": "nasdaq_splits"
}
)
self.db_adapter.db.save(entry, expiry_days=90)
# print(f"✅ Saved {ticker} stock split event to database")
except Exception as e:
print(f"⚠️ Failed to save {ticker} stock split to database: {e}")
return event
except Exception as e:
print(f"Error parsing stock split row: {e}")
return None
# ==================== ECONOMIC EVENTS ====================
def get_economic_events(self, date_str: str = None, country_list:list[str]=['UNITED_STATES','United States']) -> list:
"""
Get economic events calendar data from NASDAQ API
Args:
date_str: Date in format YYYY-MM-DD (e.g., "2025-11-06")
Returns:
List of EconomicEvent objects
"""
if not date_str:
date_str = datetime.now().strftime("%Y-%m-%d")
try:
url = f"{self.base_economic_url}?date={date_str}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
data = response.json()
if not data or 'data' not in data or 'rows' not in data['data']:
print("Invalid response format from NASDAQ Economic Events API")
return []
events = []
for row in data['data']['rows']:
event = self._parse_economic_event_row(row, date_str, country_list)
if event:
events.append(event)
print(f"Found {len(events)} economic events from NASDAQ")
return events
except requests.RequestException as e:
print(f"Error fetching economic events data from NASDAQ API: {e}")
return []
except Exception as e:
print(f"Unexpected error in get_economic_events: {e}")
return []
def _parse_economic_event_row(self, row, date_str, country_list) :
"""Parse economic event row data"""
try:
event_name = row.get('eventName', '')
country = row.get('country', 'US')
if not event_name:
return None
if event_name.lower() in ['n/a', 'not available']:
return None
if country not in country_list:
return None
# Parse event date
try:
event_date = datetime.strptime(date_str, '%Y-%m-%d').date()
except:
event_date = datetime.now().date()
time = row.get('time', '')
actual = row.get('actual', '')
consensus = row.get('consensus', '')
previous = row.get('previous', '')
event = EconomicEvent(
date=event_date,
time=time,
country=country,
importance=row.get('importance', 0),
event=event_name,
actual=actual,
forecast=consensus,
previous=previous
)
# Save to database
try:
self.db_adapter.save_economic_event(
date=event_date.strftime("%Y-%m-%d"),
event_data={
"country": country,
"time": time,
"event": event_name,
"actual": actual,
"forecast": consensus,
"previous": previous
}
)
# print(f"✅ Saved {event_name} economic event to database")
except Exception as e:
print(f"⚠️ Failed to save {event_name} economic event to database: {e}")
return event
except Exception as e:
print(f"Error parsing economic event row: {e}")
return None
# ==================== DIVIDEND EVENTS ====================
def get_dividend_events(self, date_str: str = None) -> list:
"""
Get dividend calendar data from NASDAQ API
Args:
date_str: Date in format YYYY-MM-DD (e.g., "2025-11-06")
Returns:
List of DividendEvent objects
"""
if not date_str:
date_str = datetime.now().strftime("%Y-%m-%d")
try:
url = f"{self.base_dividends_url}?date={date_str}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
data = response.json()
if not data or 'data' not in data or 'calendar' not in data['data']:
print("Invalid response format from NASDAQ Dividends API")
return []
calendar_data = data['data']['calendar']
if 'rows' not in calendar_data:
print("No rows in calendar data")
return []
events = []
for row in calendar_data['rows']:
event = self._parse_dividend_row(row, date_str)
if event:
events.append(event)
print(f"Found {len(events)} dividend events from NASDAQ")
return events
except requests.RequestException as e:
print(f"Error fetching dividend data from NASDAQ API: {e}")
return []
except Exception as e:
print(f"Unexpected error in get_dividend_events: {e}")
return []
def _parse_dividend_row(self, row, date_str):
"""Parse dividend row data"""
try:
# API returns: companyName, symbol, dividend_Ex_Date, payment_Date, record_Date,
# dividend_Rate, indicated_Annual_Dividend, announcement_Date
company = row.get('companyName', '')
ticker = row.get('symbol', '')
dividend_rate = row.get('dividend_Rate', 0)
if not company or not ticker:
return None
# Use ex-dividend date as the event date
ex_date_str = row.get('dividend_Ex_Date', '')
if ex_date_str:
try:
event_date = datetime.strptime(ex_date_str, '%m/%d/%Y').date()
except:
event_date = datetime.now().date()
else:
event_date = datetime.now().date()
annual_dividend = row.get('indicated_Annual_Dividend', '')
ex_date = row.get('dividend_Ex_Date', '')
record_date = row.get('record_Date', '')
payment_date = row.get('payment_Date', '')
announcement_date = row.get('announcement_Date', '')
event = DividendEvent(
date=event_date,
company=company,
ticker=ticker,
dividend_rate=str(dividend_rate),
annual_dividend=str(annual_dividend) if annual_dividend else "",
ex_date=ex_date,
record_date=record_date,
payment_date=payment_date,
announcement_date=announcement_date
)
# Save to database
try:
from db.local_database import DatabaseEntry, DataType
entry = DatabaseEntry(
date=event_date.strftime("%Y-%m-%d"),
data_type=DataType.DIVIDENDS.value,
ticker=ticker,
data={
"event_type": "dividend",
"company": company,
"dividend_rate": str(dividend_rate),
"annual_dividend": str(annual_dividend) if annual_dividend else "",
"ex_date": ex_date,
"record_date": record_date,
"payment_date": payment_date,
"announcement_date": announcement_date
},
metadata={
"source": "calendar_scraper",
"scraper": "nasdaq_dividends"
}
)
self.db_adapter.db.save(entry, expiry_days=90)
# print(f"✅ Saved {ticker} dividend event to database")
except Exception as e:
print(f"⚠️ Failed to save {ticker} dividend to database: {e}")
return event
except Exception as e:
print(f"Error parsing dividend row: {e}")
return None