item_buffer.py

# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: Item manager. Buffers items destined for the database and writes them
          in one place, so multiple threads never access the database at the same time.
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import importlib
import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.redisdb import RedisDB
from feapder.dedup import Dedup
from feapder.network.item import Item, UpdateItem
from feapder.pipelines import BasePipeline
from feapder.pipelines.mysql_pipeline import MysqlPipeline
from feapder.utils import metrics
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer
UPLOAD_BATCH_MAX_SIZE = 1000

MYSQL_PIPELINE_PATH = "feapder.pipelines.mysql_pipeline.MysqlPipeline"


class ItemBuffer(threading.Thread):
    dedup = None
    __redis_db = None

    def __init__(self, redis_key, task_table=None):
        if not hasattr(self, "_table_item"):
            super(ItemBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key
            self._task_table = task_table

            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)

            self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key)
            self._table_failed_items = setting.TAB_FAILED_ITEMS.format(
                redis_key=redis_key
            )

            self._item_tables = {
                # 'item_name': 'table_name'  # caches the mapping from item name to table name
            }
            self._item_update_keys = {
                # 'table_name': ['id', 'name', ...]  # caches the mapping from table_name to __update_key__
            }
            self._pipelines = self.load_pipelines()

            self._have_mysql_pipeline = MYSQL_PIPELINE_PATH in setting.ITEM_PIPELINES
            self._mysql_pipeline = None

            if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
                self.__class__.dedup = Dedup(
                    to_md5=False, **setting.ITEM_FILTER_SETTING
                )

            # number of export retries
            self.export_retry_times = 0
            # number of export failures  TODO: non-air spiders should track this via redis
            self.export_falied_times = 0

    @property
    def redis_db(self):
        if self.__class__.__redis_db is None:
            self.__class__.__redis_db = RedisDB()

        return self.__class__.__redis_db

    def load_pipelines(self):
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            module, class_name = pipeline_path.rsplit(".", 1)
            pipeline_cls = importlib.import_module(module).__getattribute__(class_name)
            pipeline = pipeline_cls()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(
                    f"{pipeline_path} must inherit feapder.pipelines.BasePipeline"
                )
            pipelines.append(pipeline)

        return pipelines
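    # A minimal sketch (an assumption, not feapder's shipped code) of a custom pipeline
    # that load_pipelines() would accept: it subclasses BasePipeline and implements the
    # save_items / update_items hooks that __export_to_db() calls further down, each
    # returning True on success. The ConsolePipeline name and its dotted path are
    # hypothetical examples.
    #
    #     from feapder.pipelines import BasePipeline
    #
    #     class ConsolePipeline(BasePipeline):
    #         def save_items(self, table, items) -> bool:
    #             print(f"save {len(items)} items into {table}")
    #             return True
    #
    #         def update_items(self, table, items, update_keys=()) -> bool:
    #             print(f"update {len(items)} items of {table} by keys {update_keys}")
    #             return True
    #
    # It would then be enabled by listing its dotted path in setting.ITEM_PIPELINES,
    # e.g. ITEM_PIPELINES = ["myproject.pipelines.ConsolePipeline"].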
    @property
    def mysql_pipeline(self):
        if not self._mysql_pipeline:
            module, class_name = MYSQL_PIPELINE_PATH.rsplit(".", 1)
            pipeline_cls = importlib.import_module(module).__getattribute__(class_name)
            self._mysql_pipeline = pipeline_cls()

        return self._mysql_pipeline

    def run(self):  # step 1: entry point of the buffer thread
        self._thread_stop = False
        while not self._thread_stop:  # keep flushing as long as the spider is running
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        self._thread_stop = True
        self._started.clear()

    def put_item(self, item):  # step: entry point for storing data; puts it on the buffer queue
        if isinstance(item, Item):
            # callback before the item is written to the database
            if item.item_name == "ListItem":
                # walkthrough note: only the test framework skips ListItem;
                # the official framework does not have this check
                return
            item.pre_to_db()
            # print(item)
            if item.save:  # the save flag decides whether this record is stored
                self._items_queue.put(item)
        else:
            self._items_queue.put(item)
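    # Usage sketch (an assumption for illustration; in practice the framework's parser
    # control feeds this queue with whatever the parsers yield):
    #
    #     buffer = ItemBuffer(redis_key="test_spider")
    #     buffer.start()                               # run(): flush roughly once per second
    #     buffer.put_item(news_item)                   # Item -> exported through the pipelines
    #     buffer.put_item(lambda: log.info("saved"))   # callable -> runs only after a successful export
    #     buffer.put_item(request)                     # anything else is treated as a finished request
    #                                                  # and removed from the redis task table
    #
    # news_item and request are hypothetical objects, not defined in this module.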
    def flush(self):
        try:
            items = []
            update_items = []
            requests = []
            callbacks = []
            items_fingerprints = []
            data_count = 0

            while not self._items_queue.empty():  # step 2: drain the queue without waiting
                data = self._items_queue.get_nowait()  # non-blocking variant of get
                data_count += 1

                # classify the data
                if callable(data):
                    callbacks.append(data)

                elif isinstance(data, UpdateItem):
                    # update-type data goes through the update path; a crawl-only
                    # framework never updates, so this branch can be skipped when reading
                    update_items.append(data)

                elif isinstance(data, Item):
                    items.append(data)
                    if setting.ITEM_FILTER_ENABLE:  # item dedup; disabled in the current framework
                        items_fingerprints.append(data.fingerprint)

                else:  # anything else is a request (request-redis)
                    requests.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    # step 3: once enough data has accumulated, store it as one batch
                    self.__add_item_to_db(
                        items, update_items, requests, callbacks, items_fingerprints
                    )

                    items = []
                    update_items = []
                    requests = []
                    callbacks = []
                    items_fingerprints = []
                    data_count = 0

            if data_count:  # step 3: the queue is empty; store whatever is left
                self.__add_item_to_db(
                    items, update_items, requests, callbacks, items_fingerprints
                )

        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        return self._is_adding_to_db

    def __dedup_items(self, items, items_fingerprints):
        """
        Dedup
        @param items:
        @param items_fingerprints:
        @return: the deduplicated items and items_fingerprints
        """
        if not items:
            return items, items_fingerprints

        is_exists = self.__class__.dedup.get(items_fingerprints)
        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]

        dedup_items = []
        dedup_items_fingerprints = []
        items_count = dedup_items_count = dup_items_count = 0

        while is_exists:
            item = items.pop(0)
            items_fingerprint = items_fingerprints.pop(0)
            is_exist = is_exists.pop(0)

            items_count += 1

            if not is_exist:
                dedup_items.append(item)
                dedup_items_fingerprints.append(items_fingerprint)
                dedup_items_count += 1
            else:
                dup_items_count += 1

        log.info(
            "{} items pending, {} duplicates, {} items left to store".format(
                items_count, dup_items_count, dedup_items_count
            )
        )

        return dedup_items, dedup_items_fingerprints
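    # For illustration (hypothetical values): if items_fingerprints is ["a1", "b2"]
    # and dedup.get() returns [1, 0], the first item already exists in the dedup store
    # and is dropped, the second is kept; the log line above would then report
    # 2 pending, 1 duplicate, 1 left to store.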
    def __pick_items(self, items, is_update_item=False):
        """
        Split the data by table. After splitting, the original items list is empty
        @param items:
        @param is_update_item:
        @return:
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }

        while items:
            item = items.pop(0)

            # get the underscore-style name of the item
            # look it up in the cache dict first; if missing, compute it and cache it
            # to speed up later lookups
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            if table_name not in datas_dict:
                datas_dict[table_name] = []

            datas_dict[table_name].append(item.to_dict)

            if is_update_item and table_name not in self._item_update_keys:
                self._item_update_keys[table_name] = item.update_key

        return datas_dict
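    # For illustration (hypothetical data): two items bound for table "news" and one
    # bound for table "user" are grouped into
    #
    #     {
    #         "news": [{"title": "a"}, {"title": "b"}],
    #         "user": [{"name": "c"}],
    #     }
    #
    # For UpdateItems, self._item_update_keys additionally caches each table's
    # update_key so __export_to_db() can pass it on as update_keys.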
    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
        # step 3.1.1 metrics: record the total count and the state of every key
        self.check_datas(table=table, datas=datas)

        for pipeline in self._pipelines:  # the pipelines configured in setting
            if is_update:  # the update path; can be skipped when only reading the code
                if table == self._task_table and not isinstance(
                    pipeline, MysqlPipeline
                ):
                    continue

                if not pipeline.update_items(table, datas, update_keys=update_keys):
                    log.error(
                        f"{pipeline.__class__.__name__} failed to update data. table: {table} items: {datas}"
                    )
                    return False

            else:
                if not pipeline.save_items(table, datas):  # step 3.1.2 call the pipeline's save_items method
                    log.error(
                        f"{pipeline.__class__.__name__} failed to save data. table: {table} items: {datas}"
                    )
                    return False

        # if this is the task table and no mysql pipeline is configured above,
        # mysql must still be called to update the task state
        if not self._have_mysql_pipeline and is_update and table == self._task_table:
            if not self.mysql_pipeline.update_items(
                table, datas, update_keys=update_keys
            ):
                log.error(
                    f"{self.mysql_pipeline.__class__.__name__} failed to update data. table: {table} items: {datas}"
                )
                return False

        return True
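    # Reading note: for updates against the task table, only MysqlPipeline is consulted
    # (other pipelines are skipped via the continue above), and when no mysql pipeline
    # is configured at all, the lazily created self.mysql_pipeline performs the task
    # update instead. Plain item batches go through every configured pipeline in order.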
    def __add_item_to_db(
        self, items, update_items, requests, callbacks, items_fingerprints
    ):
        export_success = True
        self._is_adding_to_db = True

        # item dedup; disabled here, can be skipped when reading
        if setting.ITEM_FILTER_ENABLE:
            items, items_fingerprints = self.__dedup_items(items, items_fingerprints)

        # step: sort the data by table; after splitting, the original lists are empty
        items_dict = self.__pick_items(items)
        update_items_dict = self.__pick_items(update_items, is_update_item=True)

        # batch insert of items
        failed_items = {"add": [], "update": [], "requests": []}

        while items_dict:
            table, datas = items_dict.popitem()

            log.debug(
                """
                -------------- item batch insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            if not self.__export_to_db(table, datas):  # step 3.1 export to the database
                export_success = False
                failed_items["add"].append({"table": table, "datas": datas})

        # batch update
        while update_items_dict:
            table, datas = update_items_dict.popitem()

            log.debug(
                """
                -------------- item batch update --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            update_keys = self._item_update_keys.get(table)
            if not self.__export_to_db(
                table, datas, is_update=True, update_keys=update_keys
            ):
                export_success = False
                failed_items["update"].append({"table": table, "datas": datas})

        if export_success:
            # step 3.2 callbacks to run after a successful save
            while callbacks:
                try:
                    callback = callbacks.pop(0)
                    callback()
                except Exception as e:
                    log.exception(e)

            # step: remove the finished requests
            if requests:
                self.redis_db.zrem(self._table_request, requests)

            # record the fingerprints in the dedup store (this dedup path is not used here)
            if setting.ITEM_FILTER_ENABLE:
                if items_fingerprints:
                    self.__class__.dedup.add(items_fingerprints, skip_check=True)
        else:
            # step 3.2 handling when the save failed
            failed_items["requests"] = requests

            if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                if self._redis_key != "air_spider":
                    # record the failed items in redis
                    self.redis_db.sadd(self._table_failed_items, failed_items)

                    # remove the finished requests
                    if requests:
                        self.redis_db.zrem(self._table_request, requests)

                    log.error(
                        "Export exceeded the maximum number of retries; giving up and recording the data in redis. items:\n {}".format(
                            tools.dumps_json(failed_items)
                        )
                    )
                self.export_retry_times = 0
            else:
                tip = ["export failed"]
                if callbacks:
                    tip.append("callbacks will not run")

                if requests:
                    tip.append("tasks will not be removed")
                    # re-prioritize only the requests that still exist in the task table
                    exists = self.redis_db.zexists(self._table_request, requests)
                    for exist, request in zip(exists, requests):
                        if exist:
                            self.redis_db.zadd(self._table_request, request, 300)

                if setting.ITEM_FILTER_ENABLE:
                    tip.append("data will not be added to the dedup store")

                if self._redis_key != "air_spider":
                    tip.append("will retry automatically")

                tip.append("failed items:\n {}".format(tools.dumps_json(failed_items)))
                log.error(",".join(tip))

                self.export_falied_times += 1

                if self._redis_key != "air_spider":
                    self.export_retry_times += 1

            if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
                # alert
                msg = "Spider <{}> failed to export data, failure count: {}. Please check whether the spider is healthy".format(
                    self._redis_key, self.export_falied_times
                )
                log.error(msg)
                tools.send_msg(
                    msg=msg,
                    level="error",
                    message_prefix="Spider <%s> failed to export data" % (self._redis_key),
                )

        self._is_adding_to_db = False

    def check_datas(self, table, datas):
        """
        Metrics: record the total count and the state of every key
        @param table: table name
        @param datas: list of data dicts
        @return:
        """
        metrics.emit_counter("total count", len(datas), classify=table)
        for data in datas:
            for k, v in data.items():
                metrics.emit_counter(k, int(bool(v)), classify=table)
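    # For illustration (hypothetical data): check_datas("news", [{"title": "a", "img": ""}])
    # emits a "total count" of 1 for the "news" classify, "title" = 1 (non-empty) and
    # "img" = 0 (empty), which makes per-field completeness visible in the metrics backend.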
    def close(self):
        # call every pipeline's close method
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except:
                pass
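
# End-to-end sketch (an assumption for illustration only; Spider / AirSpider normally
# create and drive this thread themselves). The field values and the redis_key are
# hypothetical.
#
#     from feapder.network.item import Item
#
#     item = Item(title="hello")            # hypothetical field
#     item.table_name = "news"              # table the pipelines should write to
#
#     item_buffer = ItemBuffer(redis_key="test_spider")
#     item_buffer.start()                   # background thread: flush() about once per second
#     item_buffer.put_item(item)            # queued, then exported in batches
#     item_buffer.stop()                    # stop the loop; close() shuts the pipelines down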