Spaces:
Running
Running
File size: 5,686 Bytes
3fe0726 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
from datetime import date
import sys
from pathlib import Path
# Add src to path if needed
sys.path.append(str(Path(__file__).parent.parent))
from calendar_scraper.adapters.nasdaq import NasdaqAdapter
from db.local_database import LocalDatabase, DatabaseEntry, DataType
def get_events(_date: date):
    """Fetch every calendar event category from the Nasdaq adapter.

    Earnings are requested without a date argument.  IPO events are requested
    with a year-month string (``%Y-%m``); stock splits, economic events and
    dividends are requested with a full day string (``%Y-%m-%d``).

    Args:
        _date: Date used to build the month/day strings passed to the adapter.

    Returns:
        A dict with the keys "earnings", "ipos", "splits", "economic" and
        "dividends", each holding whatever the adapter returned for it.
    """
    source = NasdaqAdapter()

    collected = {"earnings": source.get_earning_events()}
    collected["ipos"] = source.get_ipo_events(_date.strftime("%Y-%m"))

    # The remaining three feeds are all keyed by the same day string.
    day_key = _date.strftime("%Y-%m-%d")
    collected["splits"] = source.get_stock_split_events(day_key)
    collected["economic"] = source.get_economic_events(day_key)
    collected["dividends"] = source.get_dividend_events(day_key)

    return collected
def _map_event_to_entry(event, event_type: str, _date: date) -> DatabaseEntry:
"""Map a calendar event object to a DatabaseEntry"""
if event_type == "earnings":
# EarningsEvent(date, time, company, ticker, eps, revenue, market_cap)
return DatabaseEntry(
date=_date.isoformat(),
data_type=DataType.EARNINGS.value,
ticker=event.ticker,
data={
'event_type': 'earnings',
'time': event.time,
'company': event.company,
'eps': event.eps,
'revenue': event.revenue,
'market_cap': event.market_cap,
'execution_date': event.date # Store the actual event date
},
metadata={'source': 'nasdaq_calendar'}
)
elif event_type == "ipos":
# IPOEvent(date, company, ticker, exchange, shares, price_range, market_cap, expected_to_trade)
return DatabaseEntry(
date=_date.isoformat(),
data_type=DataType.IPO.value,
ticker=event.ticker,
data={
'event_type': 'ipo',
'company': event.company,
'exchange': event.exchange,
'shares': event.shares,
'price_range': event.price_range,
'market_cap': event.market_cap,
'expected_to_trade': event.expected_to_trade,
'execution_date': str(event.date)
},
metadata={'source': 'nasdaq_calendar'}
)
elif event_type == "splits":
# StockSplitEvent(date, company, ticker, ratio, ...)
return DatabaseEntry(
date=_date.isoformat(),
data_type=DataType.STOCK_SPLIT.value,
ticker=event.ticker,
data={
'event_type': 'stock_split',
'company': event.company,
'ratio': event.ratio,
'execution_date': str(event.date)
},
metadata={'source': 'nasdaq_calendar'}
)
elif event_type == "economic":
# EconomicEvent(date, time, country, importance, event, actual, forecast, previous)
# Use country as ticker for economic events
ticker = event.country.upper().replace(' ', '_') if event.country else "GLOBAL"
return DatabaseEntry(
date=_date.isoformat(),
data_type=DataType.ECONOMIC_EVENTS.value,
ticker=ticker,
data={
'event_type': 'economic',
'time': event.time,
'importance': event.importance,
'event': event.event,
'actual': event.actual,
'forecast': event.forecast,
'previous': event.previous,
'execution_date': str(event.date)
},
metadata={'source': 'nasdaq_calendar'}
)
elif event_type == "dividends":
# DividendEvent(date, company, ticker, dividend_rate, ...)
return DatabaseEntry(
date=_date.isoformat(),
data_type=DataType.DIVIDENDS.value,
ticker=event.ticker,
data={
'event_type': 'dividend',
'company': event.company,
'dividend_rate': event.dividend_rate,
'execution_date': str(event.date)
},
metadata={'source': 'nasdaq_calendar'}
)
return None
def save_events_to_db(events_dict: dict, _date: date):
    """Convert fetched events into database entries and save them in one batch.

    Args:
        events_dict: Maps an event-type name to a collection of event objects.
            Falsy (empty/None) collections are skipped.
        _date: Date passed through to the mapper for every entry.

    Events whose mapping raises are reported on stdout and left out, so one
    bad event does not abort the run.  Mapped entries are saved together with
    ``expiry_days=30``; progress is printed along the way.
    """
    db = LocalDatabase()
    pending = []
    print(f"Processing events for database saving...")

    for kind, batch in events_dict.items():
        if not batch:
            continue

        print(f" - Processing {len(batch)} {kind} events...")
        for item in batch:
            try:
                mapped = _map_event_to_entry(item, kind, _date)
                # The mapper yields None for event types it does not know.
                if mapped:
                    pending.append(mapped)
            except Exception as exc:
                print(f"Error mapping {kind} event: {exc}")

    if not pending:
        print("No events to save.")
        return

    print(f"Saving {len(pending)} entries to database...")
    stored = db.save_batch(pending, expiry_days=30)
    print(f"Successfully saved {stored} events to database.")
if __name__ == "__main__":
    # Script entry point: fetch today's calendar events, print how many of
    # each category came back, then save them to the database.
    today = date.today()
    print(f"Fetching events for {today}...")
    events = get_events(today)

    # Per-category counts, in the order get_events() builds its result.
    print(f"Earnings Events: {len(events['earnings'])}")
    print(f"IPO Events: {len(events['ipos'])}")
    print(f"Stock Split Events: {len(events['splits'])}")
    print(f"Economic Events: {len(events['economic'])}")
    print(f"Dividend Events: {len(events['dividends'])}")

    save_events_to_db(events, today)