File size: 5,703 Bytes
d69447e 6902ae8 580a4eb d69447e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
"""
Database connection module for FleetMind
Handles PostgreSQL database connections and initialization
"""
import psycopg2
import psycopg2.extras
import os
from typing import Optional, List, Dict, Any
from pathlib import Path
import logging
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration from environment variables
DB_CONFIG = {
'host': os.getenv('DB_HOST', 'localhost'),
'port': os.getenv('DB_PORT', '5432'),
'database': os.getenv('DB_NAME', 'fleetmind'),
'user': os.getenv('DB_USER', 'postgres'),
'password': os.getenv('DB_PASSWORD', ''),
}
def get_db_connection() -> psycopg2.extensions.connection:
"""
Create and return a PostgreSQL database connection.
Returns:
psycopg2.connection: Database connection object
Raises:
psycopg2.Error: If connection fails
"""
try:
conn = psycopg2.connect(
host=DB_CONFIG['host'],
port=DB_CONFIG['port'],
database=DB_CONFIG['database'],
user=DB_CONFIG['user'],
password=DB_CONFIG['password'],
cursor_factory=psycopg2.extras.RealDictCursor,
sslmode='prefer'
)
logger.info(f"Database connection established: {DB_CONFIG['database']}@{DB_CONFIG['host']}")
return conn
except psycopg2.Error as e:
logger.error(f"Database connection error: {e}")
raise
def init_database(schema_file: Optional[str] = None) -> None:
"""
Initialize the database with schema.
Args:
schema_file: Path to SQL schema file. If None, uses default schema.
Raises:
psycopg2.Error: If initialization fails
"""
try:
conn = get_db_connection()
cursor = conn.cursor()
if schema_file and os.path.exists(schema_file):
# Read and execute schema from file
with open(schema_file, 'r') as f:
schema_sql = f.read()
cursor.execute(schema_sql)
logger.info(f"Database initialized from schema file: {schema_file}")
else:
# Use default schema
from .schema import SCHEMA_SQL
cursor.execute(SCHEMA_SQL)
logger.info("Database initialized with default schema")
conn.commit()
cursor.close()
conn.close()
logger.info("Database initialization completed successfully")
except psycopg2.Error as e:
logger.error(f"Database initialization error: {e}")
if conn:
conn.rollback()
raise
def close_connection(conn: psycopg2.extensions.connection) -> None:
"""
Safely close database connection.
Args:
conn: Database connection to close
"""
try:
if conn and not conn.closed:
conn.close()
logger.info("Database connection closed")
except psycopg2.Error as e:
logger.error(f"Error closing connection: {e}")
def execute_query(query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""
Execute a SELECT query and return results.
Args:
query: SQL query string
params: Query parameters tuple
Returns:
list: Query results as list of dictionaries
Raises:
psycopg2.Error: If query fails
"""
conn = get_db_connection()
try:
cursor = conn.cursor()
cursor.execute(query, params)
results = cursor.fetchall()
cursor.close()
return results
except psycopg2.Error as e:
logger.error(f"Query execution error: {e}")
raise
finally:
close_connection(conn)
def execute_write(query: str, params: tuple = ()) -> int:
"""
Execute an INSERT, UPDATE, or DELETE query.
Args:
query: SQL query string
params: Query parameters tuple
Returns:
int: Number of rows affected
Raises:
psycopg2.Error: If query fails
"""
conn = get_db_connection()
try:
cursor = conn.cursor()
cursor.execute(query, params)
rows_affected = cursor.rowcount
conn.commit()
cursor.close()
return rows_affected
except psycopg2.Error as e:
conn.rollback()
logger.error(f"Write query error: {e}")
raise
finally:
close_connection(conn)
def execute_many(query: str, params_list: List[tuple]) -> int:
"""
Execute multiple INSERT/UPDATE queries in a batch.
Args:
query: SQL query string
params_list: List of parameter tuples
Returns:
int: Number of rows affected
Raises:
psycopg2.Error: If query fails
"""
conn = get_db_connection()
try:
cursor = conn.cursor()
cursor.executemany(query, params_list)
rows_affected = cursor.rowcount
conn.commit()
cursor.close()
return rows_affected
except psycopg2.Error as e:
conn.rollback()
logger.error(f"Batch write error: {e}")
raise
finally:
close_connection(conn)
def test_connection() -> bool:
"""
Test database connection.
Returns:
bool: True if connection successful, False otherwise
"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT version();")
version = cursor.fetchone()
logger.info(f"PostgreSQL version: {version['version']}")
cursor.close()
close_connection(conn)
return True
except Exception as e:
logger.error(f"Connection test failed: {e}")
return False
|