task_buffer.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
# -*- coding: utf-8 -*-
"""
Created on 2023-10-16
---------
@summary: Crawl-state manager; buffers tasks and their crawl states before they are added to RabbitMQ
---------
@author: dzr
"""
  9. import collections
  10. import threading
  11. import feapder.setting as setting
  12. import feapder.utils.tools as tools
  13. from feapder.db.rabbitMq import RabbitMQ
  14. from feapder.dedup import Dedup
  15. from feapder.network.item import Item, BaseListItem
  16. from feapder.utils.log import log
  17. MAX_TASK_COUNT = 1000 # 缓存中最大待处理任务数
  18. MAX_FINISHED_COUNT = 1000 # 缓存中最大任务状态确认数
  19. class TaskBuffer(threading.Thread):
  20. dedup = None
  21. def __init__(self, redis_key, rabbitmq=None):
  22. if not hasattr(self, "_task_deque"):
  23. super(TaskBuffer, self).__init__()
  24. self._thread_stop = False
  25. self._is_adding_to_db = False
  26. self._task_deque = collections.deque()
  27. self._rabbitmq = rabbitmq or RabbitMQ()
  28. # 待处理详情任务队列
  29. self._tab_items = setting.TAB_ITEMS.format(
  30. redis_key=redis_key.replace('_detailc', '')
  31. )
  32. self._rabbitmq.declare(queue=self._tab_items)
  33. # 采集任务状态队列
  34. self._task_crawl_state = setting.TASK_CRAWL_STATE
  35. self._rabbitmq.declare(queue=self._task_crawl_state)
  36. if not self.__class__.dedup and setting.TASK_FILTER_ENABLE:
  37. self.__class__.dedup = Dedup(
  38. name=redis_key, to_md5=False, **setting.TASK_FILTER_SETTING
  39. ) # 默认过期时间为一个月
  40. def run(self):
  41. self._thread_stop = False
  42. while not self._thread_stop:
  43. try:
  44. self.__add_task_to_db()
  45. except Exception as e:
  46. log.exception(e)
  47. tools.delay_time(1)
  48. def stop(self):
  49. self._thread_stop = True
  50. self._started.clear()
  51. def put_task(self, task):
  52. if isinstance(task, Item):
  53. self._task_deque.append(task)
  54. if self.get_tasks_count() > MAX_TASK_COUNT: # 超过最大缓存,主动调用
  55. self.flush()
  56. def flush(self):
  57. try:
  58. self.__add_task_to_db()
  59. except Exception as e:
  60. log.exception(e)
  61. def get_tasks_count(self):
  62. return len(self._task_deque)
  63. def is_adding_to_db(self):
  64. return self._is_adding_to_db
  65. def __add_task_to_db(self):
  66. task_list = []
  67. finished_list = []
  68. while self._task_deque:
  69. task = self._task_deque.popleft()
  70. self._is_adding_to_db = True
  71. task_uuid = task.pyuuid # 任务唯一标识
  72. # 如果需要去重并且库中已重复,则continue
  73. if setting.TASK_FILTER_ENABLE and not self.__class__.dedup.add(task_uuid):
  74. log.debug("task已存在 pyuuid = %s" % task_uuid)
  75. continue
  76. else:
  77. if isinstance(task, BaseListItem):
  78. task_list.append(task) # 待采集任务
  79. else:
  80. task_dict = task.to_dict
  81. task_dict['state'] = 2 # 待采集任务成功采集状态[2=完成采集]
  82. finished_list.append(task_dict)
  83. # 入库(超过上限[MAX_URL_COUNT]执行)
  84. if len(task_list) > MAX_TASK_COUNT:
  85. self._rabbitmq.add(self._tab_items, task_list)
  86. task_list = []
  87. # 推送消息队列(超过上限[MAX_FINISHED_COUNT]执行)
  88. if len(finished_list) > MAX_FINISHED_COUNT:
  89. self._rabbitmq.add(self._task_crawl_state, finished_list)
  90. finished_list = []
  91. # 入库(小于上限[MAX_URL_COUNT]执行)
  92. if task_list:
  93. self._rabbitmq.add(self._tab_items, task_list)
  94. # 推送状态(超过上限[MAX_FINISHED_COUNT]执行)
  95. if finished_list:
  96. self._rabbitmq.add(self._task_crawl_state, finished_list)
  97. self._is_adding_to_db = False