# -*- coding: utf-8 -*-
"""
Created on 2023-04-24
---------
@summary: 竞品任务管理
---------
@author: Dzr
"""
import threading
from base_server import BaseServer, tools
from common.log import logger
class CompetitiveProductServer(BaseServer, threading.Thread):
    """Producer thread that feeds competitive-product crawl tasks into Redis.

    Periodically scans MongoDB for recently published documents that have not
    been crawled yet and pushes them — deduplicated by fingerprint — onto the
    Redis task queue, keeping at most ``maxsize`` tasks pending.
    """

    def __init__(self, site: str, redis_label: str, db: str, table: str, maxsize=None):
        threading.Thread.__init__(self)
        super(CompetitiveProductServer, self).__init__(site, redis_label, db, table)
        # Thread.getName() is deprecated since Python 3.10; `name` is the
        # documented attribute and returns the same value.
        self.label = f'{self.server}_{self.name}'
        self.maxsize = maxsize or 10        # queue capacity (pending tasks)
        self.sleep_interval = 300           # seconds between production rounds
        self.output_log = True              # log pending count only once per fill
        self.push_interval = 12 * 3600      # min document age (s) before it is crawlable
        self.now_date = tools.now_date('%Y-%m-%d')  # start-up date, checked by flush()
        self._mutex = threading.Lock()      # guards task_pool() against re-entry

    def put_latest_tasks(self):
        """Queue yesterday's eligible documents (day offset 1)."""
        self.put_tasks(1)

    def put_tasks(self, day):
        """Push eligible documents published ``day`` days ago onto the queue.

        A document is eligible when it has no (es_)count, ``crawl`` is False,
        ``crawl_status`` is unset, and it is at least ``push_interval`` seconds
        old. Documents already fingerprinted are skipped.

        :param day: day offset passed to ``tools.delta_t`` for the publish-time query
        :return: False when the queue hit ``maxsize`` (no room left), True otherwise
        """
        is_add = True
        query = {'l_np_publishtime': tools.delta_t(day)}
        with self.mgo_db.find(query) as cursor:
            for item in cursor:
                if self.task_total == self.maxsize:
                    # Queue is full — stop scanning and tell the caller.
                    is_add = False
                    break

                count = item.get('count') or item.get('es_count') or 0
                eligible = all([
                    count == 0,
                    item.get('crawl') is False,
                    item.get('crawl_status') is None,
                    tools.now_ts() - item['comeintime'] >= self.push_interval,
                ])
                if eligible:
                    fingerprint = self.fingerprint(**item)
                    # Fingerprint set prevents pushing the same document twice.
                    if not self.exists(fingerprint):
                        self.rpush(tools.json_dumps(item))
                        self.add(fingerprint)
        return is_add

    def task_pool(self):
        """Fill the queue, scanning day-by-day until it is full or 3 days are exhausted."""
        with self._mutex:
            days = 2
            while True:
                self.put_latest_tasks()
                add_task = self.put_tasks(days)
                if not add_task:
                    # Queue filled up — re-arm the pending-count log below.
                    self.output_log = True
                    break
                elif days > 3:
                    # All remaining data within the last 3 days has been downloaded.
                    logger.info(f'[{self.label}]暂无采集任务')
                    break
                else:
                    days += 1
            if self.output_log:
                logger.info(f'[{self.label}]待采集任务 {self.task_total} 条')
                self.output_log = False

    def flush(self):
        """Reset the task and fingerprint tables once per calendar day."""
        now_date = tools.now_date('%Y-%m-%d')
        if self.now_date != now_date:
            for table in (self.request_tasks, self.task_fingerprint):
                self.redis_db.delete(table)
            self.now_date = now_date
            logger.info(f"[{self.label}]重置任务")

    def run(self):
        """Thread entry point: produce tasks forever."""
        logger.info(f'[{self.label}]开始生产任务')
        while True:
            try:
                self.flush()
                if self.task_total < 3:
                    self.task_pool()
            except Exception as e:
                logger.exception(e)
            finally:
                # Sleep even after an exception; the original slept only on the
                # success path, so a repeatedly failing round busy-spun the loop.
                tools.delay(self.sleep_interval)
class CompetitiveProductClient(BaseServer):
    """Consumer side of the competitive-product queue: pops crawl tasks."""

    def __init__(self, site: str, redis_label: str, db: str, table: str):
        super(CompetitiveProductClient, self).__init__(site, redis_label, db, table)

    def get_crawl_task(self):
        """Pop one task from the queue.

        Returns a plain dict with its fingerprint removed from the dedup set,
        or None when the queue is empty.
        """
        document = tools.json_loads(self.lpop(), _id=tools.ObjectId)
        if document is None:
            return None
        self.remove(self.fingerprint(**document))
        return tools.document2dict(document)

    def save_data(self, table, documents):
        """Persistence hook; intentionally a no-op in this client."""
        pass
|