Spaces:
Running
Running
| import time | |
| import threading | |
| import sys | |
| import schedule | |
| import logging | |
| from datetime import datetime, date | |
| from pathlib import Path | |
# Module-level logger for the coordinator; records below INFO are filtered out.
logger = logging.getLogger("coordinator")
logger.setLevel(logging.INFO)

# Make both the src directory (two levels above this file) and its parent,
# the project root, importable before the project imports below are attempted.
SRC_PATH = Path(__file__).parent.parent
ROOT_PATH = SRC_PATH.parent
sys.path.extend([str(SRC_PATH), str(ROOT_PATH)])

# Project imports. They depend on the sys.path entries added above, so they
# must stay after the path setup. Any failure is fatal for this module.
try:
    from news_scraper.main import main as run_news_scraper_main
    from calendar_scraper.get_calendar_events import get_events, save_events_to_db
    # run_saturday_analysis is imported as a top-level module (found via the
    # path entries added above).
    import run_saturday_analysis
except ImportError as import_error:
    logger.error(f"β Import Error: {import_error}")
    logger.error("Ensure you are running from the project root or src directory.")
    raise SystemExit(1)
class Coordinator:
    """Start the news scraper and drive the scheduled scraper/analysis jobs.

    The news scraper is launched in a daemon thread. The scheduler loop runs
    in the thread that calls ``start()`` and blocks there until
    ``is_running`` becomes false or ``KeyboardInterrupt`` is raised.
    """

    def __init__(self):
        # True while the scheduler loop is expected to keep running.
        self.is_running = False
        # Thread object running the news scraper, once started.
        self.news_thread = None
        # Never assigned by this class; kept so existing readers of the
        # attribute keep working.
        self.scheduler_thread = None

    def _run_news_scraper(self):
        """Thread target: run the news scraper and log a crash, if any.

        The ``try`` in ``start_news_scraper`` only covers creating and
        starting the thread, so an exception raised later inside the scraper
        would otherwise never reach the coordinator's logger.
        """
        try:
            run_news_scraper_main()
        except Exception:
            logger.exception("News Scraper thread terminated with an error")

    def start_news_scraper(self):
        """Run the news scraper in a separate daemon thread.

        Does nothing (apart from a warning) when a previously started scraper
        thread is still alive, so repeated ``start()`` calls do not run two
        scrapers side by side.
        """
        if self.news_thread is not None and self.news_thread.is_alive():
            logger.warning("News Scraper is already running; not starting another one")
            return

        logger.info("π° Starting News Scraper...")
        try:
            # Separate thread because the scraper is meant to run continuously.
            self.news_thread = threading.Thread(
                target=self._run_news_scraper, daemon=True
            )
            self.news_thread.start()
        except Exception as e:
            # Same message as before, now with the traceback attached.
            logger.exception("β Error starting News Scraper: %s", e)
        else:
            logger.info("β News Scraper started")

    def run_calendar_scraper(self):
        """Fetch today's calendar events and save them to the database.

        Errors are logged, not raised, so a failing run does not stop the
        scheduler loop.
        """
        # Read the date once so the log line and the scraped date always agree,
        # even when the job runs across midnight.
        today = date.today()
        logger.info("\nπ Running Calendar Scraper for %s...", today)
        try:
            events = get_events(today)
            save_events_to_db(events, today)
        except Exception as e:
            logger.exception("β Error running Calendar Scraper: %s", e)
        else:
            logger.info("β Calendar Scraper finished")

    def run_saturday_analysis_task(self):
        """Run the Saturday analysis; errors are logged, not raised."""
        logger.info("\nπ Running Saturday Fundamental Analysis...")
        try:
            run_saturday_analysis.run_saturday_analysis()
        except Exception as e:
            logger.exception("β Error running Saturday Analysis: %s", e)
        else:
            logger.info("β Saturday Analysis finished")

    def scheduler_loop(self):
        """Register the periodic jobs and run them until ``is_running`` is false.

        Jobs are registered on the ``schedule`` module's default scheduler and
        removed again when the loop exits (normally or through an exception
        such as ``KeyboardInterrupt``). Without the cleanup every further call
        would add a second copy of each job.
        """
        logger.info("β° Scheduler started")
        jobs = []
        try:
            # Daily calendar scrape at 08:00.
            jobs.append(
                schedule.every().day.at("08:00").do(self.run_calendar_scraper)
            )
            logger.info(" - Scheduled Calendar Scraper daily at 08:00")

            # Weekly analysis every Saturday at 09:00.
            jobs.append(
                schedule.every().saturday.at("09:00").do(self.run_saturday_analysis_task)
            )
            logger.info(" - Scheduled Saturday Analysis every Saturday at 09:00")

            while self.is_running:
                schedule.run_pending()
                time.sleep(60)  # Check every minute
        finally:
            for job in jobs:
                schedule.cancel_job(job)

    def start(self):
        """Start the news scraper, then block in the scheduler loop.

        A ``KeyboardInterrupt`` raised while the loop is running is turned
        into a call to ``stop()``.
        """
        self.is_running = True
        logger.info("π Stock Alchemist Coordinator Starting...")

        # 1. News scraper (continuous, background thread).
        self.start_news_scraper()

        # 2. Scheduler, run in the calling thread since it only loops.
        try:
            self.scheduler_loop()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Ask the scheduler loop to exit by clearing ``is_running``."""
        logger.info("\nπ Stopping Coordinator...")
        self.is_running = False
        logger.info("β Coordinator stopped")
if __name__ == "__main__":
    # This file attaches no handler to any logger. Unless an imported module
    # has configured logging, INFO records from the "coordinator" logger would
    # be discarded (logging's last-resort handler only emits WARNING and up).
    # Configure a basic console handler only when none is reachable.
    if not logging.getLogger("coordinator").hasHandlers():
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        )

    coordinator = Coordinator()
    coordinator.start()