123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- import threading
- from common.log import logger
- from crawler.services.basics import BasicService
- from crawler.utils import is_url, extract_domain
- from settings import (
- MGO_URLS,
- MGO_ORGS,
- MGO_COMPETING_GOODS,
- MGO_DATA_GARBAGE,
- MGO_LUA_SPIDERS
- )
class SyncData(BasicService):
    """Data synchronization service.

    Periodically pushes search keywords, organization names, seed URLs and
    competitor URLs into the task scheduler, and pre-loads the collector /
    validator filters from MongoDB. Each sync job runs in its own thread
    started by :meth:`start`.
    """

    def __init__(self, init_validator=False, init_collector=False, **kwargs):
        """
        :param init_validator: when True, load the garbage table into the
            validator (dedup filter) on each validate cycle.
        :param init_collector: when True, load lua-spider URL domains into
            the collector on each validate cycle.
        :param kwargs: may carry per-job intervals (seconds):
            ``validate_interval`` (default 7200), ``keywords_interval``,
            ``competing_goods_interval``, ``seed_urls_interval``,
            ``orgs_interval`` (each default 3600). Remaining kwargs are
            forwarded to :class:`BasicService`.
        """
        # `or` fallback also covers callers that explicitly pass None/0.
        self._sync_validate_data = kwargs.pop('validate_interval', None) or 7200
        self._keywords_interval = kwargs.pop('keywords_interval', None) or 3600
        self._competing_goods_interval = kwargs.pop('competing_goods_interval', None) or 3600
        self._seed_urls_interval = kwargs.pop('seed_urls_interval', None) or 3600
        self._orgs_interval = kwargs.pop('orgs_interval', None) or 3600
        super(SyncData, self).__init__(**kwargs)
        self._init_validator = init_validator
        self._init_collector = init_collector
        # Gate flag: the data-pushing loops wait until the collector and
        # validator have been loaded at least once.
        self._allow_load_data = False

    def sync_keywords(self):
        """Sync search keywords into the query task queue."""
        logger.info('[数据同步]开始加载 - 搜索词表')
        words = self.keywords_table()
        # Normalize keywords: drop internal spaces and surrounding whitespace.
        words = [str(word).replace(' ', '').strip() for word in words]
        tasks = [
            self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            )
            for word in words
        ]
        self.scheduler.add_query(self.keyword_groups, tasks, level=self.keyword_weight)
        logger.info(f'[数据同步]任务队列读取{len(words)}条搜索词')

    def sync_orgs(self):
        """Sync organization names into the query task queue."""
        logger.info('[数据同步]开始加载 - 单位组织表')
        items = self.orgs_table()
        # Normalize organization names; skip documents with an empty name.
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.error(f'[数据同步 - 组织列表]组织名称错误: {item}')
                continue
            orgs.append(str(name).replace(' ', '').strip())
        tasks = [
            self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            )
            for word in orgs
        ]
        self.scheduler.add_query(self.org_groups, tasks, level=self.org_weight)
        # Mark every fetched document as added so it is not pushed again.
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[数据同步]任务队列读取{len(items)}家单位组织')

    def sync_seed_urls(self):
        """Sync seed URLs into the excavation task queue."""
        logger.info('[数据同步]开始加载 - 种子网址表')
        items = self.seed_urls_table()
        # Collect valid items in a separate list: removing from `items`
        # while iterating it would silently skip the following element.
        valid_items = []
        tasks = []
        for item in items:
            if not is_url(item['name']):
                continue
            valid_items.append(item)
            tasks.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(tasks, level=self.url_weight)
        # Mark only the URLs that were actually pushed (`_id` is unique,
        # so update_one suffices — matches the sibling sync methods).
        for item in valid_items:
            MGO_URLS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[数据同步]任务队列读取{len(valid_items)}条种子网址')

    def sync_competing_goods(self):
        """Sync competitor URLs into the excavation task queue."""
        logger.info('[数据同步]开始加载 - 竞品网址表')
        items = self.competing_goods_table()
        # Same mutate-while-iterate hazard as sync_seed_urls: filter into a
        # new list instead of removing from `items` mid-loop.
        valid_items = []
        tasks = []
        for item in items:
            if not is_url(item['name']):
                continue
            # Skip URLs the validator already knows (deduplication).
            if self.validator.data(item['name']):
                continue
            valid_items.append(item)
            tasks.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(tasks, level=self.url_weight)
        # Mark pushed competitor URLs so they are not pushed again.
        for item in valid_items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[数据同步]任务队列读取{len(valid_items)}条竞品网址')

    def sync_collector(self):
        """Load URL domains already indexed by lua spiders into the collector."""
        if not self._init_collector:
            return
        logger.info('[数据同步]开始加载 - 收录器')
        count = 0
        query = {'param_common.11': {'$exists': True}}
        cursor = MGO_LUA_SPIDERS.find(query, projection={'param_common': 1})
        for item in cursor.sort(self.sort):
            url = item['param_common'][11]
            if not is_url(url):
                continue
            domain = extract_domain(url)
            # Count only domains newly added to the collector.
            if not self.collector.data(domain):
                self.collector.add_data(domain)
                count += 1
        logger.info(f'[数据同步]收录器读取{count}个网址域名')

    def sync_validator(self):
        """Load the garbage table into the validator (dedup filter)."""
        if not self._init_validator:
            return
        logger.info('[数据同步]开始加载 - 过滤器')
        count = 0
        cursor = MGO_DATA_GARBAGE.find(projection={'domain': 1})
        for item in cursor.sort(self.sort):
            # `domain` may be missing or malformed; dict access raises
            # KeyError (not IndexError, which the old code caught in vain),
            # so use .get() and purge any document without a string domain.
            domain = item.get('domain')
            if not isinstance(domain, str):
                MGO_DATA_GARBAGE.delete_one({'_id': item['_id']})
                continue
            # Count only features newly added to the validator.
            if not self.validator.data(domain):
                self.validator.add_data(domain)
                count += 1
        logger.info(f'[数据同步]过滤器读取{count}条去重特征')

    def _run_loop(self, sync_fn, interval):
        """Run ``sync_fn`` every ``interval`` seconds, once loading is allowed.

        Shared worker body for the keywords / orgs / seed-urls /
        competing-goods threads.
        """
        while True:
            if not self._allow_load_data:
                # Collector/validator not loaded yet; poll again shortly.
                self.loops_interval(2)
                continue
            try:
                sync_fn()
            except Exception as e:
                logger.exception(e)
            # Sleep outside the try so a failing sync does not skip the
            # interval and spin in a tight retry loop.
            self.loops_interval(interval)

    def start(self):
        """Start the validate thread and all data-sync worker threads."""

        def _validate():
            """Load collector/validator, then open the gate for the workers."""
            while True:
                try:
                    self.sync_collector()
                    self.sync_validator()
                    if not self._allow_load_data:
                        self._allow_load_data = True
                except Exception as e:
                    logger.exception(e)
                self.loops_interval(self._sync_validate_data)

        logger.info('[数据同步]初始化加载')
        threading.Thread(target=_validate, name='SyncValidateData').start()
        workers = [
            (self.sync_keywords, self._keywords_interval, 'SyncKeywords'),
            (self.sync_competing_goods, self._competing_goods_interval, 'SyncCompetingGoods'),
            (self.sync_seed_urls, self._seed_urls_interval, 'SyncSeedUrls'),
            (self.sync_orgs, self._orgs_interval, 'SyncOrgs'),
        ]
        for sync_fn, interval, name in workers:
            threading.Thread(
                target=self._run_loop,
                args=(sync_fn, interval),
                name=name
            ).start()
|