Easy-Financial-Report / service /mysql_service.py
baba521's picture
test
f71610e
import gradio as gr
import pandas as pd
from sqlalchemy import create_engine, text
# 数据库连接配置
DB_CONFIG = {
'host': 'rm-j6c5yhe0l739e7752vo.mysql.cnhk.rds.aliyuncs.com',
'user': 'report_user',
'password': 'report_user_123',
'database': 'easy_financial_report'
}
def get_database_url():
"""构造数据库连接URL"""
return f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}/{DB_CONFIG['database']}"
def execute_query_with_connection(engine, query, params=None):
"""
自动打开和关闭数据库连接执行查询
Args:
engine: SQLAlchemy引擎实例
query: SQL查询语句
params: 查询参数(可选)
Returns:
查询结果或错误信息
"""
try:
with engine.connect() as conn:
if params:
result = conn.execute(text(query), params)
else:
result = conn.execute(text(query))
return result.fetchall()
except Exception as e:
return f"查询执行失败: {str(e)}"
def execute_query(query):
"""执行SQL查询并返回结果"""
if not query.strip():
return "请输入SQL查询语句"
try:
# 创建数据库连接引擎
engine = create_engine(get_database_url())
# 使用上下文管理器执行查询
with engine.connect() as conn:
df = pd.read_sql_query(text(query), conn)
return df
except Exception as e:
return f"查询执行失败: {str(e)}"
# 引擎会在with语句结束后自动清理连接
def get_table_names():
"""获取数据库中的所有表名"""
try:
# 创建数据库连接引擎
engine = create_engine(get_database_url())
# 使用上下文管理器执行查询
with engine.connect() as conn:
# 查询所有表名
query = "SHOW TABLES"
df = pd.read_sql_query(text(query), conn)
# 返回表名列表
return df.iloc[:, 0].tolist() if not df.empty else []
except Exception as e:
return [f"获取表名失败: {str(e)}"]
# 引擎会在with语句结束后自动清理连接
def preview_table(table_name):
"""预览表的前几行数据"""
if not table_name or "失败" in table_name:
return "请选择有效的表名"
query = f"SELECT * FROM {table_name} LIMIT 10"
return execute_query(query)
# 新增功能函数
def insert_record(title):
"""向report_file_link表插入新记录"""
if not title.strip():
return "请输入标题"
try:
engine = create_engine(get_database_url())
# 插入新记录
query = "INSERT INTO report_file_link (title) VALUES (:title)"
with engine.connect() as conn:
trans = conn.begin()
try:
conn.execute(text(query), {"title": title})
trans.commit()
return f"成功插入记录: {title}"
except Exception as e:
trans.rollback()
raise e
except Exception as e:
return f"插入记录失败: {str(e)}"
# 引擎会在with语句结束后自动清理连接
def update_record(record_id, new_title):
"""更新report_file_link表中的记录"""
if not record_id or not new_title.strip():
return "请输入记录ID和新标题"
try:
engine = create_engine(get_database_url())
# 更新记录
query = "UPDATE report_file_link SET title = :title WHERE id = :id"
with engine.connect() as conn:
trans = conn.begin()
try:
result = conn.execute(text(query), {"title": new_title, "id": record_id})
trans.commit()
if result.rowcount > 0:
return f"成功更新记录ID {record_id} 的标题为: {new_title}"
else:
return f"未找到ID为 {record_id} 的记录"
except Exception as e:
trans.rollback()
raise e
except Exception as e:
return f"更新记录失败: {str(e)}"
# 引擎会在with语句结束后自动清理连接
def delete_record(record_id):
"""从report_file_link表中删除记录"""
if not record_id:
return "请输入记录ID"
try:
engine = create_engine(get_database_url())
# 删除记录
query = "DELETE FROM report_file_link WHERE id = :id"
with engine.connect() as conn:
trans = conn.begin()
try:
result = conn.execute(text(query), {"id": record_id})
trans.commit()
if result.rowcount > 0:
return f"成功删除ID为 {record_id} 的记录"
else:
return f"未找到ID为 {record_id} 的记录"
except Exception as e:
trans.rollback()
raise e
except Exception as e:
return f"删除记录失败: {str(e)}"
# 引擎会在with语句结束后自动清理连接
def refresh_report_file_link():
"""刷新report_file_link表的数据"""
return execute_query("SELECT * FROM report_file_link")
# 新增功能函数
def insert_company(company_name, stock_code):
try:
engine = create_engine(get_database_url())
# 插入新记录
query = "INSERT INTO company (company_name, stock_code) VALUES (:company_name, :stock_code)"
with engine.connect() as conn:
trans = conn.begin()
try:
conn.execute(text(query), {"company_name": company_name, "stock_code": stock_code})
trans.commit()
return True
except Exception as e:
trans.rollback()
raise e
except Exception as e:
return False
# 引擎会在with语句结束后自动清理连接
def get_companys():
"""获取company表中的所有公司"""
query = "SELECT * FROM company"
return execute_query(query)
def get_company_by_name(company_name):
"""根据公司名称获取公司信息"""
query = "SELECT * FROM company WHERE company_name = :company_name"
try:
engine = create_engine(get_database_url())
with engine.connect() as conn:
df = pd.read_sql_query(text(query), conn, params={"company_name": company_name})
return df
except Exception as e:
return f"查询执行失败: {str(e)}"