import mysql.connector class MysqlUtil: @staticmethod def connect_to_mysql(host,port,user,password,database): # 创建数据库连接 connection = mysql.connector.connect( host=host, # 数据库主机地址 user=user, # 数据库用户名 port=port, password=password, # 数据库密码 database=database # 数据库名称 ) return connection @staticmethod def execute_sql(connection,query,params): if connection.is_connected(): print('Connected to MySQL database') # 创建一个cursor对象,用于执行SQL语句 cursor = connection.cursor() # 执行SQL查询 cursor.execute(query,params) mysql_count = cursor.fetchone()[0] cursor.close() connection.close() print('MySQL connection is closed') return mysql_count @staticmethod def insert_data(connection, query, params): if connection.is_connected(): print('Connected to MySQL database') # 创建一个cursor对象,用于执行SQL语句 cursor = connection.cursor() try: # 执行插入数据的SQL语句 cursor.execute(query, params) # 提交事务 connection.commit() print("Data inserted successfully.") print(cursor.rowcount) # 返回影响的行数 except mysql.connector.Error as err: print(f"Error: {err}") connection.rollback() # 出错时回滚事务 return None finally: cursor.close() # connection.close() # print('MySQL connection is closed') @staticmethod def insert_many(connection, query, params_list): """ 批量插入数据 :param connection: MySQL 连接对象 :param query: SQL 插入语句 :param params_list: 包含多条数据的参数列表 """ if connection.is_connected(): print('Connected to MySQL database') # 创建一个 cursor 对象,用于执行 SQL 语句 cursor = connection.cursor() try: # 执行批量插入的 SQL 语句 cursor.executemany(query, params_list) # 提交事务 connection.commit() print(f"Data inserted successfully. {cursor.rowcount} rows affected.") except mysql.connector.Error as err: print(f"Error: {err}") connection.rollback() # 出错时回滚事务 finally: cursor.close() print('MySQL cursor is closed') @staticmethod def query_data(connection, query, params=None): """ 执行查询操作 :param connection: MySQL 连接对象 :param query: SQL 查询语句 :param params: SQL 查询参数 :return: 查询结果(tuple),如果没有结果返回 None """ if connection.is_connected(): print('Connected to MySQL database') cursor = connection.cursor() try: cursor.execute(query, params) # 执行查询 result = cursor.fetchall() # 获取所有查询结果 return result except mysql.connector.Error as err: print(f"Error: {err}") return None finally: cursor.close() print('MySQL cursor is closed') @staticmethod def update_data(connection, query, params): """ 执行更新操作 :param connection: MySQL 连接对象 :param query: SQL 更新语句 :param params: SQL 更新参数 """ if connection.is_connected(): print('Connected to MySQL database') cursor = connection.cursor() try: cursor.execute(query, params) # 执行更新 connection.commit() # 提交事务 print(f"{cursor.rowcount} rows affected.") except mysql.connector.Error as err: print(f"Error: {err}") connection.rollback() # 出错时回滚事务 finally: cursor.close() print('MySQL cursor is closed')