import threading

from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import is_url, extract_domain
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_COMPETING_GOODS,
    MGO_GARBAGE,
    MGO_LUA_SPIDERS
)


class SyncData(BasicService):
    """Data synchronization service."""

    def __init__(self, init_validator=False, init_collector=False, **kwargs):
        # Loop intervals in seconds; callers may override them via kwargs
        self._validate_interval = kwargs.pop('validate_interval', None) or 7200
        self._keywords_interval = kwargs.pop('keywords_interval', None) or 3600
        self._competing_goods_interval = kwargs.pop('competing_goods_interval', None) or 3600
        self._seed_urls_interval = kwargs.pop('seed_urls_interval', None) or 3600
        self._orgs_interval = kwargs.pop('orgs_interval', None) or 3600
        super().__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        self._allow_load_data = False

    def sync_keywords(self):
        """Synchronize search keywords."""
        words = self.keywords_table()
        # Normalize the keywords, then push them onto the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(self.keyword_groups, lst, level=self.keyword_weight)
        logger.info(f'[keywords] queued {len(words)} search keywords')

    def sync_orgs(self):
        """Synchronize organization data."""
        items = self.orgs_table()
        # Clean organization names, then push them onto the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ('', None):
                logger.error(f'[orgs] invalid organization name: {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(self.org_groups, lst, level=self.org_weight)
        # Mark processed organizations so they are not pushed onto the task queue again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[org site lookup] queued {len(items)} organizations')

    def sync_seed_urls(self):
        """Synchronize seed URLs."""
        items = self.seed_urls_table()
        # Removing entries from `items` while iterating it skips elements,
        # so collect the valid entries in a separate list instead.
        pushed = []
        lst = []
        for item in items:
            if not is_url(item['name']):
                continue
            pushed.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark pushed seed URLs so they are not queued again
        for item in pushed:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[seed sourcing] queued {len(pushed)} seed URLs')

    def sync_competing_goods(self):
        """Synchronize competitor URLs."""
        items = self.competing_goods_table()
        # Validate competitor URLs, then push them onto the task queue;
        # as above, avoid mutating `items` while iterating it.
        pushed = []
        lst = []
        for item in items:
            if not is_url(item['name']):
                continue
            # Skip URLs the validator has already filtered out
            if self.validator.data(item['name']):
                continue
            pushed.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # Mark pushed competitor URLs so they are not queued again
        for item in pushed:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[competitor sourcing] queued {len(pushed)} URLs')

    def data_collector(self):
        """Collector: stores domains of newly discovered and already-indexed sites."""
        if self._init_collector:
            domains = []
            q = {"param_common.11": {'$exists': True}}
            projection = {'param_common': 1}
            cursor = MGO_LUA_SPIDERS.find(q, projection=projection)
            for item in cursor.sort(self.sort):
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
                logger.debug(f'[collector] fetched domain feature: {domain}')
                if domain not in domains:
                    domains.append(domain)
            count = 0
            for domain in domains:
                logger.debug(f'[collector] checking domain feature: {domain}')
                if not self.collector.data(domain):
                    logger.debug(f'[collector] adding domain feature: {domain}')
                    self.collector.add_data(domain)
                    count += 1
            logger.info(f'[collector] loaded {count} site features')

    def data_validator(self):
        """Validator: stores junk URLs and sites without bidding-related content found while sourcing."""
        if self._init_validator:
            count = 0
            q = {
                "source": {"$exists": False},  # record origin
                "domain": {"$type": "string"}
            }
            cursor = MGO_GARBAGE.find(q, projection={'domain': 1})
            for item in cursor.sort(self.sort):
                domain = item['domain']
                logger.debug(f'[validator] fetched filter feature: {domain}')
                if not self.validator.data(domain):
                    logger.debug(f'[validator] adding filter feature: {domain}')
                    self.validator.add_data(domain)
                    MGO_GARBAGE.update_one(
                        {'_id': item['_id']},
                        {'$set': {"source": "other"}}
                    )
                    count += 1
            logger.info(f'[validator] loaded {count} dedup features')

    def start(self):
        """Service entry point."""

        def _validate():
            """Filter maintenance loop; primes the collector and validator."""
            logger.info('[auto sourcing] task filter module loaded')
            while True:
                try:
                    self.data_collector()
                    self.data_validator()
                    # Release the sync workers once the filters are primed
                    if not self._allow_load_data:
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._validate_interval)

        def _worker(sync_method, interval, title):
            """Shared sync loop: wait for the filters, then run periodically."""
            logger.info(f'[auto sourcing] {title} module loaded')
            while True:
                if not self._allow_load_data:
                    self.loops_interval(2)
                    continue
                try:
                    sync_method()
                except Exception as e:
                    logger.exception(e)
                # Sleep after failures as well, to avoid a hot retry loop
                self.loops_interval(interval)

        threading.Thread(target=_validate, name='SyncValidateData').start()
        threading.Thread(
            target=_worker,
            args=(self.sync_keywords, self._keywords_interval, 'keyword'),
            name='SyncKeywords'
        ).start()
        threading.Thread(
            target=_worker,
            args=(self.sync_competing_goods, self._competing_goods_interval, 'competitor sourcing'),
            name='SyncCompetingGoods'
        ).start()
        threading.Thread(
            target=_worker,
            args=(self.sync_seed_urls, self._seed_urls_interval, 'seed sourcing'),
            name='SyncSeedUrls'
        ).start()
        threading.Thread(
            target=_worker,
            args=(self.sync_orgs, self._orgs_interval, 'org site lookup'),
            name='SyncOrgs'
        ).start()
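

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal way to launch the service, assuming BasicService accepts the
# remaining kwargs. The interval arguments shown are the class defaults,
# expressed in seconds, and are only spelled out here for clarity.
if __name__ == '__main__':
    service = SyncData(
        init_validator=True,     # rebuild the junk-URL filter on startup
        init_collector=True,     # rebuild the indexed-domain collector on startup
        validate_interval=7200,  # refresh filters every 2 hours
        keywords_interval=3600   # re-sync keywords hourly
    )
    service.start()  # spawns the five background sync threads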