item_buffer.py

# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: Item manager. Buffers items bound for the database and writes them
          from a single place, preventing concurrent database access from
          multiple threads.
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.rabbitMq import RabbitMQ
from feapder.dedup import Dedup
from feapder.network.item import (
    Item,
    UpdateItem,
    BaseListItem,
    BaseDetailItem,
    FailedTaskItem,
)
from feapder.pipelines import BasePipeline
from feapder.utils import metrics
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer
UPLOAD_BATCH_MAX_SIZE = 1000

class ItemBuffer(threading.Thread):
    dedup = None

    def __init__(self, redis_key, rabbitmq=None, user=None):
        if not hasattr(self, "_items_queue"):
            super(ItemBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key
            self._user = user

            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
            self._rabbitmq = rabbitmq or RabbitMQ()

            # request (task) queue
            self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
            self._rabbitmq.declare_bind(queue=self._tab_requests)

            # queue for items that failed to be saved
            self._tab_failed_items = setting.TAB_FAILED_ITEMS
            self._rabbitmq.declare_bind(queue=self._tab_failed_items)

            # crawl task queue (RabbitMQ)
            self._tab_items = setting.TAB_ITEMS.format(
                redis_key=redis_key.replace("_detailc", "")
            )
            self._rabbitmq.declare_bind(queue=self._tab_items)

            self._item_tables = {
                # 'item_name': 'table_name'  # caches the item name -> table name mapping
            }

            self._item_update_keys = {
                # 'table_name': ['id', 'name', ...]  # caches the table name -> __update_key__ mapping
            }

            self._pipelines = self.load_pipelines()

            if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
                self.__class__.dedup = Dedup(
                    to_md5=False, **setting.ITEM_FILTER_SETTING
                )

            # number of export retries
            self.export_retry_times = 0
            # number of export failures
            self.export_failed_times = 0

    def load_pipelines(self):
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            pipeline = tools.import_cls(pipeline_path)()
            # every configured pipeline must subclass BasePipeline
            # (see the ExampleConsolePipeline sketch at the bottom of this file)
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(
                    f"{pipeline_path} must inherit from feapder.pipelines.BasePipeline"
                )
            pipelines.append(pipeline)

        return pipelines

    def run(self):
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        self._thread_stop = True
        self._started.clear()

    def put_item(self, item):
        if isinstance(item, Item):
            # callback invoked before the item goes to the database
            item.pre_to_db()

        self._items_queue.put(item)
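
    # A minimal usage sketch ("news_spider" is a hypothetical redis_key; assumes
    # RabbitMQ and the feapder settings module are configured):
    #
    #     item_buffer = ItemBuffer(redis_key="news_spider")
    #     item_buffer.start()         # background thread, flushes ~once per second
    #     item_buffer.put_item(item)  # Item subclasses get pre_to_db() called first
    #     item_buffer.put_item(lambda: log.info("exported"))  # callables run after a successful export
    #     item_buffer.stop()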

    def flush(self):
        try:
            items = []
            update_items = []
            failed_task_items = []
            requests = []
            callbacks = []
            items_fingerprints = []
            data_count = 0

            while not self._items_queue.empty():
                data = self._items_queue.get_nowait()
                data_count += 1
                update_at = tools.ensure_int64(tools.get_current_timestamp())

                # classify the data
                if callable(data):
                    callbacks.append(data)

                elif isinstance(data, UpdateItem):
                    update_items.append(data)

                elif isinstance(data, FailedTaskItem):
                    data.queue_name = self._tab_items  # name of the crawl task queue
                    # read the failure counters from the item's dict form
                    failed_times = data.to_dict.pop("failed_times", 0)
                    failed_retries = data.to_dict.pop("failed_retries", 0)
                    if failed_retries >= setting.SPIDER_MAX_RETRY_TIMES:
                        state = 4  # task state [4 = crawling stopped]
                        # update the task's state
                        update_item = UpdateItem(
                            state=state,
                            pyuuid=data.pyuuid,
                            update_at=update_at,
                        )
                        update_item.update_key = ["state", "update_at"]
                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
                        update_items.append(update_item)

                        # save the details of the failed crawl task
                        data.state = state
                        data.failed_times = failed_times
                        data.failed_retries = failed_retries
                        data.create_at = update_at
                        failed_task_items.append(data)
                    else:
                        update_item = UpdateItem(
                            state=3,  # task state [3 = crawl failed]
                            pyuuid=data.pyuuid,
                            update_at=update_at,
                            failed_times=failed_times + 1,
                            failed_retries=failed_retries + 1,
                        )
                        update_item.update_key = [
                            "state",
                            "update_at",
                            "failed_times",
                            "failed_retries",
                        ]
                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
                        update_items.append(update_item)

                elif isinstance(data, Item):
                    if isinstance(data, BaseListItem):
                        data.queue_name = self._tab_items
                        data.update_at = update_at
                        if hasattr(data, "is_delay") and data.is_delay:
                            data.state = 5  # task state [5 = delayed crawl]
                        else:
                            data.state = 1  # task state [1 = waiting to be crawled]
                    elif isinstance(data, BaseDetailItem):
                        update_item = UpdateItem(
                            state=2,  # task state [2 = crawl finished]
                            pyuuid=data.pyuuid,
                            update_at=update_at,
                        )
                        update_item.update_key = ["state", "update_at"]
                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
                        update_items.append(update_item)
                        if data.dont_save:
                            # do not save this item
                            continue

                    items.append(data)
                    if setting.ITEM_FILTER_ENABLE:
                        items_fingerprints.append(data.fingerprint)

                else:  # request-redis
                    requests.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(
                        items,
                        update_items,
                        failed_task_items,
                        requests,
                        callbacks,
                        items_fingerprints,
                    )

                    items = []
                    update_items = []
                    failed_task_items = []
                    requests = []
                    callbacks = []
                    items_fingerprints = []
                    data_count = 0

            if data_count:
                self.__add_item_to_db(
                    items,
                    update_items,
                    failed_task_items,
                    requests,
                    callbacks,
                    items_fingerprints,
                )

        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        return self._is_adding_to_db

    def __dedup_items(self, items, items_fingerprints):
        """
        Deduplicate the items.
        @param items:
        @param items_fingerprints:
        @return: the deduplicated items and items_fingerprints
        """
        if not items:
            return items, items_fingerprints

        is_exists = self.__class__.dedup.get(items_fingerprints)
        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]

        dedup_items = []
        dedup_items_fingerprints = []
        items_count = dedup_items_count = dup_items_count = 0

        while is_exists:
            item = items.pop(0)
            items_fingerprint = items_fingerprints.pop(0)
            is_exist = is_exists.pop(0)

            items_count += 1

            if not is_exist:
                dedup_items.append(item)
                dedup_items_fingerprints.append(items_fingerprint)
                dedup_items_count += 1
            else:
                dup_items_count += 1

        log.info(
            "{} items pending insertion, {} duplicates, {} items left to insert".format(
                items_count, dup_items_count, dedup_items_count
            )
        )

        return dedup_items, dedup_items_fingerprints

    def __pick_items(self, items, is_update_item=False):
        """
        Split the items by destination table; the original items list is emptied
        in the process.
        @param items:
        @param is_update_item:
        @return: dict mapping table name to a list of data dicts
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }

        while items:
            item = items.pop(0)
            # Look up the item's table name via the cache keyed by the item's
            # snake_case name; on a miss, resolve and cache it to speed up
            # subsequent lookups.
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            if table_name not in datas_dict:
                datas_dict[table_name] = []

            datas_dict[table_name].append(item.to_dict)

            if is_update_item and table_name not in self._item_update_keys:
                self._item_update_keys[table_name] = item.update_key

        return datas_dict
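
    # For example, three picked items spanning two tables come back grouped as
    # (hypothetical table names):
    #     {"table_news": [{...}, {...}], "table_user": [{...}]}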

    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
        for pipeline in self._pipelines:
            if is_update:
                if not pipeline.update_items(table, datas, update_keys=update_keys):
                    log.error(
                        f"{pipeline.__class__.__name__} failed to update data. table: {table} items: {datas}"
                    )
                    return False
            else:
                if not pipeline.save_items(table, datas):
                    log.error(
                        f"{pipeline.__class__.__name__} failed to save data. table: {table} items: {datas}"
                    )
                    return False

        self.metric_datas(table=table, datas=datas)
        return True

    def __add_item_to_db(
        self, items, update_items, failed_task_items, requests, callbacks, items_fingerprints
    ):
        export_success = True
        self._is_adding_to_db = True

        if setting.ITEM_FILTER_ENABLE:
            items, items_fingerprints = self.__dedup_items(items, items_fingerprints)

        # sort the items by destination table
        items_dict = self.__pick_items(items)
        update_items_dict = self.__pick_items(update_items, is_update_item=True)
        failed_task_items_dict = self.__pick_items(failed_task_items)

        # batch-insert items
        failed_items = {"add": [], "update": [], "requests": []}
        while items_dict:
            table, datas = items_dict.popitem()

            log.debug(
                """
                -------------- batch item insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            if not self.__export_to_db(table, datas):
                export_success = False
                failed_items["add"].append({"table": table, "datas": datas})

        # batch-update items
        while update_items_dict:
            table, datas = update_items_dict.popitem()

            log.debug(
                """
                -------------- batch item update --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            update_keys = self._item_update_keys.get(table)
            if not self.__export_to_db(
                table, datas, is_update=True, update_keys=update_keys
            ):
                export_success = False
                failed_items["update"].append({"table": table, "datas": datas})

        # batch-insert items of failed crawl tasks
        while failed_task_items_dict:
            table, datas = failed_task_items_dict.popitem()

            log.debug(
                """
                -------------- batch insert of failed crawl items --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            if not self.__export_to_db(table, datas):
                export_success = False
                failed_items["add"].append({"table": table, "datas": datas})

        if export_success:
            # run the callbacks
            while callbacks:
                try:
                    callback = callbacks.pop(0)
                    callback()
                except Exception as e:
                    log.exception(e)

            # remove the finished requests
            if requests:
                # self._rabbitmq.add(self._tab_requests, requests)
                pass

            # record the fingerprints in the dedup store
            if setting.ITEM_FILTER_ENABLE:
                if items_fingerprints:
                    self.__class__.dedup.add(items_fingerprints, skip_check=True)
        else:
            failed_items["requests"] = requests
            # set correlation_id, the unique identifier of the MQ publisher
            properties = dict(correlation_id=self._user or self._redis_key)

            if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                if self._redis_key != "air_spider":
                    # record the failed items
                    self._rabbitmq.add_batch(
                        self._tab_failed_items, failed_items, properties=properties
                    )

                    # remove the finished requests
                    if requests:
                        # self.redis_db.zrem(self._table_request, requests)
                        print(f"number of finished requests: {len(requests)}")

                    log.error(
                        "Export exceeded the maximum number of retries; not retrying. "
                        "Failed items recorded to the failed-items queue. items:\n {}".format(
                            tools.dumps_json(failed_items)
                        )
                    )
                self.export_retry_times = 0
            else:
                tip = ["export failed"]
                if callbacks:
                    tip.append("callbacks not executed")

                if requests:
                    tip.append("tasks not removed")
                    self._rabbitmq.add_batch(
                        self._tab_requests, requests, properties=properties
                    )

                if setting.ITEM_FILTER_ENABLE:
                    tip.append("fingerprints not added to the dedup store")

                if self._redis_key != "air_spider":
                    tip.append("will retry automatically")

                tip.append("failed items:\n {}".format(tools.dumps_json(failed_items)))
                log.error(", ".join(tip))

                self.export_failed_times += 1

                if self._redis_key != "air_spider":
                    self.export_retry_times += 1

            if self.export_failed_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
                # send an alert
                msg = '"{}" spider failed to export data; failure count: {}. Please check whether the spider is working properly.'.format(
                    self._redis_key, self.export_failed_times
                )
                log.error(msg)
                tools.send_msg(
                    msg=msg,
                    level="error",
                    message_prefix='"%s" spider failed to export data' % (self._redis_key,),
                )

        self._is_adding_to_db = False

    def metric_datas(self, table, datas):
        """
        Emit metrics: record the total row count and, for each key, whether it
        carries a value.
        @param table: table name
        @param datas: list of data dicts
        @return:
        """
        total_count = 0
        for data in datas:
            total_count += 1
            for k, v in data.items():
                metrics.emit_counter(k, int(bool(v)), classify=table)

        metrics.emit_counter("total count", total_count, classify=table)

    def close(self):
        # call each pipeline's close method
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except Exception as e:
                log.exception(e)
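

# --- Hypothetical example, not part of the module's runtime code --------------
# load_pipelines() requires every entry in setting.ITEM_PIPELINES to subclass
# BasePipeline. The sketch below shows the contract __export_to_db() relies on:
# save_items() / update_items() must return a truthy value, otherwise the batch
# is marked as failed and ItemBuffer schedules a retry. ExampleConsolePipeline
# and "news_spider" are illustrative names, not part of feapder.
class ExampleConsolePipeline(BasePipeline):
    def save_items(self, table, items) -> bool:
        # print instead of writing to a real database
        print(f"save {len(items)} items to table {table}")
        return True

    def update_items(self, table, items, update_keys=()) -> bool:
        print(f"update {len(items)} items in table {table} by keys {update_keys}")
        return True


# Minimal driver sketch, assuming RabbitMQ and the feapder settings module are
# configured. Registering the example pipeline would be done in settings, e.g.:
#     ITEM_PIPELINES = ["my_project.pipelines.ExampleConsolePipeline"]
if __name__ == "__main__":
    item_buffer = ItemBuffer(redis_key="news_spider")
    item_buffer.start()  # background thread flushes the queue about once per second
    item_buffer.stop()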