Spaces:
Running
Running
| from datetime import date | |
| import sys | |
| from pathlib import Path | |
| # Add src to path if needed | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| from calendar_scraper.adapters.nasdaq import NasdaqAdapter | |
| from db.local_database import LocalDatabase, DatabaseEntry, DataType | |
def get_events(_date: date):
    """Fetch every calendar event category from the Nasdaq adapter.

    Args:
        _date: Reference date. IPO events are requested with its
            ``YYYY-MM`` form; splits, economic events and dividends are
            requested with its ``YYYY-MM-DD`` form.

    Returns:
        A dict with the keys ``"earnings"``, ``"ipos"``, ``"splits"``,
        ``"economic"`` and ``"dividends"``, each holding whatever the
        corresponding adapter call returned.
    """
    nasdaq = NasdaqAdapter()

    # NOTE(review): earnings is requested without a date argument, unlike the
    # other categories -- confirm which date the adapter uses by default.
    collected = {"earnings": nasdaq.get_earning_events()}

    month_key = _date.strftime("%Y-%m")
    collected["ipos"] = nasdaq.get_ipo_events(month_key)

    # The remaining categories all take the same day-resolution string.
    day_key = _date.strftime("%Y-%m-%d")
    collected["splits"] = nasdaq.get_stock_split_events(day_key)
    collected["economic"] = nasdaq.get_economic_events(day_key)
    collected["dividends"] = nasdaq.get_dividend_events(day_key)

    return collected
def _map_event_to_entry(event, event_type: str, _date: date) -> DatabaseEntry:
    """Map a calendar event object to a DatabaseEntry.

    Args:
        event: Event object returned by the calendar adapter. Only the
            attributes relevant to ``event_type`` are read.
        event_type: One of ``"earnings"``, ``"ipos"``, ``"splits"``,
            ``"economic"`` or ``"dividends"``.
        _date: Date the entry is filed under; stored as ``_date.isoformat()``.

    Returns:
        A ``DatabaseEntry`` for a recognised ``event_type``; ``None`` for any
        other value (the event is not inspected in that case).

    The event's own date is stored under ``data['execution_date']`` as
    ``str(event.date)`` for every event type, so the payload always holds
    plain text regardless of which branch produced it.
    """
    if event_type == "earnings":
        # EarningsEvent(date, time, company, ticker, eps, revenue, market_cap)
        data_type = DataType.EARNINGS.value
        ticker = event.ticker
        data = {
            'event_type': 'earnings',
            'time': event.time,
            'company': event.company,
            'eps': event.eps,
            'revenue': event.revenue,
            'market_cap': event.market_cap,
            # Stringified like every other branch (was stored raw before).
            'execution_date': str(event.date),
        }
    elif event_type == "ipos":
        # IPOEvent(date, company, ticker, exchange, shares, price_range,
        #          market_cap, expected_to_trade)
        data_type = DataType.IPO.value
        ticker = event.ticker
        data = {
            'event_type': 'ipo',
            'company': event.company,
            'exchange': event.exchange,
            'shares': event.shares,
            'price_range': event.price_range,
            'market_cap': event.market_cap,
            'expected_to_trade': event.expected_to_trade,
            'execution_date': str(event.date),
        }
    elif event_type == "splits":
        # StockSplitEvent(date, company, ticker, ratio, ...)
        data_type = DataType.STOCK_SPLIT.value
        ticker = event.ticker
        data = {
            'event_type': 'stock_split',
            'company': event.company,
            'ratio': event.ratio,
            'execution_date': str(event.date),
        }
    elif event_type == "economic":
        # EconomicEvent(date, time, country, importance, event, actual,
        #               forecast, previous)
        data_type = DataType.ECONOMIC_EVENTS.value
        # Economic events have no ticker: use the upper-cased country with
        # spaces replaced by underscores, or "GLOBAL" when country is empty.
        ticker = event.country.upper().replace(' ', '_') if event.country else "GLOBAL"
        data = {
            'event_type': 'economic',
            'time': event.time,
            'importance': event.importance,
            'event': event.event,
            'actual': event.actual,
            'forecast': event.forecast,
            'previous': event.previous,
            'execution_date': str(event.date),
        }
    elif event_type == "dividends":
        # DividendEvent(date, company, ticker, dividend_rate, ...)
        data_type = DataType.DIVIDENDS.value
        ticker = event.ticker
        data = {
            'event_type': 'dividend',
            'company': event.company,
            'dividend_rate': event.dividend_rate,
            'execution_date': str(event.date),
        }
    else:
        # Unknown category: nothing to map.
        return None

    # Every category shares the same envelope; only type, ticker and payload
    # differ.
    return DatabaseEntry(
        date=_date.isoformat(),
        data_type=data_type,
        ticker=ticker,
        data=data,
        metadata={'source': 'nasdaq_calendar'},
    )
def save_events_to_db(events_dict: dict, _date: date):
    """Convert fetched events to database entries and store them in one batch.

    Args:
        events_dict: Mapping of event-type name to a collection of event
            objects (the shape returned by ``get_events``). Empty or falsy
            collections are skipped.
        _date: Date passed through to ``_map_event_to_entry`` for each event.

    An event whose mapping raises is reported on stdout and skipped; events
    that map to a falsy value are dropped. When at least one entry was
    produced they are passed to ``LocalDatabase.save_batch`` with
    ``expiry_days=30`` and the value it returns is printed as the saved count.
    """
    database = LocalDatabase()
    pending = []

    print("Processing events for database saving...")
    for kind, batch in events_dict.items():
        if batch:
            print(f" - Processing {len(batch)} {kind} events...")
            for item in batch:
                # Best-effort: one bad event must not abort the whole run.
                try:
                    mapped = _map_event_to_entry(item, kind, _date)
                    if mapped:
                        pending.append(mapped)
                except Exception as exc:
                    print(f"Error mapping {kind} event: {exc}")

    if not pending:
        print("No events to save.")
        return

    print(f"Saving {len(pending)} entries to database...")
    stored = database.save_batch(pending, expiry_days=30)
    print(f"Successfully saved {stored} events to database.")
if __name__ == "__main__":
    # Script entry point: fetch today's calendar events, print how many of
    # each category came back, then store them in the local database.
    today = date.today()
    print(f"Fetching events for {today}...")
    events = get_events(today)

    # (label, dict key) pairs, in the order the counts are reported.
    summary_rows = (
        ("Earnings", "earnings"),
        ("IPO", "ipos"),
        ("Stock Split", "splits"),
        ("Economic", "economic"),
        ("Dividend", "dividends"),
    )
    for label, key in summary_rows:
        print(f"{label} Events: {len(events[key])}")

    save_events_to_db(events, today)