# competitive_product.py
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-04-24
  4. ---------
  5. @summary: 竞品任务管理
  6. ---------
  7. @author: Dzr
  8. """
  9. import threading
  10. from base_server import BaseServer, tools
  11. from common.log import logger
  12. class CompetitiveProductServer(BaseServer, threading.Thread):
  13. def __init__(self, site: str, redis_label: str, db: str, table: str, maxsize=None):
  14. threading.Thread.__init__(self)
  15. super(CompetitiveProductServer, self).__init__(site, redis_label, db, table)
  16. self.label = f'{self.server}_{self.getName()}'
  17. self.maxsize = maxsize or 10
  18. self.sleep_interval = 300
  19. self.output_log = True
  20. self.push_interval = 12 * 3600 # 采集任务的延迟间隔
  21. self.now_date = tools.now_date('%Y-%m-%d') # 启动日期
  22. self._mutex = threading.Lock()
  23. def put_latest_tasks(self):
  24. self.put_tasks(1) # 首次查询一次昨天数据
  25. def put_tasks(self, day):
  26. is_add = True
  27. query = {'l_np_publishtime': tools.delta_t(day)}
  28. with self.mgo_db.find(query) as cursor:
  29. for item in cursor:
  30. if self.task_total == self.maxsize:
  31. is_add = False
  32. break
  33. else:
  34. count = item.get('count') or item.get('es_count') or 0
  35. if all([
  36. count == 0,
  37. item.get('crawl') is False,
  38. item.get('crawl_status') is None,
  39. tools.now_ts() - item['comeintime'] >= self.push_interval
  40. ]):
  41. fingerprint = self.fingerprint(**item)
  42. if not self.exists(fingerprint):
  43. task_str = tools.json_dumps(item)
  44. self.rpush(task_str)
  45. self.add(fingerprint)
  46. return is_add
  47. def task_pool(self):
  48. with self._mutex:
  49. days = 2
  50. while True:
  51. self.put_latest_tasks()
  52. add_task = self.put_tasks(days)
  53. if not add_task:
  54. self.output_log = True
  55. break
  56. elif days > 3:
  57. # 3天内每日剩余数据已全部下载
  58. logger.info(f'[{self.label}]暂无采集任务')
  59. break
  60. else:
  61. days += 1
  62. if self.output_log:
  63. logger.info(f'[{self.label}]待采集任务 {self.task_total} 条')
  64. self.output_log = False
  65. def flush(self):
  66. now_date = tools.now_date('%Y-%m-%d')
  67. if self.now_date != now_date:
  68. tables = [self.request_tasks, self.task_fingerprint]
  69. for table in tables:
  70. self.redis_db.delete(table)
  71. self.now_date = now_date
  72. logger.info(f"[{self.label}]重置任务")
  73. def run(self):
  74. logger.info(f'[{self.label}]开始生产任务')
  75. while True:
  76. try:
  77. self.flush()
  78. if self.task_total < 3:
  79. self.task_pool()
  80. tools.delay(self.sleep_interval)
  81. except Exception as e:
  82. logger.exception(e)
  83. class CompetitiveProductClient(BaseServer):
  84. def __init__(self, site: str, redis_label: str, db: str, table: str):
  85. super(CompetitiveProductClient, self).__init__(site, redis_label, db, table)
  86. def get_crawl_task(self):
  87. task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
  88. if task is not None:
  89. self.remove(self.fingerprint(**task))
  90. task = tools.document2dict(task)
  91. return task
  92. def save_data(self, table, documents):
  93. pass