gotti_signal_gen / src /db /migrate_local_to_tidb.py
Papaflessas's picture
Deploy Signal Generator app
3fe0726
import mysql.connector
from mysql.connector import Error
import os
from dotenv import load_dotenv
from pathlib import Path
import sys
# Add src to path to import if needed, though we'll use raw connections
sys.path.append(str(Path(__file__).parent.parent.parent))
def create_source_connection():
"""Connect to local MySQL database"""
try:
config = {
'host': 'localhost',
'user': 'root',
'password': '1234',
'database': 'gotti'
}
conn = mysql.connector.connect(**config)
print("✅ Connected to source (local) database")
return conn
except Error as e:
print(f"❌ Error connecting to source database: {e}")
return None
def create_dest_connection():
"""Connect to destination TiDB database"""
try:
load_dotenv(override=True)
config = {
'host': os.getenv('DB_HOST'),
'user': os.getenv('DB_USERNAME'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_DATABASE'),
'port': int(os.getenv('DB_PORT', 4000))
}
ssl_ca = os.getenv('DB_SSL_CA')
if ssl_ca:
project_root = Path(__file__).parent.parent.parent
ssl_ca_path = project_root / ssl_ca
if ssl_ca_path.exists():
config['ssl_ca'] = str(ssl_ca_path)
config['ssl_verify_cert'] = True
config['ssl_verify_identity'] = True
conn = mysql.connector.connect(**config)
print("✅ Connected to destination (TiDB) database")
return conn
except Error as e:
print(f"❌ Error connecting to destination database: {e}")
return None
def migrate_table(source_conn, dest_conn, table_name):
"""Migrate a single table from source to destination"""
print(f"\nMigrating table: {table_name}...")
try:
source_cursor = source_conn.cursor(dictionary=True)
dest_cursor = dest_conn.cursor()
# Check if table exists in source
try:
source_cursor.execute(f"SELECT * FROM {table_name}")
rows = source_cursor.fetchall()
except Error as e:
print(f"⚠️ Skipping {table_name}: {e}")
return
if not rows:
print(f"ℹ️ Table {table_name} is empty.")
return
print(f"Found {len(rows)} rows in {table_name}.")
# Get column names
columns = list(rows[0].keys())
placeholders = ', '.join(['%s'] * len(columns))
columns_str = ', '.join(columns)
insert_query = f"""
INSERT INTO {table_name} ({columns_str})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE
{', '.join([f"{col}=VALUES({col})" for col in columns])}
"""
# Batch insert
batch_size = 100
for i in range(0, len(rows), batch_size):
batch = rows[i:i+batch_size]
values = [tuple(row[col] for col in columns) for row in batch]
try:
dest_cursor.executemany(insert_query, values)
dest_conn.commit()
print(f" Migrated {min(i+batch_size, len(rows))}/{len(rows)} rows...")
except Error as e:
print(f"❌ Error inserting batch: {e}")
print(f"✅ Successfully migrated {table_name}")
except Error as e:
print(f"❌ Error migrating {table_name}: {e}")
finally:
if 'source_cursor' in locals():
source_cursor.close()
if 'dest_cursor' in locals():
dest_cursor.close()
def main():
print("🚀 Starting migration from Local MySQL to TiDB Cloud...")
source_conn = create_source_connection()
dest_conn = create_dest_connection()
if not source_conn or not dest_conn:
print("❌ Could not establish both connections. Aborting.")
return
tables = ['available_tickers', 'calendar', 'news', 'fundamental_analysis', 'signals']
for table in tables:
migrate_table(source_conn, dest_conn, table)
source_conn.close()
dest_conn.close()
print("\n✨ Migration completed!")
if __name__ == "__main__":
main()