
fixbug - fix the Redis distributed lock issue in the Bloom filter

dongzhaorui 2 years ago
parent commit 895a7176dc
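
The diff below covers only the prediction-logging half of the change. As context for the lock issue named in the commit title, here is a minimal sketch of serializing a Bloom filter's check-and-add across workers with a Redis distributed lock, assuming redis-py and a hypothetical bloom wrapper (none of these names appear in this diff):

    import redis

    r = redis.Redis()

    def add_domain_once(bloom, domain: str) -> bool:
        # Hold a per-domain lock so two crawler workers cannot both
        # pass the membership test and double-add the same domain.
        with r.lock(f'lock:bloom:{domain}', timeout=10, blocking_timeout=5):
            if domain in bloom:   # hypothetical __contains__ on the wrapper
                return False
            bloom.add(domain)     # hypothetical add()
            return True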
1 changed file with 26 additions and 3 deletions

+ 26 - 3
find_source/crawler/services/excavate.py

@@ -1,6 +1,8 @@
 import re
+import time
 from concurrent.futures import ThreadPoolExecutor, wait
 
+from common.databases import int2long
 from common.log import logger
 from crawler.Task import Task
 from crawler.analysis import TimeExtractor
@@ -13,14 +15,32 @@ from crawler.utils import (
     is_url,
     html2element,
     iter_node,
-    check_page_by_words
+    check_page_by_words,
+    predict_bidding_model
 )
+from settings import Dzr
 
 TLDS = ['com', 'cn', 'net', 'org']
 URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
 URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'
 
 
+def predict_data(html, task: Task):
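+    # Default result, returned unchanged when the page failed to download.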
+    data_json = {'html': html, 'predict': 0}
+    if html is None:
+        return data_json
+    data = {'contenthtml': html}
+    data_json = predict_bidding_model(data)
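+    # Persist the prediction for later review (Dzr looks like a Mongo collection, given insert_one).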
+    Dzr.insert_one({
+        'site': task['origin'],
+        'html': html,
+        'url': task['url'],
+        'predict': data_json['predict'],
+        'comeintime': int2long(int(time.time()))
+    })
+    return data_json
+
+
 class DataExcavate(BasicService):
     """数据挖掘服务"""
 
@@ -54,8 +74,8 @@ class DataExcavate(BasicService):
             curr_depth > self._max_depth,  # check the task's crawl depth
             is_attachment_url,  # check whether the URL is an attachment download link
             self.validator.data(url),  # garbage pool - dedupe requested URLs
-            self.validator.data(domain),  # garbage pool - filter domains
-            self.collector.data(domain),  # collection pool - dedupe domains
+            self.validator.data(domain),  # garbage pool - filter domains (first level)
+            self.collector.data(domain),  # collection pool - dedupe domains (first level)
         ]):
             logger.debug(f'<{self.thread_name}> - invalid task - {curr_depth} - {url}')
             return True
@@ -153,6 +173,9 @@ class DataExcavate(BasicService):
         logger.info(f'<{self.thread_name}> - request - {task["url"]}')
         status_code, page_source = self.fetch_page(task)
         task['status_code'] = status_code
+
+        predict_data(page_source, task)
+
         if page_source is None:
             # should a domain whose fetch failed be added to the filter?
             self.push_remove(task)
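
The new int2long import is used to store comeintime; a plausible reading, assuming common.databases.int2long simply wraps pymongo's Int64 so the timestamp is saved as a 64-bit NumberLong rather than an Int32 (the actual helper is not shown in this diff):

    from bson.int64 import Int64

    def int2long(value: int) -> Int64:
        # Force a 64-bit BSON integer; plain Python ints that fit in
        # 32 bits would otherwise be stored as Int32 by pymongo.
        return Int64(value)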