# -*- coding: utf-8 -*-
"""
Created on 2023-04-24
---------
@summary: Competitive-product task management
---------
@author: Dzr
"""
import threading

from base_server import BaseServer, tools
from common.log import logger


class CompetitiveProductServer(BaseServer, threading.Thread):
    """Producer thread that keeps the crawl-task queue topped up.

    Each cycle it resets the queue when the calendar date changes, and
    refills it from the document store when the number of pending tasks
    drops below a low-water mark.
    """

    def __init__(self, site: str, redis_label: str, db: str, table: str, maxsize=None):
        """Initialise the thread and the underlying ``BaseServer``.

        Args:
            site: Forwarded unchanged to ``BaseServer.__init__``.
            redis_label: Forwarded unchanged to ``BaseServer.__init__``.
            db: Forwarded unchanged to ``BaseServer.__init__``.
            table: Forwarded unchanged to ``BaseServer.__init__``.
            maxsize: Upper bound on pending tasks; any falsy value
                (``None``, ``0``) falls back to 10.
        """
        threading.Thread.__init__(self)
        super(CompetitiveProductServer, self).__init__(site, redis_label, db, table)
        # ``Thread.name`` replaces ``getName()``, deprecated since Python 3.10.
        # ``self.server`` is presumably set by BaseServer.__init__ -- confirm.
        self.label = f'{self.server}_{self.name}'
        self.maxsize = maxsize or 10
        self.sleep_interval = 300  # passed to tools.delay() between cycles
        self.output_log = True  # log the queue size once per refill burst
        self.push_interval = 12 * 3600  # minimum record age before it is queued
        self.now_date = tools.now_date('%Y-%m-%d')  # date the queue was last reset
        self._mutex = threading.Lock()

    def put_latest_tasks(self):
        """Queue eligible records for ``day=1``; the result is discarded."""
        self.put_tasks(1)

    def put_tasks(self, day):
        """Queue eligible records whose publish time matches ``day``.

        A record is queued when its ``count``/``es_count`` is zero or
        missing, ``crawl`` is exactly ``False``, ``crawl_status`` is
        ``None`` or missing, it is at least ``push_interval`` old
        according to ``comeintime``, and its fingerprint is not already
        registered.

        Args:
            day: Offset handed to ``tools.delta_t`` to build the
                ``l_np_publishtime`` filter (presumably days back from
                today -- confirm against ``tools.delta_t``).

        Returns:
            ``False`` if the scan stopped early because the queue reached
            ``maxsize``; ``True`` if the whole cursor was consumed.

        Raises:
            KeyError: If a record that passed the status filters has no
                ``comeintime`` field.
        """
        is_add = True
        query = {'l_np_publishtime': tools.delta_t(day)}
        with self.mgo_db.find(query) as cursor:
            for item in cursor:
                # ``>=`` rather than ``==``: if the queue ever overshoots the
                # cap, an equality test would never stop the scan.
                if self.task_total >= self.maxsize:
                    is_add = False
                    break

                # Short-circuit filters: cheap status checks first, so the
                # ``comeintime`` lookup only happens for real candidates.
                count = item.get('count') or item.get('es_count') or 0
                if count != 0:
                    continue
                if item.get('crawl') is not False:
                    continue
                if item.get('crawl_status') is not None:
                    continue
                if tools.now_ts() - item['comeintime'] < self.push_interval:
                    continue

                fingerprint = self.fingerprint(**item)
                if self.exists(fingerprint):
                    continue

                self.rpush(tools.json_dumps(item))
                self.add(fingerprint)

        return is_add

    def task_pool(self):
        """Refill the queue, widening the publish-time window step by step.

        Starts at ``day=2`` and increments after each pass; stops as soon
        as ``put_tasks`` reports a full queue, or once a pass with
        ``days > 3`` has completed.
        """
        with self._mutex:
            days = 2
            while True:
                # NOTE(review): this re-scans day 1 on every iteration,
                # although the original comment suggests it was meant to run
                # once -- confirm the intent before changing it.
                self.put_latest_tasks()
                add_task = self.put_tasks(days)
                if not add_task:
                    # Queue is full: allow the size to be logged again.
                    self.output_log = True
                    break
                elif days > 3:
                    # Every day in the look-back window has been scanned.
                    logger.info(f'[{self.label}]暂无采集任务')
                    break
                else:
                    days += 1

        if self.output_log:
            logger.info(f'[{self.label}]待采集任务 {self.task_total} 条')
            self.output_log = False

    def flush(self):
        """Delete the task queue and fingerprint keys when the date changes."""
        now_date = tools.now_date('%Y-%m-%d')
        if self.now_date != now_date:
            tables = [self.request_tasks, self.task_fingerprint]
            for table in tables:
                self.redis_db.delete(table)
            self.now_date = now_date
            logger.info(f"[{self.label}]重置任务")

    def run(self):
        """Thread entry point: produce tasks forever, one cycle per interval."""
        logger.info(f'[{self.label}]开始生产任务')
        while True:
            try:
                self.flush()
                # Refill only when fewer than 3 tasks are pending.
                if self.task_total < 3:
                    self.task_pool()
            except Exception as e:
                logger.exception(e)

            # Sleep after every cycle, failed ones included; otherwise a
            # persistent error turns this loop into a tight retry that
            # floods the log.
            tools.delay(self.sleep_interval)


class CompetitiveProductClient(BaseServer):
    """Consumer-side helper that pulls crawl tasks from the queue."""

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        """Forward all arguments unchanged to ``BaseServer.__init__``."""
        super(CompetitiveProductClient, self).__init__(site, redis_label, db, table)

    def get_crawl_task(self):
        """Pop the next task and release its fingerprint.

        Returns:
            The task converted by ``tools.document2dict``, or ``None``
            when ``tools.json_loads`` yields ``None`` (presumably because
            the queue was empty -- confirm against ``tools.json_loads``).
        """
        task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
        if task is not None:
            # Drop the fingerprint so the record can be queued again later.
            self.remove(self.fingerprint(**task))
            task = tools.document2dict(task)
        return task

    def save_data(self, table, documents):
        """Placeholder; currently does nothing."""
        pass