Spaces:
Running
Running
File size: 4,326 Bytes
3fe0726 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
import mysql.connector
from mysql.connector import Error
import os
from dotenv import load_dotenv
from pathlib import Path
import sys
# Add src to path to import if needed, though we'll use raw connections
sys.path.append(str(Path(__file__).parent.parent.parent))
def create_source_connection():
"""Connect to local MySQL database"""
try:
config = {
'host': 'localhost',
'user': 'root',
'password': '1234',
'database': 'gotti'
}
conn = mysql.connector.connect(**config)
print("✅ Connected to source (local) database")
return conn
except Error as e:
print(f"❌ Error connecting to source database: {e}")
return None
def create_dest_connection():
"""Connect to destination TiDB database"""
try:
load_dotenv(override=True)
config = {
'host': os.getenv('DB_HOST'),
'user': os.getenv('DB_USERNAME'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_DATABASE'),
'port': int(os.getenv('DB_PORT', 4000))
}
ssl_ca = os.getenv('DB_SSL_CA')
if ssl_ca:
project_root = Path(__file__).parent.parent.parent
ssl_ca_path = project_root / ssl_ca
if ssl_ca_path.exists():
config['ssl_ca'] = str(ssl_ca_path)
config['ssl_verify_cert'] = True
config['ssl_verify_identity'] = True
conn = mysql.connector.connect(**config)
print("✅ Connected to destination (TiDB) database")
return conn
except Error as e:
print(f"❌ Error connecting to destination database: {e}")
return None
def migrate_table(source_conn, dest_conn, table_name):
"""Migrate a single table from source to destination"""
print(f"\nMigrating table: {table_name}...")
try:
source_cursor = source_conn.cursor(dictionary=True)
dest_cursor = dest_conn.cursor()
# Check if table exists in source
try:
source_cursor.execute(f"SELECT * FROM {table_name}")
rows = source_cursor.fetchall()
except Error as e:
print(f"⚠️ Skipping {table_name}: {e}")
return
if not rows:
print(f"ℹ️ Table {table_name} is empty.")
return
print(f"Found {len(rows)} rows in {table_name}.")
# Get column names
columns = list(rows[0].keys())
placeholders = ', '.join(['%s'] * len(columns))
columns_str = ', '.join(columns)
insert_query = f"""
INSERT INTO {table_name} ({columns_str})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE
{', '.join([f"{col}=VALUES({col})" for col in columns])}
"""
# Batch insert
batch_size = 100
for i in range(0, len(rows), batch_size):
batch = rows[i:i+batch_size]
values = [tuple(row[col] for col in columns) for row in batch]
try:
dest_cursor.executemany(insert_query, values)
dest_conn.commit()
print(f" Migrated {min(i+batch_size, len(rows))}/{len(rows)} rows...")
except Error as e:
print(f"❌ Error inserting batch: {e}")
print(f"✅ Successfully migrated {table_name}")
except Error as e:
print(f"❌ Error migrating {table_name}: {e}")
finally:
if 'source_cursor' in locals():
source_cursor.close()
if 'dest_cursor' in locals():
dest_cursor.close()
def main():
print("🚀 Starting migration from Local MySQL to TiDB Cloud...")
source_conn = create_source_connection()
dest_conn = create_dest_connection()
if not source_conn or not dest_conn:
print("❌ Could not establish both connections. Aborting.")
return
tables = ['available_tickers', 'calendar', 'news', 'fundamental_analysis', 'signals']
for table in tables:
migrate_table(source_conn, dest_conn, table)
source_conn.close()
dest_conn.close()
print("\n✨ Migration completed!")
if __name__ == "__main__":
main()
|