|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
from sqlalchemy import create_engine, text |
|
|
|
|
|
|
|
|
DB_CONFIG = { |
|
|
'host': 'rm-j6c5yhe0l739e7752vo.mysql.cnhk.rds.aliyuncs.com', |
|
|
'user': 'report_user', |
|
|
'password': 'report_user_123', |
|
|
'database': 'easy_financial_report' |
|
|
} |
|
|
|
|
|
def get_database_url(): |
|
|
"""构造数据库连接URL""" |
|
|
return f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}/{DB_CONFIG['database']}" |
|
|
|
|
|
def execute_query_with_connection(engine, query, params=None): |
|
|
""" |
|
|
自动打开和关闭数据库连接执行查询 |
|
|
|
|
|
Args: |
|
|
engine: SQLAlchemy引擎实例 |
|
|
query: SQL查询语句 |
|
|
params: 查询参数(可选) |
|
|
|
|
|
Returns: |
|
|
查询结果或错误信息 |
|
|
""" |
|
|
try: |
|
|
with engine.connect() as conn: |
|
|
if params: |
|
|
result = conn.execute(text(query), params) |
|
|
else: |
|
|
result = conn.execute(text(query)) |
|
|
return result.fetchall() |
|
|
except Exception as e: |
|
|
return f"查询执行失败: {str(e)}" |
|
|
|
|
|
def execute_query(query): |
|
|
"""执行SQL查询并返回结果""" |
|
|
if not query.strip(): |
|
|
return "请输入SQL查询语句" |
|
|
|
|
|
try: |
|
|
|
|
|
engine = create_engine(get_database_url()) |
|
|
|
|
|
|
|
|
with engine.connect() as conn: |
|
|
df = pd.read_sql_query(text(query), conn) |
|
|
return df |
|
|
except Exception as e: |
|
|
return f"查询执行失败: {str(e)}" |
|
|
|
|
|
|
|
|
def get_table_names(): |
|
|
"""获取数据库中的所有表名""" |
|
|
try: |
|
|
|
|
|
engine = create_engine(get_database_url()) |
|
|
|
|
|
|
|
|
with engine.connect() as conn: |
|
|
|
|
|
query = "SHOW TABLES" |
|
|
df = pd.read_sql_query(text(query), conn) |
|
|
|
|
|
|
|
|
return df.iloc[:, 0].tolist() if not df.empty else [] |
|
|
except Exception as e: |
|
|
return [f"获取表名失败: {str(e)}"] |
|
|
|
|
|
|
|
|
def preview_table(table_name): |
|
|
"""预览表的前几行数据""" |
|
|
if not table_name or "失败" in table_name: |
|
|
return "请选择有效的表名" |
|
|
|
|
|
query = f"SELECT * FROM {table_name} LIMIT 10" |
|
|
return execute_query(query) |
|
|
|
|
|
|
|
|
def insert_record(title): |
|
|
"""向report_file_link表插入新记录""" |
|
|
if not title.strip(): |
|
|
return "请输入标题" |
|
|
|
|
|
try: |
|
|
engine = create_engine(get_database_url()) |
|
|
|
|
|
query = "INSERT INTO report_file_link (title) VALUES (:title)" |
|
|
with engine.connect() as conn: |
|
|
trans = conn.begin() |
|
|
try: |
|
|
conn.execute(text(query), {"title": title}) |
|
|
trans.commit() |
|
|
return f"成功插入记录: {title}" |
|
|
except Exception as e: |
|
|
trans.rollback() |
|
|
raise e |
|
|
except Exception as e: |
|
|
return f"插入记录失败: {str(e)}" |
|
|
|
|
|
|
|
|
def update_record(record_id, new_title): |
|
|
"""更新report_file_link表中的记录""" |
|
|
if not record_id or not new_title.strip(): |
|
|
return "请输入记录ID和新标题" |
|
|
|
|
|
try: |
|
|
engine = create_engine(get_database_url()) |
|
|
|
|
|
query = "UPDATE report_file_link SET title = :title WHERE id = :id" |
|
|
with engine.connect() as conn: |
|
|
trans = conn.begin() |
|
|
try: |
|
|
result = conn.execute(text(query), {"title": new_title, "id": record_id}) |
|
|
trans.commit() |
|
|
|
|
|
if result.rowcount > 0: |
|
|
return f"成功更新记录ID {record_id} 的标题为: {new_title}" |
|
|
else: |
|
|
return f"未找到ID为 {record_id} 的记录" |
|
|
except Exception as e: |
|
|
trans.rollback() |
|
|
raise e |
|
|
except Exception as e: |
|
|
return f"更新记录失败: {str(e)}" |
|
|
|
|
|
|
|
|
def delete_record(record_id): |
|
|
"""从report_file_link表中删除记录""" |
|
|
if not record_id: |
|
|
return "请输入记录ID" |
|
|
|
|
|
try: |
|
|
engine = create_engine(get_database_url()) |
|
|
|
|
|
query = "DELETE FROM report_file_link WHERE id = :id" |
|
|
with engine.connect() as conn: |
|
|
trans = conn.begin() |
|
|
try: |
|
|
result = conn.execute(text(query), {"id": record_id}) |
|
|
trans.commit() |
|
|
|
|
|
if result.rowcount > 0: |
|
|
return f"成功删除ID为 {record_id} 的记录" |
|
|
else: |
|
|
return f"未找到ID为 {record_id} 的记录" |
|
|
except Exception as e: |
|
|
trans.rollback() |
|
|
raise e |
|
|
except Exception as e: |
|
|
return f"删除记录失败: {str(e)}" |
|
|
|
|
|
|
|
|
def refresh_report_file_link(): |
|
|
"""刷新report_file_link表的数据""" |
|
|
return execute_query("SELECT * FROM report_file_link") |
|
|
|
|
|
|
|
|
def insert_company(company_name, stock_code): |
|
|
try: |
|
|
engine = create_engine(get_database_url()) |
|
|
|
|
|
query = "INSERT INTO company (company_name, stock_code) VALUES (:company_name, :stock_code)" |
|
|
with engine.connect() as conn: |
|
|
trans = conn.begin() |
|
|
try: |
|
|
conn.execute(text(query), {"company_name": company_name, "stock_code": stock_code}) |
|
|
trans.commit() |
|
|
return True |
|
|
except Exception as e: |
|
|
trans.rollback() |
|
|
raise e |
|
|
except Exception as e: |
|
|
return False |
|
|
|
|
|
|
|
|
def get_companys(): |
|
|
"""获取company表中的所有公司""" |
|
|
query = "SELECT * FROM company" |
|
|
return execute_query(query) |
|
|
|
|
|
def get_company_by_name(company_name): |
|
|
"""根据公司名称获取公司信息""" |
|
|
query = "SELECT * FROM company WHERE company_name = :company_name" |
|
|
try: |
|
|
engine = create_engine(get_database_url()) |
|
|
with engine.connect() as conn: |
|
|
df = pd.read_sql_query(text(query), conn, params={"company_name": company_name}) |
|
|
return df |
|
|
except Exception as e: |
|
|
return f"查询执行失败: {str(e)}" |