import threading

from common.log import logger
from crawler.services.basics import BasicSearch
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
    MGO_LUA_SPIDERS
)


class SyncData(BasicSearch):
    """Periodically sync keywords, organizations, seed urls and
    competing-goods urls into the task queues, and preload the
    collector/validator filters."""

    def __init__(
            self,
            init_validator=False,
            init_collector=False,
            loop_interval=600,
            **kwargs
    ):
        super().__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        self._interval = loop_interval
        self._init()

    def _init(self):
        threading.Thread(target=self.sync_data, name='SyncData').start()

    def sync_data_keywords(self):
        """Sync keyword data."""
        logger.info('[SyncData] loading keywords')
        words = self.keywords_table()
        # Normalize keyword text, then push search tasks to the query queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[SyncData] updated {len(words)} keywords')

    def sync_data_orgs(self):
        """Sync organization data."""
        logger.info('[SyncData] loading organizations')
        items = self.orgs_table()
        # Normalize organization names, then push search tasks to the query queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[SyncData] invalid organization: {item}')
                continue
            orgs.append(str(name).replace(' ', '').strip())
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Mark pushed organizations so they are not queued again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} organizations')

    def sync_data_urls(self):
        """Sync seed url data."""
        logger.info('[SyncData] loading seed urls')
        items = self.seed_urls_table()
        # Keep only valid urls the validator has not seen; filtering into a
        # new list avoids removing items from the list while iterating it
        items = [
            item for item in items
            if is_url(item['name']) and not self.validator.data(item['name'])
        ]
        lst = []
        for item in items:
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark pushed urls so they are not queued again; _id is unique,
        # so update_one is sufficient
        for item in items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} seed urls')

    def sync_data_competing_goods(self):
        """Sync competing-goods urls."""
        logger.info('[SyncData] loading competing-goods urls')
        items = self.competing_goods_table()
        # Keep only valid urls the validator has not seen, then push
        # excavation tasks to the task queue
        items = [
            item for item in items
            if is_url(item['name']) and not self.validator.data(item['name'])
        ]
        lst = []
        for item in items:
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark pushed competing-goods urls so they are not queued again
        for item in items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} competing-goods urls')

    def sync_collector(self):
        """Push urls already collected by the lua spiders into the collector."""
        if not self._init_collector:
            return
        logger.info('[SyncData] initializing collector')
        count = 0
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except (KeyError, IndexError):
                # Skip records missing the expected param_common entry
                continue
            if not self.collector.data(domain):
                self.collector.add_data(domain)
                count += 1
        logger.info(f'[SyncData] collected {count} new site domains')

    def sync_validator(self):
        """Load the removal-duplicate table into the validator filter."""
        if not self._init_validator:
            return
        logger.info('[SyncData] initializing validator')
        count = 0
        cursor = MGO_REMOVAL_DUPLICATE.find(projection={'domain': 1})
        for item in cursor.sort(self.sort):
            # dict access raises KeyError, not IndexError; use .get() and
            # drop malformed records so they are not re-read next round
            domain = item.get('domain')
            if not isinstance(domain, str):
                MGO_REMOVAL_DUPLICATE.delete_one({'_id': item['_id']})
                continue
            if not self.validator.data(domain):
                self.validator.add_data(domain)
                count += 1
        logger.info(f'[SyncData] added {count} dedup domain features')

    def sync_data(self):
        """Run all sync jobs in a loop, sleeping between rounds."""
        logger.info('[SyncData] initializing')
        while True:
            try:
                self.sync_collector()
                self.sync_validator()
                self.sync_data_competing_goods()
                self.sync_data_keywords()
                self.sync_data_urls()
                self.sync_data_orgs()
            except Exception as e:
                logger.exception(e)
            self.loops_interval(self._interval)
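

if __name__ == '__main__':
    # Minimal usage sketch, assuming BasicSearch requires no further kwargs
    # (anything it does need can be passed through **kwargs). The constructor
    # starts the background sync thread itself, so instantiating the class is
    # enough; the flag and interval values below are illustrative, not
    # prescribed by this module.
    SyncData(
        init_validator=True,   # preload the dedup validator on startup
        init_collector=True,   # preload the collector with lua-spider domains
        loop_interval=600      # seconds to sleep between sync rounds
    )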