mysql_pipeline.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2018-07-29 22:48:30
  4. ---------
  5. @summary: 导出数据
  6. ---------
  7. @author: Boris
  8. @email: boris_liu@foxmail.com
  9. """
  10. from typing import Dict, List, Tuple
  11. import feapder.utils.tools as tools
  12. from feapder.db.mysqldb import MysqlDB
  13. from feapder.pipelines import BasePipeline
  14. from feapder.utils.log import log
  class MysqlPipeline(BasePipeline):
      """Pipeline that writes batches of items to MySQL through ``MysqlDB``."""

      def __init__(self):
          # Filled in lazily by the ``to_db`` property, so constructing the
          # pipeline does not construct a ``MysqlDB`` instance.
          self._to_db = None

      @property
      def to_db(self):
          """Return the ``MysqlDB`` instance, creating it on first access."""
          if self._to_db is None:
              self._to_db = MysqlDB()

          return self._to_db

      def save_items(self, table, items: List[Dict]) -> bool:
          """
          Save a batch of items.

          Args:
              table: table name
              items: rows to save, [{},{},...]

          Returns:
              True if the batch was saved, False otherwise (``add_batch``
              returned None). When False, this batch is not added to the
              dedup store, so that it can be saved again later.
          """
          if not items:
              # Nothing to write; an empty batch is treated as a success.
              return True

          sql, datas = tools.make_batch_sql(table, items)
          add_count = self.to_db.add_batch(sql, datas)
          datas_size = len(datas)
          if add_count:
              # Rows that were submitted but not counted as added are reported
              # as duplicates.
              log.info(
                  "共导出 %s 条数据 到 %s, 重复 %s 条" % (datas_size, table, datas_size - add_count)
              )

          # A count of 0 is still a success; only None is reported as failure.
          return add_count is not None

      def update_items(self, table, items: List[Dict], update_keys: Tuple = ()) -> bool:
          """
          Update a batch of items.

          Args:
              table: table name
              items: rows to update, [{},{},...]
              update_keys: columns to update, e.g. ("title", "publish_time").
                  When empty, every key of the first item is used.

          Returns:
              True if the batch was updated, False otherwise (``add_batch``
              returned None). When False, this batch is not added to the
              dedup store, so that it can be saved again later.
          """
          if not items:
              # Nothing to write; also avoids an IndexError on ``items[0]`` below.
              return True

          update_columns = update_keys or list(items[0].keys())
          sql, datas = tools.make_batch_sql(table, items, update_columns=update_columns)
          update_count = self.to_db.add_batch(sql, datas)
          if update_count:
              # NOTE(review): the count is halved presumably because MySQL reports
              # 2 affected rows per row updated via ON DUPLICATE KEY UPDATE --
              # confirm against the SQL produced by tools.make_batch_sql.
              msg = "共更新 %s 条数据 到 %s" % (update_count // 2, table)
              if update_keys:
                  msg += " 更新字段为 {}".format(update_keys)
              log.info(msg)

          # A count of 0 is still a success; only None is reported as failure.
          return update_count is not None