123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- # -*- coding: utf-8 -*-
- """
- Created on 2016-11-16 16:25
- ---------
- @summary: 操作oracle数据库
- ---------
- @author: Boris
- @email: boris_liu@foxmail.com
- """
- import datetime
- import json
- from urllib import parse
- from typing import List, Dict
- import pymysql
- from dbutils.pooled_db import PooledDB
- from pymysql import cursors
- from pymysql import err
- import feapder.setting as setting
- from feapder.utils.log import log
- from feapder.utils.tools import make_insert_sql, make_batch_sql, make_update_sql
- def auto_retry(func):
- def wapper(*args, **kwargs):
- for i in range(3):
- try:
- return func(*args, **kwargs)
- except (err.InterfaceError, err.OperationalError) as e:
- log.error(
- """
- error:%s
- sql: %s
- """
- % (e, kwargs.get("sql") or args[1])
- )
- return wapper
- class MysqlDB:
- def __init__(
- self, ip=None, port=None, db=None, user_name=None, user_pass=None, **kwargs
- ):
- # 可能会改setting中的值,所以此处不能直接赋值为默认值,需要后加载赋值
- if not ip:
- ip = setting.MYSQL_IP
- if not port:
- port = setting.MYSQL_PORT
- if not db:
- db = setting.MYSQL_DB
- if not user_name:
- user_name = setting.MYSQL_USER_NAME
- if not user_pass:
- user_pass = setting.MYSQL_USER_PASS
- try:
- self.connect_pool = PooledDB(
- creator=pymysql,
- mincached=1,
- maxcached=100,
- maxconnections=100,
- blocking=True,
- ping=7,
- host=ip,
- port=port,
- user=user_name,
- passwd=user_pass,
- db=db,
- charset="utf8mb4",
- cursorclass=cursors.SSCursor,
- ) # cursorclass 使用服务的游标,默认的在多线程下大批量插入数据会使内存递增
- except Exception as e:
- log.error(
- """
- 连接数据失败:
- ip: {}
- port: {}
- db: {}
- user_name: {}
- user_pass: {}
- exception: {}
- """.format(
- ip, port, db, user_name, user_pass, e
- )
- )
- else:
- log.debug("连接到mysql数据库 %s : %s" % (ip, db))
- @classmethod
- def from_url(cls, url, **kwargs):
- # mysql://username:password@ip:port/db?charset=utf8mb4
- url_parsed = parse.urlparse(url)
- db_type = url_parsed.scheme.strip()
- if db_type != "mysql":
- raise Exception(
- "url error, expect mysql://username:ip:port/db?charset=utf8mb4, but get {}".format(
- url
- )
- )
- connect_params = {}
- connect_params["ip"] = url_parsed.hostname.strip()
- connect_params["port"] = url_parsed.port
- connect_params["user_name"] = url_parsed.username.strip()
- connect_params["user_pass"] = url_parsed.password.strip()
- connect_params["db"] = url_parsed.path.strip("/").strip()
- connect_params.update(kwargs)
- return cls(**connect_params)
- @staticmethod
- def unescape_string(value):
- if not isinstance(value, str):
- return value
- value = value.replace("\\0", "\0")
- value = value.replace("\\\\", "\\")
- value = value.replace("\\n", "\n")
- value = value.replace("\\r", "\r")
- value = value.replace("\\Z", "\032")
- value = value.replace('\\"', '"')
- value = value.replace("\\'", "'")
- return value
- def get_connection(self):
- conn = self.connect_pool.connection(shareable=False)
- # cursor = conn.cursor(cursors.SSCursor)
- cursor = conn.cursor()
- return conn, cursor
- def close_connection(self, conn, cursor):
- cursor.close()
- conn.close()
- def size_of_connections(self):
- """
- 当前活跃的连接数
- @return:
- """
- return self.connect_pool._connections
- def size_of_connect_pool(self):
- """
- 池子里一共有多少连接
- @return:
- """
- return len(self.connect_pool._idle_cache)
- @auto_retry
- def find(self, sql, limit=0, to_json=False):
- """
- @summary:
- 无数据: 返回()
- 有数据: 若limit == 1 则返回 (data1, data2)
- 否则返回 ((data1, data2),)
- ---------
- @param sql:
- @param limit:
- @param to_json 是否将查询结果转为json
- ---------
- @result:
- """
- conn, cursor = self.get_connection()
- cursor.execute(sql)
- if limit == 1:
- result = cursor.fetchone() # 全部查出来,截取 不推荐使用
- elif limit > 1:
- result = cursor.fetchmany(limit) # 全部查出来,截取 不推荐使用
- else:
- result = cursor.fetchall()
- if to_json:
- columns = [i[0] for i in cursor.description]
- # 处理数据
- def convert(col):
- if isinstance(col, (datetime.date, datetime.time)):
- return str(col)
- elif isinstance(col, str) and (
- col.startswith("{") or col.startswith("[")
- ):
- try:
- # col = self.unescape_string(col)
- return json.loads(col)
- except:
- return col
- else:
- # col = self.unescape_string(col)
- return col
- if limit == 1:
- result = [convert(col) for col in result]
- result = dict(zip(columns, result))
- else:
- result = [[convert(col) for col in row] for row in result]
- result = [dict(zip(columns, r)) for r in result]
- self.close_connection(conn, cursor)
- return result
- def add(self, sql, exception_callfunc=None):
- """
- Args:
- sql:
- exception_callfunc: 异常回调
- Returns: 添加行数
- """
- affect_count = None
- try:
- conn, cursor = self.get_connection()
- affect_count = cursor.execute(sql)
- conn.commit()
- except Exception as e:
- log.error(
- """
- error:%s
- sql: %s
- """
- % (e, sql)
- )
- if exception_callfunc:
- exception_callfunc(e)
- finally:
- self.close_connection(conn, cursor)
- return affect_count
- def add_smart(self, table, data: Dict, **kwargs):
- """
- 添加数据, 直接传递json格式的数据,不用拼sql
- Args:
- table: 表名
- data: 字典 {"xxx":"xxx"}
- **kwargs:
- Returns: 添加行数
- """
- sql = make_insert_sql(table, data, **kwargs)
- return self.add(sql)
- def add_batch(self, sql, datas: List[Dict]):
- """
- @summary: 批量添加数据
- ---------
- @ param sql: insert ignore into (xxx,xxx) values (%s, %s, %s)
- # param datas: 列表 [{}, {}, {}]
- ---------
- @result: 添加行数
- """
- affect_count = None
- try:
- conn, cursor = self.get_connection()
- affect_count = cursor.executemany(sql, datas)
- conn.commit()
- except Exception as e:
- log.error(
- """
- error:%s
- sql: %s
- """
- % (e, sql)
- )
- finally:
- self.close_connection(conn, cursor)
- return affect_count
- def add_batch_smart(self, table, datas: List[Dict], **kwargs):
- """
- 批量添加数据, 直接传递list格式的数据,不用拼sql
- Args:
- table: 表名
- datas: 列表 [{}, {}, {}]
- **kwargs:
- Returns: 添加行数
- """
- sql, datas = make_batch_sql(table, datas, **kwargs)
- return self.add_batch(sql, datas)
- def update(self, sql):
- try:
- conn, cursor = self.get_connection()
- cursor.execute(sql)
- conn.commit()
- except Exception as e:
- log.error(
- """
- error:%s
- sql: %s
- """
- % (e, sql)
- )
- return False
- else:
- return True
- finally:
- self.close_connection(conn, cursor)
- def update_smart(self, table, data: Dict, condition):
- """
- 更新, 不用拼sql
- Args:
- table: 表名
- data: 数据 {"xxx":"xxx"}
- condition: 更新条件 where后面的条件,如 condition='status=1'
- Returns: True / False
- """
- sql = make_update_sql(table, data, condition)
- return self.update(sql)
- def delete(self, sql):
- """
- 删除
- Args:
- sql:
- Returns: True / False
- """
- try:
- conn, cursor = self.get_connection()
- cursor.execute(sql)
- conn.commit()
- except Exception as e:
- log.error(
- """
- error:%s
- sql: %s
- """
- % (e, sql)
- )
- return False
- else:
- return True
- finally:
- self.close_connection(conn, cursor)
- def execute(self, sql):
- try:
- conn, cursor = self.get_connection()
- cursor.execute(sql)
- conn.commit()
- except Exception as e:
- log.error(
- """
- error:%s
- sql: %s
- """
- % (e, sql)
- )
- return False
- else:
- return True
- finally:
- self.close_connection(conn, cursor)
|