Spaces:
Running
Running
| import mysql.connector | |
| from mysql.connector import Error | |
| import os | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| import sys | |
# Make the directory three levels above this file importable (the same
# expression is named ``project_root`` in create_dest_connection()).
# The migration below only uses raw connector connections, so this is
# presumably only needed for optional project imports -- TODO confirm.
sys.path.append(str(Path(__file__).parent.parent.parent))
def create_source_connection():
    """Connect to the source (local) MySQL database.

    Settings default to the local development instance (``root`` on
    ``localhost:3306``, database ``gotti``). Each one can be overridden,
    without editing this file, through the ``SOURCE_DB_HOST``,
    ``SOURCE_DB_PORT``, ``SOURCE_DB_USERNAME``, ``SOURCE_DB_PASSWORD`` and
    ``SOURCE_DB_DATABASE`` environment variables (a ``.env`` file is honoured).

    Returns:
        An open connection, or ``None`` when the settings are invalid or the
        connection attempt fails; the error is printed in either case.
    """
    try:
        # Pick up a .env file without overriding variables that are already
        # set in the process environment.
        load_dotenv()
        config = {
            'host': os.getenv('SOURCE_DB_HOST', 'localhost'),
            # ``or`` also covers a variable that is set but empty.
            'port': int(os.getenv('SOURCE_DB_PORT') or 3306),
            'user': os.getenv('SOURCE_DB_USERNAME', 'root'),
            # NOTE: the fallback password is kept only for backward
            # compatibility; prefer setting SOURCE_DB_PASSWORD.
            'password': os.getenv('SOURCE_DB_PASSWORD', '1234'),
            'database': os.getenv('SOURCE_DB_DATABASE', 'gotti'),
        }
        conn = mysql.connector.connect(**config)
        print("✅ Connected to source (local) database")
        return conn
    except (Error, ValueError) as e:
        # ValueError: SOURCE_DB_PORT is not an integer.
        print(f"❌ Error connecting to source database: {e}")
        return None
def create_dest_connection():
    """Connect to the destination TiDB database.

    Settings are read from the environment after loading ``.env`` with
    ``override=True``: ``DB_HOST``, ``DB_USERNAME``, ``DB_PASSWORD``,
    ``DB_DATABASE`` and ``DB_PORT`` (4000 when unset or empty).

    When ``DB_SSL_CA`` is set, it is resolved against the directory three
    levels above this file. If that file exists, certificate and identity
    verification are enabled; if it does not, a warning is printed and the
    connection is attempted without those options.

    Returns:
        An open connection, or ``None`` when the settings are invalid or the
        connection attempt fails; the error is printed in either case.
    """
    try:
        load_dotenv(override=True)
        config = {
            'host': os.getenv('DB_HOST'),
            'user': os.getenv('DB_USERNAME'),
            'password': os.getenv('DB_PASSWORD'),
            'database': os.getenv('DB_DATABASE'),
            # ``or`` also covers DB_PORT being set to an empty string.
            'port': int(os.getenv('DB_PORT') or 4000),
        }

        ssl_ca = os.getenv('DB_SSL_CA')
        if ssl_ca:
            project_root = Path(__file__).parent.parent.parent
            ssl_ca_path = project_root / ssl_ca
            if ssl_ca_path.exists():
                config['ssl_ca'] = str(ssl_ca_path)
                config['ssl_verify_cert'] = True
                config['ssl_verify_identity'] = True
            else:
                # Do not silently drop verification: tell the operator the
                # configured CA bundle was not found.
                print(
                    f"⚠️ DB_SSL_CA is set but {ssl_ca_path} does not exist; "
                    "connecting without certificate verification"
                )

        conn = mysql.connector.connect(**config)
        print("✅ Connected to destination (TiDB) database")
        return conn
    except (Error, ValueError) as e:
        # ValueError: DB_PORT is not an integer.
        print(f"❌ Error connecting to destination database: {e}")
        return None
