123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import mysql.connector
- class MysqlUtil:
- @staticmethod
- def connect_to_mysql(host,port,user,password,database):
- # 创建数据库连接
- connection = mysql.connector.connect(
- host=host, # 数据库主机地址
- user=user, # 数据库用户名
- port=port,
- password=password, # 数据库密码
- database=database # 数据库名称
- )
- return connection
- @staticmethod
- def execute_sql(connection,query,params):
- if connection.is_connected():
- print('Connected to MySQL database')
- # 创建一个cursor对象,用于执行SQL语句
- cursor = connection.cursor()
- # 执行SQL查询
- cursor.execute(query,params)
- mysql_count = cursor.fetchone()[0]
- cursor.close()
- connection.close()
- print('MySQL connection is closed')
- return mysql_count
- @staticmethod
- def insert_data(connection, query, params):
- if connection.is_connected():
- print('Connected to MySQL database')
- # 创建一个cursor对象,用于执行SQL语句
- cursor = connection.cursor()
- try:
- # 执行插入数据的SQL语句
- cursor.execute(query, params)
- # 提交事务
- connection.commit()
- print("Data inserted successfully.")
- print(cursor.rowcount) # 返回影响的行数
- except mysql.connector.Error as err:
- print(f"Error: {err}")
- connection.rollback() # 出错时回滚事务
- return None
- finally:
- cursor.close()
- # connection.close()
- # print('MySQL connection is closed')
- @staticmethod
- def insert_many(connection, query, params_list):
- """
- 批量插入数据
- :param connection: MySQL 连接对象
- :param query: SQL 插入语句
- :param params_list: 包含多条数据的参数列表
- """
- if connection.is_connected():
- print('Connected to MySQL database')
- # 创建一个 cursor 对象,用于执行 SQL 语句
- cursor = connection.cursor()
- try:
- # 执行批量插入的 SQL 语句
- cursor.executemany(query, params_list)
- # 提交事务
- connection.commit()
- print(f"Data inserted successfully. {cursor.rowcount} rows affected.")
- except mysql.connector.Error as err:
- print(f"Error: {err}")
- connection.rollback() # 出错时回滚事务
- finally:
- cursor.close()
- print('MySQL cursor is closed')
- @staticmethod
- def query_data(connection, query, params=None):
- """
- 执行查询操作
- :param connection: MySQL 连接对象
- :param query: SQL 查询语句
- :param params: SQL 查询参数
- :return: 查询结果(tuple),如果没有结果返回 None
- """
- if connection.is_connected():
- print('Connected to MySQL database')
- cursor = connection.cursor()
- try:
- cursor.execute(query, params) # 执行查询
- result = cursor.fetchall() # 获取所有查询结果
- return result
- except mysql.connector.Error as err:
- print(f"Error: {err}")
- return None
- finally:
- cursor.close()
- print('MySQL cursor is closed')
- @staticmethod
- def update_data(connection, query, params):
- """
- 执行更新操作
- :param connection: MySQL 连接对象
- :param query: SQL 更新语句
- :param params: SQL 更新参数
- """
- if connection.is_connected():
- print('Connected to MySQL database')
- cursor = connection.cursor()
- try:
- cursor.execute(query, params) # 执行更新
- connection.commit() # 提交事务
- print(f"{cursor.rowcount} rows affected.")
- except mysql.connector.Error as err:
- print(f"Error: {err}")
- connection.rollback() # 出错时回滚事务
- finally:
- cursor.close()
- print('MySQL cursor is closed')
|