import gradio as gr import pandas as pd from sqlalchemy import create_engine, text # 数据库连接配置 DB_CONFIG = { 'host': 'rm-j6c5yhe0l739e7752vo.mysql.cnhk.rds.aliyuncs.com', 'user': 'report_user', 'password': 'report_user_123', 'database': 'easy_financial_report' } def get_database_url(): """构造数据库连接URL""" return f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}/{DB_CONFIG['database']}" def execute_query_with_connection(engine, query, params=None): """ 自动打开和关闭数据库连接执行查询 Args: engine: SQLAlchemy引擎实例 query: SQL查询语句 params: 查询参数(可选) Returns: 查询结果或错误信息 """ try: with engine.connect() as conn: if params: result = conn.execute(text(query), params) else: result = conn.execute(text(query)) return result.fetchall() except Exception as e: return f"查询执行失败: {str(e)}" def execute_query(query): """执行SQL查询并返回结果""" if not query.strip(): return "请输入SQL查询语句" try: # 创建数据库连接引擎 engine = create_engine(get_database_url()) # 使用上下文管理器执行查询 with engine.connect() as conn: df = pd.read_sql_query(text(query), conn) return df except Exception as e: return f"查询执行失败: {str(e)}" # 引擎会在with语句结束后自动清理连接 def get_table_names(): """获取数据库中的所有表名""" try: # 创建数据库连接引擎 engine = create_engine(get_database_url()) # 使用上下文管理器执行查询 with engine.connect() as conn: # 查询所有表名 query = "SHOW TABLES" df = pd.read_sql_query(text(query), conn) # 返回表名列表 return df.iloc[:, 0].tolist() if not df.empty else [] except Exception as e: return [f"获取表名失败: {str(e)}"] # 引擎会在with语句结束后自动清理连接 def preview_table(table_name): """预览表的前几行数据""" if not table_name or "失败" in table_name: return "请选择有效的表名" query = f"SELECT * FROM {table_name} LIMIT 10" return execute_query(query) # 新增功能函数 def insert_record(title): """向report_file_link表插入新记录""" if not title.strip(): return "请输入标题" try: engine = create_engine(get_database_url()) # 插入新记录 query = "INSERT INTO report_file_link (title) VALUES (:title)" with engine.connect() as conn: trans = conn.begin() try: conn.execute(text(query), {"title": title}) trans.commit() return f"成功插入记录: {title}" except Exception as e: trans.rollback() raise e except Exception as e: return f"插入记录失败: {str(e)}" # 引擎会在with语句结束后自动清理连接 def update_record(record_id, new_title): """更新report_file_link表中的记录""" if not record_id or not new_title.strip(): return "请输入记录ID和新标题" try: engine = create_engine(get_database_url()) # 更新记录 query = "UPDATE report_file_link SET title = :title WHERE id = :id" with engine.connect() as conn: trans = conn.begin() try: result = conn.execute(text(query), {"title": new_title, "id": record_id}) trans.commit() if result.rowcount > 0: return f"成功更新记录ID {record_id} 的标题为: {new_title}" else: return f"未找到ID为 {record_id} 的记录" except Exception as e: trans.rollback() raise e except Exception as e: return f"更新记录失败: {str(e)}" # 引擎会在with语句结束后自动清理连接 def delete_record(record_id): """从report_file_link表中删除记录""" if not record_id: return "请输入记录ID" try: engine = create_engine(get_database_url()) # 删除记录 query = "DELETE FROM report_file_link WHERE id = :id" with engine.connect() as conn: trans = conn.begin() try: result = conn.execute(text(query), {"id": record_id}) trans.commit() if result.rowcount > 0: return f"成功删除ID为 {record_id} 的记录" else: return f"未找到ID为 {record_id} 的记录" except Exception as e: trans.rollback() raise e except Exception as e: return f"删除记录失败: {str(e)}" # 引擎会在with语句结束后自动清理连接 def refresh_report_file_link(): """刷新report_file_link表的数据""" return execute_query("SELECT * FROM report_file_link") # 新增功能函数 def insert_company(company_name, stock_code): try: engine = create_engine(get_database_url()) # 插入新记录 query = "INSERT INTO company (company_name, stock_code) VALUES (:company_name, :stock_code)" with engine.connect() as conn: trans = conn.begin() try: conn.execute(text(query), {"company_name": company_name, "stock_code": stock_code}) trans.commit() return True except Exception as e: trans.rollback() raise e except Exception as e: return False # 引擎会在with语句结束后自动清理连接 def get_companys(): """获取company表中的所有公司""" query = "SELECT * FROM company" return execute_query(query) def get_company_by_name(company_name): """根据公司名称获取公司信息""" query = "SELECT * FROM company WHERE company_name = :company_name" try: engine = create_engine(get_database_url()) with engine.connect() as conn: df = pd.read_sql_query(text(query), conn, params={"company_name": company_name}) return df except Exception as e: return f"查询执行失败: {str(e)}"