def _quote_identifier(name):
    """Return *name* wrapped in backticks, with embedded backticks doubled."""
    return "`" + str(name).replace("`", "``") + "`"


def migrate_table(source_conn, dest_conn, table_name):
    """Copy every row of one table from the source to the destination.

    All rows are read from the source with ``SELECT *`` and written to the
    destination in batches of 100 using ``INSERT ... ON DUPLICATE KEY
    UPDATE``, so rows whose key already exists are overwritten. Each batch
    is committed on its own; a failing batch is rolled back and reported,
    and the remaining batches are still attempted.

    Args:
        source_conn: Open connection to read from.
        dest_conn: Open connection to write to.
        table_name: Table to copy. It is interpolated into the SQL as given,
            so it must come from a trusted source.

    Returns:
        None. Progress and errors are printed.
    """
    print(f"\nMigrating table: {table_name}...")
    source_cursor = None
    dest_cursor = None
    try:
        source_cursor = source_conn.cursor(dictionary=True)
        dest_cursor = dest_conn.cursor()

        # A failing SELECT (e.g. the table is missing in the source) skips
        # the table instead of aborting the whole run.
        try:
            source_cursor.execute(f"SELECT * FROM {table_name}")
            rows = source_cursor.fetchall()
        except Error as e:
            print(f"⚠️ Skipping {table_name}: {e}")
            return

        if not rows:
            print(f"ℹ️ Table {table_name} is empty.")
            return

        total = len(rows)
        print(f"Found {total} rows in {table_name}.")

        # Column names come from the source schema; quote them so reserved
        # words or unusual characters cannot break the statement.
        columns = list(rows[0])
        quoted_columns = [_quote_identifier(col) for col in columns]
        placeholders = ', '.join(['%s'] * len(columns))
        columns_str = ', '.join(quoted_columns)
        updates_str = ', '.join(f"{col}=VALUES({col})" for col in quoted_columns)
        insert_query = f"""
        INSERT INTO {table_name} ({columns_str})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE
        {updates_str}
        """

        batch_size = 100
        failed_batches = 0
        for start in range(0, total, batch_size):
            batch = rows[start:start + batch_size]
            values = [tuple(row[col] for col in columns) for row in batch]
            try:
                dest_cursor.executemany(insert_query, values)
                dest_conn.commit()
                print(f" Migrated {min(start + batch_size, total)}/{total} rows...")
            except Error as e:
                failed_batches += 1
                print(f"❌ Error inserting batch: {e}")
                # Discard the partial batch so it cannot be committed together
                # with the next one.
                dest_conn.rollback()

        # Only claim success when every batch was written.
        if failed_batches:
            print(f"⚠️ Migrated {table_name} with {failed_batches} failed batch(es)")
        else:
            print(f"✅ Successfully migrated {table_name}")
    except Error as e:
        print(f"❌ Error migrating {table_name}: {e}")
    finally:
        if source_cursor is not None:
            source_cursor.close()
        if dest_cursor is not None:
            dest_cursor.close()
def main():
    """Migrate the known tables from the local MySQL database to TiDB.

    Opens both connections, aborts with a message if either one could not
    be established, and otherwise migrates each table in turn. Any
    connection that was opened is closed on every exit path, including the
    abort path and an unexpected exception during migration.
    """
    print("🚀 Starting migration from Local MySQL to TiDB Cloud...")
    source_conn = create_source_connection()
    dest_conn = create_dest_connection()
    try:
        if not source_conn or not dest_conn:
            print("❌ Could not establish both connections. Aborting.")
            return

        tables = ['available_tickers', 'calendar', 'news', 'fundamental_analysis', 'signals']
        for table in tables:
            migrate_table(source_conn, dest_conn, table)
    finally:
        # Close whichever connections were opened, even when only one of
        # them succeeded or a migration raised.
        for conn in (source_conn, dest_conn):
            if conn is not None:
                conn.close()
    print("\n✨ Migration completed!")
# Run the migration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()