"""Copy a fixed set of tables from a local MySQL database into TiDB.

The source connection uses settings hard-coded in
``create_source_connection``; the destination connection is configured
through ``DB_*`` environment variables (loaded from a ``.env`` file).
Rows are upserted with ``INSERT ... ON DUPLICATE KEY UPDATE`` so the
script can be re-run against a destination that already holds data.
"""
import os
import sys
from pathlib import Path

import mysql.connector
from dotenv import load_dotenv
from mysql.connector import Error

# Add src to path to import if needed, though we'll use raw connections
sys.path.append(str(Path(__file__).parent.parent.parent))


def create_source_connection():
    """Connect to the local MySQL database.

    Returns:
        An open connection, or ``None`` if connecting raised a
        ``mysql.connector.Error`` (the error is printed).
    """
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # the environment like the destination settings.
    config = {
        'host': 'localhost',
        'user': 'root',
        'password': '1234',
        'database': 'gotti',
    }
    try:
        conn = mysql.connector.connect(**config)
    except Error as e:
        print(f"❌ Error connecting to source database: {e}")
        return None
    print("✅ Connected to source (local) database")
    return conn


def create_dest_connection():
    """Connect to the destination TiDB database.

    Reads ``DB_HOST``, ``DB_USERNAME``, ``DB_PASSWORD``, ``DB_DATABASE``,
    ``DB_PORT`` (default 4000) and the optional ``DB_SSL_CA`` from the
    environment after loading ``.env`` with ``override=True``.  When
    ``DB_SSL_CA`` names an existing file (resolved against the directory
    three levels above this file), certificate and identity verification
    are enabled.

    Returns:
        An open connection, or ``None`` if ``DB_PORT`` is not an integer or
        connecting raised a ``mysql.connector.Error`` (the error is printed).
    """
    load_dotenv(override=True)

    raw_port = os.getenv('DB_PORT', 4000)
    try:
        port = int(raw_port)
    except ValueError:
        # Keep the "return None on failure" contract instead of crashing.
        print(
            "❌ Error connecting to destination database: "
            f"invalid DB_PORT {raw_port!r}"
        )
        return None

    config = {
        'host': os.getenv('DB_HOST'),
        'user': os.getenv('DB_USERNAME'),
        'password': os.getenv('DB_PASSWORD'),
        'database': os.getenv('DB_DATABASE'),
        'port': port,
    }

    ssl_ca = os.getenv('DB_SSL_CA')
    if ssl_ca:
        project_root = Path(__file__).parent.parent.parent
        ssl_ca_path = project_root / ssl_ca
        if ssl_ca_path.exists():
            config['ssl_ca'] = str(ssl_ca_path)
            config['ssl_verify_cert'] = True
            config['ssl_verify_identity'] = True
        else:
            # Same fallback as before (connect without CA verification),
            # but no longer silent.
            print(
                f"⚠️ DB_SSL_CA file not found at {ssl_ca_path}; "
                "connecting without certificate verification"
            )

    try:
        conn = mysql.connector.connect(**config)
    except Error as e:
        print(f"❌ Error connecting to destination database: {e}")
        return None
    print("✅ Connected to destination (TiDB) database")
    return conn


def _quote_identifier(name):
    """Return *name* as a backtick-quoted MySQL identifier."""
    return "`" + str(name).replace("`", "``") + "`"


def _close_connections(*connections):
    """Close every connection that was opened, reporting close errors."""
    for conn in connections:
        if conn is None:
            continue
        try:
            conn.close()
        except Error as e:
            print(f"⚠️ Error closing connection: {e}")


def migrate_table(source_conn, dest_conn, table_name, batch_size=100):
    """Migrate a single table from source to destination.

    All rows are read from ``table_name`` on the source and upserted into
    the same-named table on the destination in batches, committing after
    each successful batch.  A batch that fails is rolled back and reported,
    and the remaining batches are still attempted.

    Args:
        source_conn: Open connection to read from.
        dest_conn: Open connection to write to.
        table_name: Table to copy; interpolated into the SQL as given, so it
            must come from a trusted source.
        batch_size: Number of rows sent per ``executemany`` call.

    Returns:
        None.  Progress and errors are printed.
    """
    print(f"\nMigrating table: {table_name}...")
    source_cursor = None
    dest_cursor = None
    try:
        source_cursor = source_conn.cursor(dictionary=True)
        dest_cursor = dest_conn.cursor()

        # A failing SELECT (e.g. the table is missing) skips the table.
        try:
            source_cursor.execute(f"SELECT * FROM {table_name}")
            rows = source_cursor.fetchall()
        except Error as e:
            print(f"⚠️ Skipping {table_name}: {e}")
            return

        if not rows:
            print(f"ℹ️ Table {table_name} is empty.")
            return

        total = len(rows)
        print(f"Found {total} rows in {table_name}.")

        # Column names come from the first row's keys.  They are quoted so
        # that columns named after reserved words still form valid SQL.
        columns = list(rows[0].keys())
        quoted = [_quote_identifier(col) for col in columns]
        placeholders = ', '.join(['%s'] * len(columns))
        columns_str = ', '.join(quoted)
        updates = ', '.join(f"{col}=VALUES({col})" for col in quoted)
        insert_query = (
            f"INSERT INTO {table_name} ({columns_str}) "
            f"VALUES ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {updates}"
        )

        failed_rows = 0
        for start in range(0, total, batch_size):
            batch = rows[start:start + batch_size]
            values = [tuple(row[col] for col in columns) for row in batch]
            try:
                dest_cursor.executemany(insert_query, values)
                dest_conn.commit()
            except Error as e:
                print(f"❌ Error inserting batch: {e}")
                failed_rows += len(batch)
                # Discard the failed batch so it cannot leak into the next
                # commit.
                dest_conn.rollback()
            else:
                done = min(start + batch_size, total)
                print(f" Migrated {done}/{total} rows...")

        if failed_rows:
            print(
                f"⚠️ Migrated {table_name} with errors: "
                f"{failed_rows}/{total} rows were not inserted"
            )
        else:
            print(f"✅ Successfully migrated {table_name}")

    except Error as e:
        print(f"❌ Error migrating {table_name}: {e}")
    finally:
        if source_cursor is not None:
            source_cursor.close()
        if dest_cursor is not None:
            dest_cursor.close()


def main():
    """Open both connections and migrate each table in the fixed list."""
    print("🚀 Starting migration from Local MySQL to TiDB Cloud...")

    source_conn = create_source_connection()
    dest_conn = create_dest_connection()

    if source_conn is None or dest_conn is None:
        print("❌ Could not establish both connections. Aborting.")
        # Close whichever connection did open so it is not leaked.
        _close_connections(source_conn, dest_conn)
        return

    tables = [
        'available_tickers',
        'calendar',
        'news',
        'fundamental_analysis',
        'signals',
    ]

    try:
        for table in tables:
            migrate_table(source_conn, dest_conn, table)
    finally:
        _close_connections(source_conn, dest_conn)

    print("\n✨ Migration completed!")


if __name__ == "__main__":
    main()