3
0

mysqldb.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
# -*- coding: utf-8 -*-
"""
Created on 2016-11-16 16:25
---------
@summary: Helper for working with a MySQL database through a pooled pymysql connection
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import datetime
import functools
import json
import re
from typing import List, Dict
from urllib import parse

import pymysql
from dbutils.pooled_db import PooledDB
from pymysql import cursors
from pymysql import err

import feapder.setting as setting
from feapder.utils.log import log
from feapder.utils.tools import make_insert_sql, make_batch_sql, make_update_sql
  21. def auto_retry(func):
  22. def wapper(*args, **kwargs):
  23. for i in range(3):
  24. try:
  25. return func(*args, **kwargs)
  26. except (err.InterfaceError, err.OperationalError) as e:
  27. log.error(
  28. """
  29. error:%s
  30. sql: %s
  31. """
  32. % (e, kwargs.get("sql") or args[1])
  33. )
  34. return wapper
  35. class MysqlDB:
  36. def __init__(
  37. self, ip=None, port=None, db=None, user_name=None, user_pass=None, **kwargs
  38. ):
  39. # 可能会改setting中的值,所以此处不能直接赋值为默认值,需要后加载赋值
  40. if not ip:
  41. ip = setting.MYSQL_IP
  42. if not port:
  43. port = setting.MYSQL_PORT
  44. if not db:
  45. db = setting.MYSQL_DB
  46. if not user_name:
  47. user_name = setting.MYSQL_USER_NAME
  48. if not user_pass:
  49. user_pass = setting.MYSQL_USER_PASS
  50. try:
  51. self.connect_pool = PooledDB(
  52. creator=pymysql,
  53. mincached=1,
  54. maxcached=100,
  55. maxconnections=100,
  56. blocking=True,
  57. ping=7,
  58. host=ip,
  59. port=port,
  60. user=user_name,
  61. passwd=user_pass,
  62. db=db,
  63. charset="utf8mb4",
  64. cursorclass=cursors.SSCursor,
  65. ) # cursorclass 使用服务的游标,默认的在多线程下大批量插入数据会使内存递增
  66. except Exception as e:
  67. log.error(
  68. """
  69. 连接数据失败:
  70. ip: {}
  71. port: {}
  72. db: {}
  73. user_name: {}
  74. user_pass: {}
  75. exception: {}
  76. """.format(
  77. ip, port, db, user_name, user_pass, e
  78. )
  79. )
  80. else:
  81. log.debug("连接到mysql数据库 %s : %s" % (ip, db))
  82. @classmethod
  83. def from_url(cls, url, **kwargs):
  84. # mysql://username:password@ip:port/db?charset=utf8mb4
  85. url_parsed = parse.urlparse(url)
  86. db_type = url_parsed.scheme.strip()
  87. if db_type != "mysql":
  88. raise Exception(
  89. "url error, expect mysql://username:ip:port/db?charset=utf8mb4, but get {}".format(
  90. url
  91. )
  92. )
  93. connect_params = {}
  94. connect_params["ip"] = url_parsed.hostname.strip()
  95. connect_params["port"] = url_parsed.port
  96. connect_params["user_name"] = url_parsed.username.strip()
  97. connect_params["user_pass"] = url_parsed.password.strip()
  98. connect_params["db"] = url_parsed.path.strip("/").strip()
  99. connect_params.update(kwargs)
  100. return cls(**connect_params)
  101. @staticmethod
  102. def unescape_string(value):
  103. if not isinstance(value, str):
  104. return value
  105. value = value.replace("\\0", "\0")
  106. value = value.replace("\\\\", "\\")
  107. value = value.replace("\\n", "\n")
  108. value = value.replace("\\r", "\r")
  109. value = value.replace("\\Z", "\032")
  110. value = value.replace('\\"', '"')
  111. value = value.replace("\\'", "'")
  112. return value
  113. def get_connection(self):
  114. conn = self.connect_pool.connection(shareable=False)
  115. # cursor = conn.cursor(cursors.SSCursor)
  116. cursor = conn.cursor()
  117. return conn, cursor
  118. def close_connection(self, conn, cursor):
  119. cursor.close()
  120. conn.close()
  121. def size_of_connections(self):
  122. """
  123. 当前活跃的连接数
  124. @return:
  125. """
  126. return self.connect_pool._connections
  127. def size_of_connect_pool(self):
  128. """
  129. 池子里一共有多少连接
  130. @return:
  131. """
  132. return len(self.connect_pool._idle_cache)
  133. @auto_retry
  134. def find(self, sql, limit=0, to_json=False):
  135. """
  136. @summary:
  137. 无数据: 返回()
  138. 有数据: 若limit == 1 则返回 (data1, data2)
  139. 否则返回 ((data1, data2),)
  140. ---------
  141. @param sql:
  142. @param limit:
  143. @param to_json 是否将查询结果转为json
  144. ---------
  145. @result:
  146. """
  147. conn, cursor = self.get_connection()
  148. cursor.execute(sql)
  149. if limit == 1:
  150. result = cursor.fetchone() # 全部查出来,截取 不推荐使用
  151. elif limit > 1:
  152. result = cursor.fetchmany(limit) # 全部查出来,截取 不推荐使用
  153. else:
  154. result = cursor.fetchall()
  155. if to_json:
  156. columns = [i[0] for i in cursor.description]
  157. # 处理数据
  158. def convert(col):
  159. if isinstance(col, (datetime.date, datetime.time)):
  160. return str(col)
  161. elif isinstance(col, str) and (
  162. col.startswith("{") or col.startswith("[")
  163. ):
  164. try:
  165. # col = self.unescape_string(col)
  166. return json.loads(col)
  167. except:
  168. return col
  169. else:
  170. # col = self.unescape_string(col)
  171. return col
  172. if limit == 1:
  173. result = [convert(col) for col in result]
  174. result = dict(zip(columns, result))
  175. else:
  176. result = [[convert(col) for col in row] for row in result]
  177. result = [dict(zip(columns, r)) for r in result]
  178. self.close_connection(conn, cursor)
  179. return result
  180. def add(self, sql, exception_callfunc=None):
  181. """
  182. Args:
  183. sql:
  184. exception_callfunc: 异常回调
  185. Returns: 添加行数
  186. """
  187. affect_count = None
  188. try:
  189. conn, cursor = self.get_connection()
  190. affect_count = cursor.execute(sql)
  191. conn.commit()
  192. except Exception as e:
  193. log.error(
  194. """
  195. error:%s
  196. sql: %s
  197. """
  198. % (e, sql)
  199. )
  200. if exception_callfunc:
  201. exception_callfunc(e)
  202. finally:
  203. self.close_connection(conn, cursor)
  204. return affect_count
  205. def add_smart(self, table, data: Dict, **kwargs):
  206. """
  207. 添加数据, 直接传递json格式的数据,不用拼sql
  208. Args:
  209. table: 表名
  210. data: 字典 {"xxx":"xxx"}
  211. **kwargs:
  212. Returns: 添加行数
  213. """
  214. sql = make_insert_sql(table, data, **kwargs)
  215. return self.add(sql)
  216. def add_batch(self, sql, datas: List[Dict]):
  217. """
  218. @summary: 批量添加数据
  219. ---------
  220. @ param sql: insert ignore into (xxx,xxx) values (%s, %s, %s)
  221. # param datas: 列表 [{}, {}, {}]
  222. ---------
  223. @result: 添加行数
  224. """
  225. affect_count = None
  226. try:
  227. conn, cursor = self.get_connection()
  228. affect_count = cursor.executemany(sql, datas)
  229. conn.commit()
  230. except Exception as e:
  231. log.error(
  232. """
  233. error:%s
  234. sql: %s
  235. """
  236. % (e, sql)
  237. )
  238. finally:
  239. self.close_connection(conn, cursor)
  240. return affect_count
  241. def add_batch_smart(self, table, datas: List[Dict], **kwargs):
  242. """
  243. 批量添加数据, 直接传递list格式的数据,不用拼sql
  244. Args:
  245. table: 表名
  246. datas: 列表 [{}, {}, {}]
  247. **kwargs:
  248. Returns: 添加行数
  249. """
  250. sql, datas = make_batch_sql(table, datas, **kwargs)
  251. return self.add_batch(sql, datas)
  252. def update(self, sql):
  253. try:
  254. conn, cursor = self.get_connection()
  255. cursor.execute(sql)
  256. conn.commit()
  257. except Exception as e:
  258. log.error(
  259. """
  260. error:%s
  261. sql: %s
  262. """
  263. % (e, sql)
  264. )
  265. return False
  266. else:
  267. return True
  268. finally:
  269. self.close_connection(conn, cursor)
  270. def update_smart(self, table, data: Dict, condition):
  271. """
  272. 更新, 不用拼sql
  273. Args:
  274. table: 表名
  275. data: 数据 {"xxx":"xxx"}
  276. condition: 更新条件 where后面的条件,如 condition='status=1'
  277. Returns: True / False
  278. """
  279. sql = make_update_sql(table, data, condition)
  280. return self.update(sql)
  281. def delete(self, sql):
  282. """
  283. 删除
  284. Args:
  285. sql:
  286. Returns: True / False
  287. """
  288. try:
  289. conn, cursor = self.get_connection()
  290. cursor.execute(sql)
  291. conn.commit()
  292. except Exception as e:
  293. log.error(
  294. """
  295. error:%s
  296. sql: %s
  297. """
  298. % (e, sql)
  299. )
  300. return False
  301. else:
  302. return True
  303. finally:
  304. self.close_connection(conn, cursor)
  305. def execute(self, sql):
  306. try:
  307. conn, cursor = self.get_connection()
  308. cursor.execute(sql)
  309. conn.commit()
  310. except Exception as e:
  311. log.error(
  312. """
  313. error:%s
  314. sql: %s
  315. """
  316. % (e, sql)
  317. )
  318. return False
  319. else:
  320. return True
  321. finally:
  322. self.close_connection(conn, cursor)