dongzhaorui committed 3 years ago
commit 8ff76f0dfe

+ 4 - 4
find_source/crawler/__init__.py

@@ -1,19 +1,19 @@
 import threading
 
-from crawler.spiders import SyncData, SearchEngine, DataExcavate
+from crawler.services import SyncData, DataQuery, DataExcavate
 
 
 class BreadthCrawler:
 
     def __init__(self, engines=None, **kwargs):
         SyncData(**kwargs)
-        self._engines = SearchEngine(engines, **kwargs)
+        self._query = DataQuery(engines, **kwargs)
         self._excavator = DataExcavate(**kwargs)
 
     def start(self):
         threading.Thread(
-            target=self._engines.start,
-            name='MainSearchEngine'
+            target=self._query.start,
+            name='MainDataQuery'
         ).start()
         threading.Thread(
             target=self._excavator.start,

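A minimal usage sketch of the renamed components, assuming crawler.engines exposes the JySearchEngine base class referenced in services/data_query.py below; ExampleEngine and its two stub methods are illustrative only, not part of this commit:

    from crawler import BreadthCrawler
    from crawler.engines import JySearchEngine  # import path as used in services/data_query.py

    class ExampleEngine(JySearchEngine):
        """Hypothetical engine; DataQuery only registers JySearchEngine instances."""
        def search(self, word, page):            # used for keyword / seed_url / competing_goods groups
            return []
        def by_org_get_site(self, org_name):     # used for the 'organization' group
            return ''

    crawler = BreadthCrawler(
        engines=[ExampleEngine()],               # a single engine or a list (see DataQuery.set_engines)
        excavate_workers=2,                      # forwarded to DataExcavate via **kwargs
    )
    crawler.start()                              # spawns the MainDataQuery thread and the excavator thread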
+ 0 - 1
find_source/crawler/retrieve/__init__.py

@@ -1 +0,0 @@
-from .verify import Validator

+ 0 - 2
find_source/crawler/search/__init__.py

@@ -1,2 +0,0 @@
-from .engine import BingSearchEngine
-from .engine import JySearchEngine

+ 3 - 0
find_source/crawler/services/__init__.py

@@ -0,0 +1,3 @@
+from .data_excavate import DataExcavate
+from .data_query import DataQuery
+from .sync_data import SyncData

+ 181 - 0
find_source/crawler/services/basics.py

@@ -0,0 +1,181 @@
+import threading
+import time
+from typing import List, Mapping
+
+from common.databases import insert_one, int2long
+from common.log import logger
+from common.tools import delay_by
+from crawler.Task import Task
+from crawler.analysis import Parser
+from crawler.download import Downloader
+from crawler.schedule import Scheduler
+from crawler.utils import (
+    extract_domain,
+    is_url,
+)
+from crawler.validate import Validator
+from settings import (
+    MGO_LUA_SPIDERS,
+    MGO_URLS,
+    MGO_ORGS,
+    MGO_KEYWORDS,
+    MGO_COMPETING_GOODS,
+    MGO_REMOVAL_DUPLICATE,
+    MGO_DOMAIN,
+    MGO_QUERY,
+    MGO_RECORDS
+)
+
+
+class BasicSearch:
+
+    def __init__(
+            self,
+            keyword_weight=9,
+            url_weight=8,
+            org_weight=7,
+            scheduler=None,
+            validator=None,
+            downloader=None,
+            parser=None,
+            **kwargs
+    ):
+        self.scheduler = (scheduler or Scheduler())
+        self.validator = (validator or Validator(redis_key='RemovalDuplicate_'))
+        self.downloader = (downloader or Downloader())
+        self.parser = (parser or Parser())
+        # mongo查询
+        self.query = {'enable_added': {'$exists': False}}
+        self.projection = {'name': 1}
+        self.sort = [('_id', -1)]
+        # 分类
+        self.visit_classify = 'visit'
+        self.query_classify = 'query'
+        # 权重
+        self.org_weight = org_weight
+        self.url_weight = url_weight
+        self.keyword_weight = keyword_weight
+        self.retrieve_weight = 0
+        # 归属组
+        self.org_groups = 'organization'
+        self.keyword_groups = 'keyword'
+        self.url_groups = 'seed_url'
+        self.competing_groups = 'competing_goods'
+
+    @staticmethod
+    def loops_interval(interval):
+        t_name = threading.currentThread().getName()
+        next_run_time = delay_by((interval or 300))
+        logger.debug(f'线程运行结束:<{t_name}>,下次运行时间:{next_run_time}')
+        time.sleep(interval)
+
+    @staticmethod
+    def make_task(**kwargs):
+        """生成Task对象"""
+        return Task(**kwargs)
+
+    @staticmethod
+    def make_retrieve_item(task: Task):
+        item = {
+            'name': task['name'],
+            'url': task['url'],
+            'domain': task['domain'],
+            'origin': task['origin'],
+            'groups': task['groups'],
+            'create_at': task['create_at'],
+            'update_at': task['update_at'],
+        }
+        return item
+
+    @staticmethod
+    def make_duplicate_removal(task: Task):
+        item = {
+            'domain': task['domain'],
+            'origin': task['origin'],
+            'create_at': task['update_at'],
+        }
+        return item
+
+    def _push_data(self, purpose: str, task: Task, collection):
+        if purpose == 'save':
+            insert_one(collection, self.make_retrieve_item(task))
+        elif purpose == 'remove':
+            insert_one(collection, self.make_duplicate_removal(task))
+        else:
+            insert_one(collection, task)
+
+    def push_remove(self, task: Task):
+        """数据去重的垃圾表"""
+        logger.info(f"[上传去重特征]【{task['name']} - {task['url']}】")
+        self._push_data('remove', task, MGO_REMOVAL_DUPLICATE)
+
+    def push_domain(self, task: Task):
+        """挖掘网站的查询结果"""
+        logger.info(f"[推送挖掘结果]【{task['name']} - {task['domain']}】")
+        self._push_data('save', task, MGO_DOMAIN)
+
+    def push_query(self, task: Task):
+        """搜索组织单位查询结果"""
+        logger.info(f"[推送查询结果]【{task['name']} - {task['url']}】")
+        self._push_data('save', task, MGO_QUERY)
+
+    def push_records(self, task: Task):
+        """挖掘数据的记录"""
+        logger.info(f"[推送数据记录]【{task['name']} - {task['url']}】")
+        self._push_data('records', task, MGO_RECORDS)
+
+    def seed_orgs(self) -> List[Mapping]:
+        """组织|单位"""
+        search_orgs = []
+        cursor = MGO_ORGS.find(self.query, projection=self.projection)
+        for item in cursor.sort(self.sort):
+            search_orgs.append(item)
+        return search_orgs
+
+    def seed_keywords(self):
+        """关键词"""
+        search_keywords = []
+        cursor = MGO_KEYWORDS.find(projection=self.projection)
+        for item in cursor.sort(self.sort):
+            search_keywords.append(item['name'])
+        return search_keywords
+
+    def seed_urls(self) -> List[Mapping]:
+        """种子urls"""
+        search_urls = []
+        cursor = MGO_URLS.find(self.query, projection=self.projection)
+        for item in cursor.sort(self.sort):
+            search_urls.append(item)
+        return search_urls
+
+    def seed_competing_goods(self):
+        """竞品urls"""
+        competing_goods = []
+        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
+        for item in cursor.sort(self.sort):
+            competing_goods.append(item)
+        return competing_goods
+
+    def lua_common_domains(self):
+        """从lua采集爬虫配置表获取网站名称与对应域名,同步到去重库"""
+        parm_commons = []
+        projection = {'param_common': 1}
+        cursor = MGO_LUA_SPIDERS.find(projection=projection)
+        for item in cursor.sort(self.sort):
+            # name = item['param_common'][1]
+            try:
+                url = item['param_common'][11]
+                if not is_url(url):
+                    continue
+                domain = extract_domain(url)
+            except IndexError:
+                continue
+            if not self.validator.data(domain):
+                parm_commons.append({
+                    # 'name': name,
+                    'domain': domain,
+                    'origin': url,
+                    'create_at': int2long(int(time.time()))
+                })
+                self.validator.add_data(domain)
+        return parm_commons

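BasicSearch only bundles the shared plumbing (scheduler, validator, downloader, parser and the Mongo push helpers); the services below subclass it. A minimal sketch of the intended call pattern, where ExampleService is illustrative only:

    from crawler.services.basics import BasicSearch

    class ExampleService(BasicSearch):
        def run_once(self):
            # seed_urls() returns documents not yet marked with 'enable_added'
            for seed in self.seed_urls():
                task = self.make_task(
                    url=seed['name'],
                    origin=seed['name'],
                    groups=self.url_groups,          # 'seed_url'
                    classify=self.visit_classify,    # 'visit'
                    weight=self.url_weight,          # 8 by default
                )
                self.scheduler.add_excavate(task, level=task['weight'])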
+ 88 - 0
find_source/crawler/services/data_excavate.py

@@ -0,0 +1,88 @@
+import threading
+from concurrent.futures import ThreadPoolExecutor, wait
+
+from common.log import logger
+from crawler.Task import Task
+from crawler.services.basics import BasicSearch
+from crawler.utils import (
+    extract_base_url,
+    extract_page_title,
+    extract_domain,
+    err_details,
+)
+
+
+class DataExcavate(BasicSearch):
+
+    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
+        super(DataExcavate, self).__init__(**kwargs)
+        self._interval = loop_excavate_interval
+        self._workers = excavate_workers
+
+    def retrieve_site(self, task: Task):
+        logger.info(f'[数据挖掘]开始请求 - {task["url"]}')
+        response = self.downloader.get(task['url'])
+        task['status_code'] = response.status_code
+        if response.status_code != 200 or response.text in ['', None]:
+            task['err_reason'] = response.reason
+            logger.error(f'[数据挖掘]异常网址 - {task["url"]}')
+            return
+
+        task['domain'] = extract_domain(task['url'])
+        page_source = response.text
+        task['name'] = extract_page_title(page_source)
+        task['base_url'] = extract_base_url(task['url'])
+
+        items = self.parser.site_items(page_source, task['base_url'])
+        lst = []
+        _c = 0  # 页面包含的关键词计数器
+        for item in items:
+            name, url = item['name'], item['host']
+            if self.validator.phrase(name):
+                lst.append(self.make_task(
+                    url=url,
+                    name=name,
+                    origin=task['origin'],
+                    groups=task['groups'],
+                    classify=self.visit_classify,
+                    weight=task['weight']
+                ))
+                _c += 1
+
+        if _c > 1:
+            self.push_domain(task)
+        else:
+            if not self.validator.data(task['domain']):
+                self.push_remove(task)
+        self.scheduler.add_excavate(lst, level=task['weight'])
+
+    def excavate(self):
+        t_name = threading.currentThread().getName()
+        logger.info(f'[数据挖掘]启动 - {t_name}')
+        while True:
+            tasks = self.scheduler.get_excavate_task()
+            if len(tasks) == 0:
+                self.loops_interval(self._interval)
+                continue
+
+            task_key, task = tasks
+            if self.validator.data(task['url']):
+                continue
+
+            self.retrieve_site(task)
+            '''url - 添加过滤器'''
+            self.validator.add_data(task['url'])
+            '''domain - 添加过滤器'''
+            self.validator.add_data(task['domain'])
+            '''访问记录'''
+            self.push_records(task)
+
+    def start(self):
+        logger.info(f'[数据挖掘]初始化加载')
+        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
+            futures = []
+            for _ in range(1, self._workers + 1):
+                f = executor.submit(self.excavate)
+                f.add_done_callback(err_details)
+                futures.append(f)
+            wait(futures)

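DataExcavate can also be driven on its own; a sketch assuming the scheduler already holds 'visit' tasks:

    from crawler.services import DataExcavate

    excavator = DataExcavate(excavate_workers=4, loop_excavate_interval=30)
    excavator.start()   # blocks: each worker loops get_excavate_task -> retrieve_site -> push_records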
+ 125 - 0
find_source/crawler/services/data_query.py

@@ -0,0 +1,125 @@
+import threading
+from concurrent.futures import ThreadPoolExecutor, wait
+
+from common.execptions import HostsRetrieveError
+from common.log import logger
+from crawler.engines import JySearchEngine
+from crawler.services.basics import BasicSearch
+from crawler.utils import (
+    extract_base_url,
+    extract_domain,
+    is_url,
+    err_details,
+)
+
+
+class DataQuery(BasicSearch):
+
+    def __init__(
+            self,
+            engines=None,
+            max_query_page=1,
+            loop_query_interval=60,
+            **kwargs
+    ):
+        super(DataQuery, self).__init__(**kwargs)
+        self._max_pages = max_query_page
+        self._interval = loop_query_interval
+        self._engines = []
+        self.set_engines(engines)
+
+    def _init(self):
+        self.set_engines(self._engines)
+
+    def _set_engine(self, engine):
+        if isinstance(engine, JySearchEngine):
+            self._engines.append(engine)
+            logger.info(f'[数据查询]添加引擎 - <{engine.__class__.__name__}>')
+        return self
+
+    def set_engines(self, engines):
+        if isinstance(engines, list):
+            for engine in engines:
+                self._set_engine(engine)
+        else:
+            self._set_engine(engines)
+        return self
+
+    def search(self, engine):
+        ename = engine.__class__.__name__
+        threading.currentThread().setName(ename)
+        logger.info(f'[数据查询]启动引擎 - <{ename}>')
+        while True:
+            tasks = self.scheduler.get_query_task()
+            if len(tasks) == 0:
+                self.loops_interval(self._interval)
+                continue
+
+            task_key, task = tasks
+            word = task['search']
+            if task['groups'] == self.org_groups:
+                '''使用企查查服务检索site'''
+                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
+                try:
+                    url = engine.by_org_get_site(word)
+                    task['url'] = url
+                    task['name'] = word
+                    task['domain'] = extract_domain(url)
+                    '''保存数据'''
+                    self.push_query(task)
+                    if not is_url(url):
+                        continue
+
+                    if self.validator.data(task['domain']):
+                        continue
+
+                    '''domain - 添加过滤器'''
+                    self.validator.add_data(task['domain'])
+                    '''推送数据挖掘队列'''
+                    task['classify'] = self.visit_classify
+                    self.scheduler.add_excavate(task, level=task['weight'])
+                except HostsRetrieveError as e:
+                    task['status_code'] = e.code
+                    task['err_reason'] = e.reason
+                    logger.exception(e)
+                    '''重新放回查询队列'''
+                    self.scheduler.add_query(task, level=task['weight'])
+            else:
+                '''使用搜索引擎查询关键词'''
+                logger.info(f"<{ename}> {task['groups']} >>> {word}")
+                cur_page = 0
+                while cur_page < self._max_pages:
+                    cur_page += 1
+                    '''检索文本'''
+                    lst = []
+                    urls = engine.search(word, cur_page)
+                    '''生成数据挖掘任务'''
+                    for url in urls:
+                        domain = extract_domain(url)
+                        if self.validator.data(domain):
+                            continue
+                        lst.append(self.make_task(
+                            url=extract_base_url(url),
+                            origin=task['origin'],
+                            groups=task['groups'],
+                            classify=self.visit_classify,
+                            weight=task['weight'],
+                        ))
+                    '''推送数据挖掘队列'''
+                    self.scheduler.add_excavate(lst, level=task['weight'])
+                    logger.info(f'<{ename}> {word}-第{cur_page}页-共{len(lst)}条')
+
+            '''数据记录'''
+            self.push_records(task)
+
+    def start(self):
+        if len(self._engines) > 0:
+            logger.info(f'[数据查询]初始化加载')
+            max_workers = len(self._engines)  # 根据搜索引擎设置最大线程池
+            with ThreadPoolExecutor(max_workers, 'DataQuery') as executor:
+                futures = []
+                for engine in self._engines:
+                    f = executor.submit(self.search, engine)
+                    f.add_done_callback(err_details)
+                    futures.append(f)
+                wait(futures)

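DataQuery spawns one worker thread per registered engine, so a standalone run looks like this (ExampleEngine is the hypothetical JySearchEngine subclass from the first sketch):

    from crawler.services import DataQuery

    query = DataQuery(
        engines=[ExampleEngine()],   # objects that are not JySearchEngine instances are silently ignored
        max_query_page=3,            # pages fetched per keyword before moving on
        loop_query_interval=120,     # sleep when the query queue is empty
    )
    query.start()                    # one thread per engine in the 'DataQuery' pool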
+ 172 - 0
find_source/crawler/services/sync_data.py

@@ -0,0 +1,172 @@
+import threading
+
+from common.log import logger
+from crawler.services.basics import BasicSearch
+from crawler.utils import is_url
+from settings import (
+    MGO_URLS,
+    MGO_ORGS,
+    MGO_COMPETING_GOODS,
+    MGO_REMOVAL_DUPLICATE
+)
+
+
+class SyncData(BasicSearch):
+
+    def __init__(self, init_validator=False, loop_sync_interval=600, **kwargs):
+        super(SyncData, self).__init__(**kwargs)
+        self._init_validator = init_validator
+        self._interval = loop_sync_interval
+        self._init()
+
+    def _init(self):
+        threading.Thread(target=self.sync_data, name='SyncData').start()
+
+    def sync_data_keywords(self):
+        """同步关键词数据"""
+        logger.info(f'[同步数据]加载关键词')
+        words = self.seed_keywords()
+        # 处理关键词格式并推送到任务队列
+        words = [str(word).replace('&nbsp;', '').strip() for word in words]
+        lst = []
+        for word in words:
+            lst.append(self.make_task(
+                search=word,
+                origin=word,
+                groups=self.keyword_groups,
+                classify=self.query_classify,
+                weight=self.keyword_weight
+            ))
+        self.scheduler.add_query(lst, level=self.keyword_weight)
+        logger.info(f'[同步数据]更新{len(words)}条关键词')
+
+    def sync_data_orgs(self):
+        """同步组织单位数据"""
+        logger.info(f'[同步数据]加载单位组织数据')
+        items = self.seed_orgs()
+        # 处理单位组织名称并推送到任务队列
+        orgs = []
+        for item in items:
+            name = item.get('name')
+            if name in ['', None]:
+                logger.warning(f'[异常的单位组织]{item}')
+                continue
+            word = str(name).replace('&nbsp;', '').strip()
+            orgs.append(word)
+        lst = []
+        for word in orgs:
+            lst.append(self.make_task(
+                search=word,
+                origin=word,
+                groups=self.org_groups,
+                classify=self.query_classify,
+                weight=self.org_weight
+            ))
+        self.scheduler.add_query(lst, level=self.org_weight)
+        # 已添加的组织单位名称进行标记,之后不在推送到任务队列
+        for item in items:
+            MGO_ORGS.update_one(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+        logger.info(f'[同步数据]更新{len(items)}个单位组织')
+
+    def sync_data_urls(self):
+        """同步网址数据"""
+        logger.info(f'[同步数据]加载种子url列表')
+        items = self.seed_urls()
+        lst = []
+        for item in items:
+            if not is_url(item['name']):
+                items.remove(item)
+                continue
+            exists_url = self.validator.data(item['name'])
+            if exists_url:
+                items.remove(item)
+                continue
+            lst.append(self.make_task(
+                url=item['name'],
+                origin=item['name'],
+                groups=self.url_groups,
+                classify=self.visit_classify,
+                weight=self.url_weight
+            ))
+        self.scheduler.add_excavate(lst, level=self.url_weight)
+        for item in items:
+            MGO_URLS.update_many(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+        logger.info(f'[同步数据]更新{len(items)}条网址数据')
+
+    def sync_data_competing_goods(self):
+        """同步竞品urls"""
+        logger.info(f'[同步数据]加载竞品url列表')
+        items = self.seed_competing_goods()
+        # 处理竞品urls并推送到任务队列
+        lst = []
+        for item in items:
+            if not is_url(item['name']):
+                items.remove(item)
+                continue
+            exists_url = self.validator.data(item['name'])
+            if exists_url:
+                items.remove(item)
+                continue
+            lst.append(self.make_task(
+                url=item['name'],
+                origin=item['name'],
+                groups=self.competing_groups,
+                classify=self.visit_classify,
+                weight=self.url_weight
+            ))
+        self.scheduler.add_excavate(lst, level=self.url_weight)
+        # 更新已推送竞品urls状态
+        for item in items:
+            MGO_COMPETING_GOODS.update_one(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+        logger.info(f'[同步数据]更新{len(items)}条竞品挖掘url')
+
+    def sync_lua_commons(self):
+        """同步lua采集爬虫中网址与网址名称"""
+        logger.info(f'[同步数据]加载lua_commons数据')
+        items = self.lua_common_domains()
+        for item in items:
+            MGO_REMOVAL_DUPLICATE.insert_one(item)
+        logger.info(f'[同步数据]更新{len(items)}个网站域名数据')
+
+    def sync_loading_validator(self):
+        """将垃圾表内容加载到过滤器"""
+        if self._init_validator:
+            logger.info(f'[同步数据]过滤器加载去重网址特征')
+            count = 0
+            cursor = MGO_REMOVAL_DUPLICATE.find(projection={'domain': 1})
+            for item in cursor.sort(self.sort):
+                try:
+                    domain = item['domain']
+                    if not isinstance(domain, str):
+                        MGO_REMOVAL_DUPLICATE.delete_one({'_id': item['_id']})
+                        continue
+                except IndexError:
+                    continue
+                if not self.validator.data(domain):
+                    self.validator.add_data(domain)
+                    count += 1
+            logger.info(f'[同步数据]更新{count}条去重网址特征')
+
+    def sync_data(self):
+        """同步数据"""
+        logger.info(f'[同步数据]初始化加载')
+        while True:
+            try:
+                self.sync_loading_validator()
+                self.sync_lua_commons()
+                self.sync_data_keywords()
+                self.sync_data_orgs()
+                self.sync_data_competing_goods()
+                self.sync_data_urls()
+            except Exception as e:
+                logger.exception(e)
+            self.loops_interval(self._interval)

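SyncData starts its own 'SyncData' thread from the constructor, so instantiating it is enough; a sketch:

    from crawler.services import SyncData

    SyncData(
        init_validator=True,      # preload MGO_REMOVAL_DUPLICATE domains into the filter before syncing
        loop_sync_interval=600,   # seconds between sync rounds
    )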
+ 0 - 488
find_source/crawler/spiders.py

@@ -1,488 +0,0 @@
-import threading
-import time
-from concurrent.futures import ThreadPoolExecutor, wait
-from typing import List, Mapping
-
-from common.databases import insert_one, int2long
-from common.execptions import HostsRetrieveError
-from common.log import logger
-from common.tools import delay_by
-from crawler.Task import Task
-from crawler.analysis import Parser
-from crawler.download import Downloader
-from crawler.retrieve import Validator
-from crawler.schedule import Scheduler
-from crawler.search import JySearchEngine
-from crawler.utils import (
-    extract_base_url,
-    extract_page_title,
-    extract_domain,
-    is_url,
-    err_details,
-)
-from settings import (
-    MGO_LUA_SPIDERS,
-    MGO_SEARCH,
-    MGO_DOMAIN,
-    MGO_URLS,
-    MGO_ORGS,
-    MGO_KEYWORDS,
-    MGO_COMPETING_GOODS,
-    MGO_REMOVAL_DUPLICATE
-)
-
-
-class BasicSearch:
-
-    def __init__(
-            self,
-            keyword_weight=9,
-            url_weight=8,
-            org_weight=7,
-            scheduler=None,
-            validator=None,
-            downloader=None,
-            parser=None,
-            **kwargs
-    ):
-        self.scheduler = (scheduler or Scheduler())
-        self.validator = (validator or Validator())
-        self.downloader = (downloader or Downloader())
-        self.parser = (parser or Parser())
-        # mongo查询
-        self.query = {'enable_added': {'$exists': False}}
-        self.projection = {'name': 1}
-        self.sort = [('_id', -1)]
-        # 分类
-        self.visit_classify = 'visit'
-        self.query_classify = 'query'
-        # 权重
-        self.org_weight = org_weight
-        self.url_weight = url_weight
-        self.keyword_weight = keyword_weight
-        self.retrieve_weight = 0
-        # 归属组
-        self.org_groups = 'organization'
-        self.keyword_groups = 'keyword'
-        self.url_groups = 'seed_url'
-        self.competing_groups = 'competing_goods'
-
-    @staticmethod
-    def loops_interval(interval):
-        t_name = threading.currentThread().getName()
-        next_run_time = delay_by((interval or 300))
-        logger.info(f'线程运行结束:<{t_name}>,下次运行时间:{next_run_time}')
-        time.sleep(interval)
-
-    @staticmethod
-    def make_task(**kwargs):
-        """生成Task对象"""
-        return Task(**kwargs)
-
-    def seed_orgs(self) -> List[Mapping]:
-        """组织|单位"""
-        search_orgs = []
-        cursor = MGO_ORGS.find(self.query, projection=self.projection)
-        for item in cursor.sort(self.sort):
-            search_orgs.append(item)
-        return search_orgs
-
-    def seed_keywords(self):
-        """关键词"""
-        search_keywords = []
-        cursor = MGO_KEYWORDS.find(projection=self.projection)
-        for item in cursor.sort(self.sort):
-            search_keywords.append(item['name'])
-        return search_keywords
-
-    def seed_urls(self) -> List[Mapping]:
-        """种子urls"""
-        search_urls = []
-        cursor = MGO_URLS.find(self.query, projection=self.projection)
-        for item in cursor.sort(self.sort):
-            search_urls.append(item)
-        return search_urls
-
-    def seed_competing_goods(self):
-        """竞品urls"""
-        competing_goods = []
-        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
-        for item in cursor.sort(self.sort):
-            competing_goods.append(item)
-        return competing_goods
-
-    def lua_common_domains(self):
-        """从lua采集爬虫配置表获取网站名称与对应域名,同步到去重库"""
-        parm_commons = []
-        projection = {'param_common': 1}
-        cursor = MGO_LUA_SPIDERS.find(projection=projection)
-        for item in cursor.sort(self.sort):
-            name = item['param_common'][1]
-            try:
-                url = item['param_common'][11]
-                if not is_url(url):
-                    continue
-                domain = extract_domain(url)
-            except IndexError:
-                continue
-            if not self.validator.url(domain):
-                parm_commons.append({'name': name, 'domain': domain})
-                self.validator.add_url(domain)
-        return parm_commons
-
-    def push_data(self, purpose: str, task: Task, collection):
-        if purpose == 'save':
-            insert_one(collection, self.make_retrieve_item(task))
-        else:
-            insert_one(collection, self.make_duplicate_removal(task))
-
-    @staticmethod
-    def make_retrieve_item(task: Task):
-        item = {
-            'name': task['name'],
-            'url': task['url'],
-            'domain': task['domain'],
-            'origin': task['origin'],
-            'groups': task['groups'],
-            'create_at': task['create_at'],
-            'update_at': task['update_at'],
-        }
-        return item
-
-    @staticmethod
-    def make_duplicate_removal(task: Task):
-        item = {
-            'domain': task['domain'],
-            'url': task['url'],
-            'create_at': task['update_at'],
-        }
-        return item
-
-
-class SyncData(BasicSearch):
-
-    def __init__(self, allow_load_filter=False, **kwargs):
-        super(SyncData, self).__init__(**kwargs)
-        self._init(allow_load_filter)
-
-    def _init(self, allow_load_filter=False):
-        threading.Thread(
-            target=self.sync_data,
-            name='SyncData'
-        ).start()
-        if allow_load_filter:
-            self.validator.load_filter()
-
-    def sync_data_keywords(self):
-        """同步关键词数据"""
-        logger.info(f'[同步数据]加载关键词')
-        words = self.seed_keywords()
-        # 处理关键词格式并推送到任务队列
-        words = [str(word).replace('&nbsp;', '').strip() for word in words]
-        lst = []
-        for word in words:
-            lst.append(self.make_task(
-                search=word,
-                origin=word,
-                groups=self.keyword_groups,
-                classify=self.query_classify,
-                weight=self.keyword_weight
-            ))
-        self.scheduler.add_query(lst, level=self.keyword_weight)
-        logger.info(f'[同步数据]更新{len(words)}条关键词')
-
-    def sync_data_orgs(self):
-        """同步组织单位数据"""
-        logger.info(f'[同步数据]加载单位组织数据')
-        items = self.seed_orgs()
-        # 处理单位组织名称并推送到任务队列
-        orgs = []
-        for item in items:
-            name = item.get('name')
-            if name in ['', None]:
-                logger.warning(f'[异常的单位组织]{item}')
-                continue
-            word = str(name).replace('&nbsp;', '').strip()
-            orgs.append(word)
-        lst = []
-        for word in orgs:
-            lst.append(self.make_task(
-                search=word,
-                origin=word,
-                groups=self.org_groups,
-                classify=self.query_classify,
-                weight=self.org_weight
-            ))
-        self.scheduler.add_query(lst, level=self.org_weight)
-        # 已添加的组织单位名称进行标记,之后不在推送到任务队列
-        for item in items:
-            MGO_ORGS.update_one(
-                {'_id': item['_id']},
-                {'$set': {'enable_added': True}}
-            )
-        logger.info(f'[同步数据]更新{len(items)}个单位组织')
-
-    def sync_data_urls(self):
-        """同步网址数据"""
-        logger.info(f'[同步数据]加载种子url列表')
-        items = self.seed_urls()
-        lst = []
-        for item in items:
-            if not is_url(item['name']):
-                items.remove(item)
-                continue
-            exists_url = self.validator.url(item['name'])
-            if exists_url:
-                items.remove(item)
-                continue
-            lst.append(self.make_task(
-                url=item['name'],
-                origin=item['name'],
-                groups=self.url_groups,
-                classify=self.visit_classify,
-                weight=self.url_weight
-            ))
-        self.scheduler.add_excavate(lst, level=self.url_weight)
-        for item in items:
-            MGO_URLS.update_many(
-                {'_id': item['_id']},
-                {'$set': {'enable_added': True}}
-            )
-        logger.info(f'[同步数据]更新{len(items)}条网址数据')
-
-    def sync_data_competing_goods(self):
-        """同步竞品urls"""
-        logger.info(f'[同步数据]加载竞品url列表')
-        items = self.seed_competing_goods()
-        # 处理竞品urls并推送到任务队列
-        lst = []
-        for item in items:
-            if not is_url(item['name']):
-                items.remove(item)
-                continue
-            exists_url = self.validator.url(item['name'])
-            if exists_url:
-                items.remove(item)
-                continue
-            lst.append(self.make_task(
-                url=item['name'],
-                origin=item['name'],
-                groups=self.competing_groups,
-                classify=self.visit_classify,
-                weight=self.url_weight))
-        self.scheduler.add_excavate(lst, level=self.url_weight)
-        # 更新已推送竞品urls状态
-        for item in items:
-            MGO_COMPETING_GOODS.update_one(
-                {'_id': item['_id']},
-                {'$set': {'enable_added': True}}
-            )
-        logger.info(f'[同步数据]更新{len(items)}条竞品挖掘url')
-
-    def sync_lua_commons(self):
-        """同步lua采集爬虫中网址与网址名称"""
-        logger.info(f'[同步数据]加载lua_commons数据')
-        items = self.lua_common_domains()
-        for item in items:
-            item['create_at'] = int2long(int(time.time()))
-            MGO_REMOVAL_DUPLICATE.insert_one(item)
-        logger.info(f'[同步数据]更新{len(items)}个网站域名数据')
-
-    def sync_data(self):
-        """同步数据"""
-        logger.info(f'[同步数据]初始化加载')
-        _interval = 600
-        while True:
-            try:
-                self.sync_lua_commons()
-                self.sync_data_keywords()
-                self.sync_data_orgs()
-                self.sync_data_competing_goods()
-                self.sync_data_urls()
-            except Exception as e:
-                logger.exception(e)
-            self.loops_interval(_interval)
-
-
-class SearchEngine(BasicSearch):
-
-    def __init__(
-            self,
-            engines=None,
-            max_search_page=1,
-            loop_search_interval=60,
-            **kwargs
-    ):
-        super(SearchEngine, self).__init__(**kwargs)
-        self._max_pages = max_search_page
-        self._interval = loop_search_interval
-        self._engines = []
-        self.set_engines(engines)
-
-    def _init(self):
-        self.set_engines(self._engines)
-
-    def _set_engine(self, engine):
-        if isinstance(engine, JySearchEngine):
-            self._engines.append(engine)
-            logger.info(f'[搜索引擎]添加<{engine.__class__.__name__}>完成')
-        return self
-
-    def set_engines(self, engines):
-        if isinstance(engines, list):
-            for engine in engines:
-                self._set_engine(engine)
-        else:
-            self._set_engine(engines)
-        return self
-
-    def search(self, engine):
-        ename = engine.__class__.__name__
-        threading.currentThread().setName(ename)
-        logger.info(f'[搜索引擎]启动 - <{ename}>')
-        while True:
-            tasks = self.scheduler.get_query_task()
-            if len(tasks) == 0:
-                self.loops_interval(self._interval)
-                continue
-
-            task_key, task = tasks
-            word = task['search']
-            task['update_at'] = int2long(int(time.time()))
-            if task['groups'] == self.org_groups:
-                '''使用企查查服务检索site'''
-                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
-                try:
-                    url = engine.by_org_get_site(task['search'])
-                    task['url'] = url
-                    task['name'] = word
-                    task['domain'] = extract_domain(url)
-                    '''保存数据'''
-                    self.push_data('save', task, MGO_SEARCH)
-                    if not is_url(url):
-                        continue
-                    if self.validator.url(task['domain']):
-                        continue
-                    '''domain - 添加过滤器'''
-                    self.validator.add_url(task['domain'])
-                    '''推送数据挖掘队列'''
-                    task['classify'] = self.visit_classify
-                    self.scheduler.add_excavate(task, level=task['weight'])
-                except HostsRetrieveError as e:
-                    logger.exception(e)
-                    '''重新放回任务队列'''
-                    self.scheduler.add_query(task, level=task['weight'])
-            else:
-                '''使用搜索引擎查询关键词'''
-                logger.info(f"<{ename}> {task['groups']} >>> {word}")
-                cur_page = 0
-                while cur_page < self._max_pages:
-                    cur_page += 1
-                    '''检索文本'''
-                    lst = []
-                    urls = engine.search(word, cur_page)
-                    '''生成数据挖掘任务'''
-                    for url in urls:
-                        domain = extract_domain(url)
-                        if self.validator.url(domain):
-                            continue
-                        lst.append(self.make_task(
-                            url=extract_base_url(url),
-                            origin=task['origin'],
-                            groups=task['groups'],
-                            classify=self.visit_classify,
-                            weight=task['weight'],
-                        ))
-                    '''推送数据挖掘队列'''
-                    self.scheduler.add_excavate(lst, level=task['weight'])
-                    logger.info(f'<{ename}> {word}-第{cur_page}页-共{len(lst)}条')
-
-    def start(self):
-        if len(self._engines) > 0:
-            logger.info(f'[搜索引擎]初始化加载')
-            # 根据搜索引擎最大数量设置最大线程池
-            max_workers = len(self._engines)
-            with ThreadPoolExecutor(max_workers, 'SearchEngine') as executor:
-                futures = []
-                for engine in self._engines:
-                    f = executor.submit(self.search, engine)
-                    f.add_done_callback(err_details)
-                    futures.append(f)
-                wait(futures)
-
-
-class DataExcavate(BasicSearch):
-
-    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
-        super(DataExcavate, self).__init__(**kwargs)
-        self._interval = loop_excavate_interval
-        self._workers = excavate_workers
-
-    def save(self, task: Task):
-        """保存数据挖掘符合要求的站点"""
-        self.push_data('save', task, MGO_DOMAIN)
-        '''url添加去重库'''
-        self.push_data('remove', task, MGO_REMOVAL_DUPLICATE)
-        logger.info(f"[上传数据]{task['name']} - {task['domain']}")
-
-    def retrieve_site(self, task: Task):
-        if self.validator.url(task['url']):
-            return
-
-        logger.info(f'[数据挖掘]开始请求 -> {task["url"]}')
-        response = self.downloader.get(task['url'])
-        if response.status_code != 200 or response.text in ['', None]:
-            logger.error(f'[数据挖掘]访问异常 -> {task["url"]}')
-            return
-
-        task['domain'] = extract_domain(task['url'])
-        page_source = response.text
-        task['name'] = extract_page_title(page_source)
-        task['base_url'] = extract_base_url(task['url'])
-
-        items = self.parser.site_items(page_source, task['base_url'])
-        lst = []
-        _c = 0  # 页面包含的关键词计数器
-        for item in items:
-            name, url = item['name'], item['host']
-            if self.validator.requirement_word(name):
-                lst.append(self.make_task(
-                    url=url,
-                    name=name,
-                    origin=task['origin'],
-                    groups=task['groups'],
-                    classify=self.visit_classify,
-                    weight=task['weight']
-                ))
-                _c += 1
-
-        if _c > 1:
-            self.save(task)
-        self.scheduler.add_excavate(lst, level=task['weight'])
-
-    def excavate(self):
-        t_name = threading.currentThread().getName()
-        logger.info(f'[数据挖掘]启动 - {t_name}')
-        while True:
-            tasks = self.scheduler.get_excavate_task()
-            if len(tasks) == 0:
-                self.loops_interval(self._interval)
-                continue
-
-            task_key, task = tasks
-            task['update_at'] = int2long(int(time.time()))
-            self.retrieve_site(task)
-            '''domain - 添加过滤器'''
-            self.validator.add_url(task['domain'])
-            '''url - 添加过滤器'''
-            self.validator.add_url(task['url'])
-
-    def start(self):
-        logger.info(f'[数据挖掘]初始化加载')
-        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
-            futures = []
-            for _ in range(1, self._workers + 1):
-                f = executor.submit(self.excavate)
-                f.add_done_callback(err_details)
-                futures.append(f)
-            wait(futures)