import threading

from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_DATA_GARBAGE,
    MGO_LUA_SPIDERS
)


class SyncData(BasicService):
    """Data synchronization service"""

    def __init__(self, init_validator=False, init_collector=False, **kwargs):
        self._validate_interval = kwargs.pop('validate_interval', None) or 7200
        self._keywords_interval = kwargs.pop('keywords_interval', None) or 3600
        self._competing_goods_interval = kwargs.pop('competing_goods_interval', None) or 3600
        self._seed_urls_interval = kwargs.pop('seed_urls_interval', None) or 3600
        self._orgs_interval = kwargs.pop('orgs_interval', None) or 3600
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        self._allow_load_data = False

    def sync_keywords(self):
        """Synchronize search keyword data."""
        logger.info('[Data Sync] loading - keywords table')
        words = self.keywords_table()
        # Normalize the keywords and push them to the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[Data Sync] pushed {len(words)} keywords to the task queue')

    def sync_orgs(self):
        """Synchronize organization data."""
        logger.info('[Data Sync] loading - organizations table')
        items = self.orgs_table()
        # Normalize organization names and push them to the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.error(f'[Data Sync - organizations] invalid organization name: {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)

        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Mark the organizations already handled so they are not queued again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[Data Sync] pushed {len(items)} organizations to the task queue')

    def sync_seed_urls(self):
        """Synchronize seed URL data."""
        logger.info('[Data Sync] loading - seed URL table')
        items = self.seed_urls_table()
        # Collect valid items instead of calling items.remove() while
        # iterating, which silently skips elements
        valid_items = []
        lst = []
        for item in items:
            if not is_url(item['name']):
                continue
            # exists_url = self.validator.data(item['name'])
            # if exists_url:
            #     continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # An _id matches at most one document, so update_one is sufficient
        for item in valid_items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[Data Sync] pushed {len(valid_items)} seed URLs to the task queue')

    def sync_competing_goods(self):
        """Synchronize competitor URLs."""
        logger.info('[Data Sync] loading - competitor URL table')
        items = self.competing_goods_table()
        # Validate competitor URLs and push them to the task queue; collect
        # valid items instead of mutating the list while iterating
        valid_items = []
        lst = []
        for item in items:
            if not is_url(item['name']):
                continue
            exists_url = self.validator.data(item['name'])
            if exists_url:
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark the competitor URLs that were pushed
        for item in valid_items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[Data Sync] pushed {len(valid_items)} competitor URLs to the task queue')

    def sync_collector(self):
        """Sync URLs already collected by lua spiders into the URL collector."""
        if self._init_collector:
            logger.info('[Data Sync] loading - collector')
            count = 0
            q = {'param_common.11': {'$exists': True}}
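            # Index 11 of `param_common` is assumed to hold the spider's entry
            # URL; the $exists guard above and the is_url() check below both
            # depend on that slot being present.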
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(q, projection=projection)
            for item in cursor.sort(self.sort):
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
                if not self.collector.data(domain):
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[Data Sync] collector loaded {count} domains')

    def sync_validator(self):
        """Load the garbage table into the dedup validator."""
        if self._init_validator:
            logger.info('[Data Sync] loading - validator')
            count = 0
            cursor = MGO_DATA_GARBAGE.find(projection={'domain': 1})
            for item in cursor.sort(self.sort):
                try:
                    domain = item['domain']
                    if not isinstance(domain, str):
                        MGO_DATA_GARBAGE.delete_one({'_id': item['_id']})
                        continue
                except KeyError:
                    # Dict access raises KeyError (not IndexError) when the
                    # 'domain' field is missing
                    continue
                if not self.validator.data(domain):
                    self.validator.add_data(domain)
                    count += 1
            logger.info(f'[Data Sync] validator loaded {count} dedup features')

    def start(self):
        """Start the data synchronization loops."""
        def _validate():
            """Validator loop"""
            while True:
                try:
                    self.sync_collector()
                    self.sync_validator()
                    if not self._allow_load_data:
                        # Release the other sync loops once the collector and
                        # validator have been initialized
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._validate_interval)

        def _keywords():
            """Keywords loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_keywords()
                        self.loops_interval(self._keywords_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _competing_goods():
            """Competitor URLs loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_competing_goods()
                        self.loops_interval(self._competing_goods_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _seed_urls():
            """Seed URLs loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_seed_urls()
                        self.loops_interval(self._seed_urls_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        def _orgs():
            """Organizations loop"""
            while True:
                if self._allow_load_data:
                    try:
                        self.sync_orgs()
                        self.loops_interval(self._orgs_interval)
                    except Exception as e:
                        logger.exception(e)
                else:
                    self.loops_interval(2)

        logger.info('[Data Sync] initial load')
        threading.Thread(target=_validate, name='SyncValidateData').start()
        threading.Thread(target=_keywords, name='SyncKeywords').start()
        threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
        threading.Thread(target=_seed_urls, name='SyncSeedUrls').start()
        threading.Thread(target=_orgs, name='SyncOrgs').start()
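

# Usage sketch (illustrative, not part of the original module): assumes
# BasicService supplies `scheduler`, `validator`, `collector`, `loops_interval`
# and the *_table()/make_task() helpers used above, and that `settings`
# exposes the Mongo collections imported at the top.
if __name__ == '__main__':
    service = SyncData(
        init_validator=True,      # preload the dedup filter from the garbage table
        init_collector=True,      # preload domains already collected by lua spiders
        validate_interval=7200,   # refresh collector/validator every 2 hours
        keywords_interval=3600,   # re-push keywords hourly
    )
    service.start()  # spawns one non-daemon thread per sync loop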