
Optimization - split the retrieve queue by classification: retrieve_query and retrieve_excavate task queues

dongzhaorui 3 years ago
parent
commit
3a29b89fdf

+ 1 - 18
find_source/crawler/retrieve/verify.py

@@ -5,8 +5,7 @@ from common.log import logger
 from crawler.bloom_filter.RedisBloomFilter import RedisFilter
 from settings import (
     MGO_REMOVAL_DUPLICATE,
-    REQUIREMENT_PHRASE,
-    SENSITIVE_WORDS
+    REQUIREMENT_PHRASE
 )
 
 
@@ -18,20 +17,11 @@ def _requirement_phrase(val: str):
     return False
 
 
-def _sensitive_word(val: str):
-    """垃圾词"""
-    for word in SENSITIVE_WORDS:
-        if val.find(word) != -1:
-            return True
-    return False
-
-
 class Validator:
 
     def __init__(self):
         self._rbf = RedisFilter(redis_key='RemovalDuplicate_')
         self._rbf.start(1000000000, 0.00001)
-        self._sensitive_word = _sensitive_word
         self._requirement_phrase = _requirement_phrase
         self._loop_Interval = 7200
 
@@ -62,13 +52,6 @@ class Validator:
     def add_url(self, url: str):
         self._rbf.add(url)
 
-    def sensitive_word(self, val):
-        if val is None or len(val) < 5:
-            return True
-        if self._sensitive_word(val):
-            return True
-        return False
-
     def requirement_word(self, val):
         return self._requirement_phrase(val)
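
After this change, Validator only keeps the bloom-filter URL de-duplication and the requirement-phrase check; the junk-word screening is gone. A minimal usage sketch (import path inferred from the file location; it assumes the Redis instance used by RedisFilter is reachable, and the sample URL and title are made up):

    from crawler.retrieve.verify import Validator

    validator = Validator()

    # Bloom-filter based de-duplication of URLs / domains
    url = 'http://example.gov.cn/notice.html'
    if not validator.url(url):      # not seen before
        validator.add_url(url)      # remember it for next time

    # Keep only titles that contain a procurement requirement phrase ('采购', '招标', ...)
    wanted = validator.requirement_word('XX单位设备采购招标公告')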
 

+ 36 - 15
find_source/crawler/schedule.py

@@ -1,5 +1,5 @@
 from crawler.q import RedisQueue
-from settings import REDIS_KEY
+from settings import REDIS_QUERY, REDIS_EXCAVATE
 
 
 class Scheduler:
@@ -7,22 +7,43 @@ class Scheduler:
     def __init__(self, queue=None):
         self.mrq = (queue or RedisQueue())
 
-    def get_task(self, redis_key=None):
-        if redis_key is not None:
-            return self.mrq.pop_task_by_key(redis_key)
-        else:
-            return self.mrq.pop_task([REDIS_KEY])
-
-    def insert_tasks(self, tasks, level=1, **kwargs):
-        """批量插入任务"""
+    def _add_tasks(self, classify: str, tasks, level=1, **kwargs):
         allow_output_log = kwargs.get('allow_output_log', True)
         if isinstance(tasks, list):
             _tasks = tasks
         else:
             _tasks = [tasks]
-        self.mrq.push_task(
-            REDIS_KEY,
-            _tasks,
-            level=level,
-            allow_output_log=allow_output_log
-        )
+
+        if classify.lower() == 'query':
+            self.mrq.push_task(
+                REDIS_QUERY,
+                _tasks,
+                level=level,
+                allow_output_log=allow_output_log
+            )
+        elif classify.lower() == 'excavate':
+            self.mrq.push_task(
+                REDIS_EXCAVATE,
+                _tasks,
+                level=level,
+                allow_output_log=allow_output_log
+            )
+
+    def add_query(self, tasks, level=1, **kwargs):
+        self._add_tasks('query', tasks, level, **kwargs)
+
+    def add_excavate(self, tasks, level=1, **kwargs):
+        self._add_tasks('excavate', tasks, level, **kwargs)
+
+    def _get_task(self, classify: str):
+        if classify.lower() == 'query':
+            return self.mrq.pop_task([REDIS_QUERY])
+
+        elif classify.lower() == 'excavate':
+            return self.mrq.pop_task([REDIS_EXCAVATE])
+
+    def get_excavate_task(self):
+        return self._get_task('excavate')
+
+    def get_query_task(self):
+        return self._get_task('query')
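
With the queue split, producers now choose a queue explicitly and each worker type polls only its own queue. A rough sketch of the new Scheduler interface (import path inferred from the file location; the task dicts and level values are illustrative only, not the real task schema):

    from crawler.schedule import Scheduler

    scheduler = Scheduler()

    # Producer side: keyword/organisation searches go to REDIS_QUERY ('retrieve_query'),
    # seed and competitor URLs go to REDIS_EXCAVATE ('retrieve_excavate')
    scheduler.add_query({'search': '某某集团有限公司', 'weight': 5}, level=5)
    scheduler.add_excavate({'url': 'http://example.gov.cn', 'weight': 8}, level=8)

    # Consumer side: an empty result means the queue is drained, otherwise the
    # value unpacks as (task_key, task) - see the worker loops in spiders.py
    tasks = scheduler.get_query_task()
    tasks = scheduler.get_excavate_task()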

+ 42 - 47
find_source/crawler/spiders.py

@@ -21,7 +21,6 @@ from crawler.utils import (
     err_details,
 )
 from settings import (
-    REDIS_KEY,
     MGO_LUA_SPIDERS,
     MGO_SEARCH,
     MGO_DOMAIN,
@@ -174,34 +173,6 @@ class SyncData(BasicSearch):
         if allow_load_filter:
             self.validator.load_filter()
 
-    def sync_data_urls(self):
-        """同步网址数据"""
-        logger.info(f'[同步数据]加载种子网址')
-        items = self.seed_urls()
-        lst = []
-        for item in items:
-            if not is_url(item['name']):
-                items.remove(item)
-                continue
-            exists_url = self.validator.url(item['name'])
-            if exists_url:
-                items.remove(item)
-                continue
-            lst.append(self.make_task(
-                url=item['name'],
-                origin=item['name'],
-                groups=self.url_groups,
-                classify=self.visit_classify,
-                weight=self.url_weight
-            ))
-        self.scheduler.insert_tasks(lst, level=self.url_weight)
-        for item in items:
-            MGO_URLS.update_many(
-                {'_id': item['_id']},
-                {'$set': {'enable_added': True}}
-            )
-        logger.info(f'[同步数据]更新{len(items)}条网址数据')
-
     def sync_data_keywords(self):
         """同步关键词数据"""
         logger.info(f'[同步数据]加载关键词')
@@ -217,7 +188,7 @@ class SyncData(BasicSearch):
                 classify=self.query_classify,
                 weight=self.keyword_weight
             ))
-        self.scheduler.insert_tasks(lst, level=self.keyword_weight)
+        self.scheduler.add_query(lst, level=self.keyword_weight)
         logger.info(f'[同步数据]更新{len(words)}条关键词')
 
     def sync_data_orgs(self):
@@ -242,7 +213,7 @@ class SyncData(BasicSearch):
                 classify=self.query_classify,
                 weight=self.org_weight
             ))
-        self.scheduler.insert_tasks(lst, level=self.org_weight)
+        self.scheduler.add_query(lst, level=self.org_weight)
         # Mark organization names that have already been added so they are not pushed to the task queue again
         for item in items:
             MGO_ORGS.update_one(
@@ -251,9 +222,37 @@ class SyncData(BasicSearch):
             )
         logger.info(f'[同步数据]更新{len(items)}个单位组织')
 
+    def sync_data_urls(self):
+        """同步网址数据"""
+        logger.info(f'[同步数据]加载种子url列表')
+        items = self.seed_urls()
+        lst = []
+        for item in items:
+            if not is_url(item['name']):
+                items.remove(item)
+                continue
+            exists_url = self.validator.url(item['name'])
+            if exists_url:
+                items.remove(item)
+                continue
+            lst.append(self.make_task(
+                url=item['name'],
+                origin=item['name'],
+                groups=self.url_groups,
+                classify=self.visit_classify,
+                weight=self.url_weight
+            ))
+        self.scheduler.add_excavate(lst, level=self.url_weight)
+        for item in items:
+            MGO_URLS.update_many(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+        logger.info(f'[同步数据]更新{len(items)}条网址数据')
+
     def sync_data_competing_goods(self):
         """同步竞品urls"""
-        logger.info(f'[同步数据]加载竞品列表数据')
+        logger.info(f'[同步数据]加载竞品url列表')
         items = self.seed_competing_goods()
         # Process competitor URLs and push them to the task queue
         lst = []
@@ -271,14 +270,14 @@ class SyncData(BasicSearch):
                 groups=self.competing_groups,
                 classify=self.visit_classify,
                 weight=self.url_weight))
-        self.scheduler.insert_tasks(lst, level=self.url_weight)
+        self.scheduler.add_excavate(lst, level=self.url_weight)
         # Update the status of competitor URLs that have been pushed
         for item in items:
             MGO_COMPETING_GOODS.update_one(
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[同步数据]更新{len(items)}条竞品源网址')
+        logger.info(f'[同步数据]更新{len(items)}条竞品挖掘url')
 
     def sync_lua_commons(self):
         """同步lua采集爬虫中网址与网址名称"""
@@ -342,7 +341,7 @@ class SearchEngine(BasicSearch):
         threading.currentThread().setName(ename)
         logger.info(f'[搜索引擎]启动 - <{ename}>')
         while True:
-            tasks = self.scheduler.get_task()
+            tasks = self.scheduler.get_query_task()
             if len(tasks) == 0:
                 self.loops_interval(self._interval)
                 continue
@@ -350,19 +349,17 @@ class SearchEngine(BasicSearch):
             task_key, task = tasks
             word = task['search']
             task['update_at'] = int2long(int(time.time()))
-            if task['classify'] == self.visit_classify:
-                self.scheduler.insert_tasks(task, level=task['weight'])
-            elif task['groups'] == self.org_groups:
+            if task['groups'] == self.org_groups:
                 '''Look up the site via the Qichacha service'''
                 logger.info(f"<QccSearch> {task['groups']} >>> {word}")
                 try:
                     url = engine.by_org_get_site(task['search'])
-                    task['url'] = url
+                    task['url'] = extract_base_url(url)
                     task['name'] = word
                     task['domain'] = extract_domain(task['url'])
                     '''Save the data'''
                     self.push_data('save', task, MGO_SEARCH)
-                    if not is_url(url):
+                    if not is_url(task['url']):
                         continue
                     if self.validator.url(task['domain']):
                         continue
@@ -370,8 +367,7 @@ class SearchEngine(BasicSearch):
                     self.validator.add_url(task['domain'])
                     '''Push to the data-mining queue'''
                     task['classify'] = self.visit_classify
-                    task['weight'] = self.url_weight
-                    self.scheduler.insert_tasks(task, level=self.url_weight)
+                    self.scheduler.add_excavate(task, level=task['weight'])
                 except HostsRetrieveError as e:
                     logger.exception(e)
             else:
@@ -393,10 +389,10 @@ class SearchEngine(BasicSearch):
                             origin=task['origin'],
                             groups=task['groups'],
                             classify=self.visit_classify,
-                            weight=self.url_weight,
+                            weight=task['weight'],
                         ))
                     '''Push to the data-mining queue'''
-                    self.scheduler.insert_tasks(lst, level=self.url_weight)
+                    self.scheduler.add_excavate(lst, level=task['weight'])
                     logger.info(f'<{ename}> {word}-第{cur_page}页-共{len(lst)}条')
 
     def start(self):
@@ -460,7 +456,7 @@ class DataExcavate(BasicSearch):
 
         if _c > 1:
             self.save(task)
-        self.scheduler.insert_tasks(lst, level=self.url_weight)
+        self.scheduler.add_excavate(lst, level=task['weight'])
         '''domain - add to the filter'''
         self.validator.add_url(task['domain'])
         '''url - add to the filter'''
@@ -470,8 +466,7 @@ class DataExcavate(BasicSearch):
         t_name = threading.currentThread().getName()
         logger.info(f'[数据挖掘]启动 - {t_name}')
         while True:
-            _redis_key = REDIS_KEY + '-' + str(self.url_weight)
-            tasks = self.scheduler.get_task(_redis_key)
+            tasks = self.scheduler.get_excavate_task()
             if len(tasks) == 0:
                 self.loops_interval(self._interval)
                 continue
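
Taken together, SearchEngine now drains the query queue and hands discovered sites to the excavate queue, while DataExcavate drains the excavate queue and re-queues in-site links onto it. A self-contained toy version of that handoff (the sleep interval and the elided processing steps are placeholders; the real loops are the ones in spiders.py above):

    import threading
    import time

    from crawler.schedule import Scheduler

    scheduler = Scheduler()

    def query_worker():
        # Mirrors the SearchEngine loop: pop from retrieve_query, push hits to retrieve_excavate
        while True:
            tasks = scheduler.get_query_task()
            if len(tasks) == 0:
                time.sleep(60)
                continue
            task_key, task = tasks
            # ... search-engine / Qichacha lookup would go here ...
            scheduler.add_excavate(task, level=task['weight'])

    def excavate_worker():
        # Mirrors the DataExcavate loop: pop from retrieve_excavate and crawl the page
        while True:
            tasks = scheduler.get_excavate_task()
            if len(tasks) == 0:
                time.sleep(60)
                continue
            task_key, task = tasks
            # ... page fetch, link extraction and add_excavate() re-queueing would go here ...

    threading.Thread(target=query_worker, name='query').start()
    threading.Thread(target=excavate_worker, name='excavate').start()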

+ 4 - 5
find_source/settings.py

@@ -21,7 +21,10 @@ MGO_COMPETING_GOODS = mongo_table(db=MGO_DATABASE, name='retrieve_competing_good
 MGO_LUA_SPIDERS = mongo_table(db='editor', name='luaconfig')
 '''redis'''
 REDIS = redis_client()
-REDIS_KEY = 'retrieves'
+'''Redis queue for phrase queries'''
+REDIS_QUERY = 'retrieve_query'
+'''Redis queue for data mining'''
+REDIS_EXCAVATE = 'retrieve_excavate'
 '''Keywords'''
 REQUIREMENT_PHRASE = [
     '竞谈', '发包', '比价', '开标', '邀标', '采购', '招标', '中标', '废标', '成交', '单一', '询价',
@@ -29,10 +32,6 @@ REQUIREMENT_PHRASE = [
     '议价', '中选', '答疑', '合同', '竞价', '变更', '更正', '预告', '集采', '抽取', '抽签',
     '中止公告', '终止公告', '竞卖', '竞买', '论证', '拟建', '审批', '环评'
 ]
-'''Content to strip or filter out'''
-SENSITIVE_WORDS = [
-    '通知', '邮箱', '登录', '注册', '亿元', '地址', '招聘', '试驾', '信用卡',
-]
 '''Search-engine filter patterns'''
 ENGINE_FEATURE_RETRIEVES = [
     'microsoft.com',