mashrur950's picture
add new field driver location
580a4eb
"""
Database connection module for FleetMind
Handles PostgreSQL database connections and initialization
"""
import psycopg2
import psycopg2.extras
import os
from typing import Optional, List, Dict, Any
from pathlib import Path
import logging
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration from environment variables
DB_CONFIG = {
'host': os.getenv('DB_HOST', 'localhost'),
'port': os.getenv('DB_PORT', '5432'),
'database': os.getenv('DB_NAME', 'fleetmind'),
'user': os.getenv('DB_USER', 'postgres'),
'password': os.getenv('DB_PASSWORD', ''),
}
def get_db_connection() -> psycopg2.extensions.connection:
"""
Create and return a PostgreSQL database connection.
Returns:
psycopg2.connection: Database connection object
Raises:
psycopg2.Error: If connection fails
"""
try:
conn = psycopg2.connect(
host=DB_CONFIG['host'],
port=DB_CONFIG['port'],
database=DB_CONFIG['database'],
user=DB_CONFIG['user'],
password=DB_CONFIG['password'],
cursor_factory=psycopg2.extras.RealDictCursor,
sslmode='prefer'
)
logger.info(f"Database connection established: {DB_CONFIG['database']}@{DB_CONFIG['host']}")
return conn
except psycopg2.Error as e:
logger.error(f"Database connection error: {e}")
raise
def init_database(schema_file: Optional[str] = None) -> None:
"""
Initialize the database with schema.
Args:
schema_file: Path to SQL schema file. If None, uses default schema.
Raises:
psycopg2.Error: If initialization fails
"""
try:
conn = get_db_connection()
cursor = conn.cursor()
if schema_file and os.path.exists(schema_file):
# Read and execute schema from file
with open(schema_file, 'r') as f:
schema_sql = f.read()
cursor.execute(schema_sql)
logger.info(f"Database initialized from schema file: {schema_file}")
else:
# Use default schema
from .schema import SCHEMA_SQL
cursor.execute(SCHEMA_SQL)
logger.info("Database initialized with default schema")
conn.commit()
cursor.close()
conn.close()
logger.info("Database initialization completed successfully")
except psycopg2.Error as e:
logger.error(f"Database initialization error: {e}")
if conn:
conn.rollback()
raise
def close_connection(conn: psycopg2.extensions.connection) -> None:
"""
Safely close database connection.
Args:
conn: Database connection to close
"""
try:
if conn and not conn.closed:
conn.close()
logger.info("Database connection closed")
except psycopg2.Error as e:
logger.error(f"Error closing connection: {e}")
def execute_query(query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""
Execute a SELECT query and return results.
Args:
query: SQL query string
params: Query parameters tuple
Returns:
list: Query results as list of dictionaries
Raises:
psycopg2.Error: If query fails
"""
conn = get_db_connection()
try:
cursor = conn.cursor()
cursor.execute(query, params)
results = cursor.fetchall()
cursor.close()
return results
except psycopg2.Error as e:
logger.error(f"Query execution error: {e}")
raise
finally:
close_connection(conn)
def execute_write(query: str, params: tuple = ()) -> int:
"""
Execute an INSERT, UPDATE, or DELETE query.
Args:
query: SQL query string
params: Query parameters tuple
Returns:
int: Number of rows affected
Raises:
psycopg2.Error: If query fails
"""
conn = get_db_connection()
try:
cursor = conn.cursor()
cursor.execute(query, params)
rows_affected = cursor.rowcount
conn.commit()
cursor.close()
return rows_affected
except psycopg2.Error as e:
conn.rollback()
logger.error(f"Write query error: {e}")
raise
finally:
close_connection(conn)
def execute_many(query: str, params_list: List[tuple]) -> int:
"""
Execute multiple INSERT/UPDATE queries in a batch.
Args:
query: SQL query string
params_list: List of parameter tuples
Returns:
int: Number of rows affected
Raises:
psycopg2.Error: If query fails
"""
conn = get_db_connection()
try:
cursor = conn.cursor()
cursor.executemany(query, params_list)
rows_affected = cursor.rowcount
conn.commit()
cursor.close()
return rows_affected
except psycopg2.Error as e:
conn.rollback()
logger.error(f"Batch write error: {e}")
raise
finally:
close_connection(conn)
def test_connection() -> bool:
"""
Test database connection.
Returns:
bool: True if connection successful, False otherwise
"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT version();")
version = cursor.fetchone()
logger.info(f"PostgreSQL version: {version['version']}")
cursor.close()
close_connection(conn)
return True
except Exception as e:
logger.error(f"Connection test failed: {e}")
return False