Spaces:
Sleeping
Sleeping
| from sqlalchemy import create_engine | |
| from sqlalchemy.engine import Engine | |
| from sqlalchemy.exc import OperationalError, SQLAlchemyError | |
| from sqlalchemy.orm import Session, sessionmaker | |
| from src.config import settings | |
| from src.utils.logger_util import setup_logging | |
| logger = setup_logging() | |
def init_engine() -> Engine:
    """Initialize the SQLAlchemy engine for Supabase Postgres.

    Reads the connection settings from ``settings.supabase_db``, builds the
    connection URL, creates a pooled engine and opens one test connection so
    that a bad configuration or unreachable server fails here instead of on
    first use.

    Returns:
        Engine: The SQLAlchemy engine instance.

    Raises:
        ValueError: If database configuration is missing or invalid.
        OperationalError: If the database connection fails.
        SQLAlchemyError: For other SQLAlchemy-related errors during engine creation.
    """
    # Local import: keeps this function self-contained without touching the
    # module's import block.
    from sqlalchemy.engine import URL

    try:
        db = settings.supabase_db
        if not all((db.user, db.password, db.host, db.port, db.name)):
            logger.error(
                "Incomplete database configuration: missing user, password, host, port, or name"
            )
            raise ValueError(
                "Incomplete database configuration: ensure all Supabase settings are provided"
            )

        logger.info(f"Connecting to database {db.name} at {db.host}:{db.port}")

        # Build the URL from components rather than by string interpolation so
        # that credentials containing characters such as '@', '/', ':' or '%'
        # are passed through intact instead of corrupting the URL.
        engine_url = URL.create(
            drivername="postgresql",
            username=db.user,
            password=db.password.get_secret_value(),
            host=db.host,
            port=db.port,
            database=db.name,
        )
        # Never write the real password to the log: render it masked.
        logger.debug(
            f"Using engine URL: {engine_url.render_as_string(hide_password=True)}"
        )

        # Create the engine with connection pooling options for robustness
        engine = create_engine(
            engine_url,
            pool_size=5,  # Matches number of feeds/tasks
            max_overflow=10,  # Allow additional connections if pool is full
            pool_timeout=30,  # Timeout for getting a connection from the pool
            echo=False,  # Disable SQL statement logging (set to True for debugging)
            connect_args={
                "client_encoding": "utf8",
            },
        )

        # Test the connection to ensure it's valid; the context manager
        # returns the connection to the pool immediately afterwards.
        with engine.connect():
            logger.debug("Successfully tested database connection")

        logger.info("Database engine initialized successfully")
        return engine
    except AttributeError as e:
        # A missing attribute on the settings object (or a password that is
        # not a secret wrapper) is reported as a configuration problem.
        logger.error(f"Invalid database configuration: {e}")
        raise ValueError(
            "Invalid database configuration: ensure settings.supabase_db is properly configured"
        ) from e
    except OperationalError as e:
        # Must be handled before the SQLAlchemyError clause below so that
        # connection failures are re-raised with their original type.
        logger.error(f"Failed to connect to database: {e}")
        raise
    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemy error during engine initialization: {e}")
        raise SQLAlchemyError("Failed to initialize database engine") from e
    except Exception as e:
        logger.error(f"Unexpected error during engine initialization: {e}")
        raise
def init_session(engine: Engine | None = None) -> Session:
    """Open a new SQLAlchemy session bound to an engine.

    Args:
        engine (Optional[Engine]): Engine the session should be bound to.
            When omitted, ``init_engine()`` is called to build one.

    Returns:
        Session: A newly constructed session with ``autocommit`` and
            ``autoflush`` both disabled.

    Raises:
        ValueError: If a ``ValueError`` is raised while obtaining the engine
            or building the session; it is re-raised with a session-specific
            message and the original error chained as the cause.
        SQLAlchemyError: If any SQLAlchemy error occurs; it is re-raised as a
            plain ``SQLAlchemyError`` with the original chained as the cause.
    """
    # NOTE(review): every call without an explicit engine goes through
    # init_engine() again; callers that open many sessions may want to pass
    # a shared engine instead.
    try:
        if engine is None:
            logger.debug("No engine provided; creating a new engine")
            bound_engine = init_engine()
        else:
            bound_engine = engine

        logger.info("Creating new database session")
        session_factory = sessionmaker(
            autoflush=False,
            autocommit=False,
            bind=bound_engine,
        )
        new_session = session_factory()
        logger.info("Database session created successfully")
        return new_session
    except ValueError as config_error:
        logger.error(f"Failed to create session due to invalid engine: {config_error}")
        raise ValueError("Cannot create session: invalid or missing engine") from config_error
    except SQLAlchemyError as db_error:
        logger.error(f"SQLAlchemy error during session creation: {db_error}")
        raise SQLAlchemyError("Failed to create database session") from db_error
    except Exception as unexpected:
        # Anything else is logged and propagated unchanged.
        logger.error(f"Unexpected error during session creation: {unexpected}")
        raise