
fixbug: fix incorrect Task attributes

dongzhaorui 2 years ago
parent
commit
2274c0004c

+ 5 - 6
find_source/common/databases.py

@@ -61,14 +61,13 @@ def update_one(collection: Collection, item):
 
 def update_one_by_domain(collection: Collection, item):
     lst = []
-    items = collection.find_one({'domain': item['domain']})
-    if items is not None:
-        origin = items.get('origin', [])
-        if item['origin'] not in origin:
-            lst.append(item['origin'])
-        lst.extend(origin)
+    res = collection.find_one({'domain': item['domain']})
+    if res is not None and item['origin'] not in res['origin']:
+        lst.append(item['origin'])  # add the newly excavated member
+        lst.extend(res['origin'])  # merge in the previously recorded members
     else:
         lst.append(item['origin'])
+
     item.update({'origin': lst})
     collection.update_one(
         {'domain': item['domain']},
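
For clarity, a minimal standalone sketch of the merge rule introduced above (the function name and sample values are illustrative; only the field names come from this diff):

def merge_origins(res, item):
    # res: existing document found by domain (or None); item: incoming record
    lst = []
    if res is not None and item['origin'] not in res['origin']:
        lst.append(item['origin'])  # newly excavated member first
        lst.extend(res['origin'])   # then the previously recorded members
    else:
        lst.append(item['origin'])
    return lst

# merge_origins({'origin': ['http://a.example.com']}, {'origin': 'http://b.example.com'})
# -> ['http://b.example.com', 'http://a.example.com']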

+ 36 - 0
find_source/crawler/Task.py

@@ -57,3 +57,39 @@ class Task(UserDict):
     def __getitem__(self, item):
         self._update_at()
         return super(Task, self).__getitem__(item)
+
+    @property
+    def retrieve_task_to_dict(self):
+        item = {
+            'name': self.data['name'],
+            'url': self.data['url'],
+            'domain': self.data['domain'],
+            'origin': self.data['origin'],
+            'groups': self.data['groups'],
+            'create_at': self.data['create_at'],
+            'update_at': self.data['update_at'],
+        }
+        return item
+
+    @property
+    def excavate_task_to_dict(self):
+        item = {
+            'name': self.data['name'],
+            'url': self.data['url'],
+            'domain': self.data['domain'],
+            'depth': self.data['depth'],
+            'origin': self.data['origin'],
+            'groups': self.data['groups'],
+            'create_at': self.data['create_at'],
+            'update_at': self.data['update_at'],
+        }
+        return item
+
+    @property
+    def validate_task_to_dict(self):
+        item = {
+            'domain': self.data['domain'],
+            'origin': self.data['origin'],
+            'create_at': self.data['update_at'],
+        }
+        return item
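
A hedged usage sketch of the new read-only views (constructor arguments and the import path are assumptions; only the property names come from this commit, and Task is expected to fill create_at/update_at itself):

from crawler.Task import Task  # assumed import path

task = Task(name='example', url='http://example.com', domain='example.com',
            origin='http://example.com', groups='seed', depth=1)
query_item = task.retrieve_task_to_dict   # row for the query results table
domain_item = task.excavate_task_to_dict  # row for the excavation results table
dedup_item = task.validate_task_to_dict   # row for the dedup/garbage table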

+ 7 - 42
find_source/crawler/services/basics.py

@@ -22,7 +22,7 @@ from settings import (
     MGO_ORGS,
     MGO_KEYWORDS,
     MGO_COMPETING_GOODS,
-    MGO_DATA_GARBAGE,
+    MGO_GARBAGE,
     MGO_DOMAIN,
     MGO_QUERY,
     MGO_RECORDS
@@ -76,53 +76,18 @@ class BasicService:
         """生成Task对象"""
         return Task(**kwargs)
 
-    @staticmethod
-    def make_retrieve_item(task: Task):
-        item = {
-            'name': task['name'],
-            'url': task['url'],
-            'domain': task['domain'],
-            'origin': task['origin'],
-            'groups': task['groups'],
-            'create_at': task['create_at'],
-            'update_at': task['update_at'],
-        }
-        return item
-
-    @staticmethod
-    def make_domain_item(task: Task):
-        item = {
-            'name': task['name'],
-            'url': task['url'],
-            'domain': task['domain'],
-            'depth': task['depth'],
-            'origin': task['origin'],
-            'groups': task['groups'],
-            'create_at': task['create_at'],
-            'update_at': task['update_at'],
-        }
-        return item
-
-    @staticmethod
-    def make_duplicate_removal(task: Task):
-        item = {
-            'domain': task['domain'],
-            'origin': task['origin'],
-            'create_at': task['update_at'],
-        }
-        return item
-
     def _push_data(self, purpose: str, task: Task, collection):
         if purpose == 'query':
-            item = self.make_retrieve_item(task)
+            item = task.retrieve_task_to_dict
             insert_one(collection, item)
             logger.info(f'<{self.thread_name}> - query result - {item["_id"]}')
         elif purpose == 'domain':
-            item = self.make_domain_item(task)
+            item = task.excavate_task_to_dict
             insert_one(collection, item)
-            logger.info(f'<{self.thread_name}> - sourcing result - {task["domain"]}')
+            logger.info(f'<{self.thread_name}> - excavation result - {task["domain"]}')
         elif purpose == 'remove':
-            item = self.make_duplicate_removal(task)
+            item = task.validate_task_to_dict
+            item['source'] = 'system'
             update_one_by_domain(collection, item)
             logger.info(f'<{self.thread_name}> - added to filter - {task["url"]}')
         else:
@@ -132,7 +97,7 @@ class BasicService:
     def push_remove(self, task: Task):
         """数据去重表"""
         if not self.validator.data(task['url']):
-            self._push_data('remove', task, MGO_DATA_GARBAGE)
+            self._push_data('remove', task, MGO_GARBAGE)
             self.validator.add_data(task['url'])
             return True
         return False
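
Tagging crawler-generated garbage rows with source='system' lets the loader in sync_data.py skip them when rebuilding the filter; in effect the 'remove' branch now writes the following (hedged illustration; the field names and the 'system' tag come from this diff):

item = task.validate_task_to_dict  # {'domain': ..., 'origin': ..., 'create_at': ...}
item['source'] = 'system'          # crawler-generated entry; the sync loader ignores it
update_one_by_domain(MGO_GARBAGE, item)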

+ 33 - 29
find_source/crawler/services/excavate.py

@@ -31,13 +31,13 @@ def predict_data(html, task: Task):
         return data_json
     data = {'contenthtml': html}
     data_json = predict_bidding_model(data)
-    Dzr.insert_one({
-        'site': task['origin'],
-        # 'html': compress_str(html),
-        'url': task['url'],
-        'predict': data_json['predict'],
-        'comeintime': int2long(int(time.time()))
-    })
+    # Dzr.insert_one({
+    #     'site': task['origin'],
+    #     # 'html': compress_str(html),
+    #     'url': task['url'],
+    #     'predict': data_json['predict'],
+    #     'comeintime': int2long(int(time.time()))
+    # })
     return data_json
 
 
@@ -161,34 +161,38 @@ class DataExcavate(BasicService):
     def fetch_page(self, task: Task):
         url = task['url']
         response = self.downloader.get(url, timeout=5)
-        status_code = response.status_code
-        page_source = response.text
+        task['status_code'] = status_code = response.status_code
+        html = response.text
         reason = response.reason
-        if status_code != 200 or page_source in ['', None]:
+        if status_code != 200 or html in ['', None]:
             task['err_reason'] = reason
-            msg = f'<{self.thread_name}> - {url} - {status_code} - {reason}'
+            msg = '<{thread_name}> - {url} - {status_code} - {reason}'.format(
+                thread_name=self.thread_name,
+                url=url,
+                status_code=status_code,
+                reason=reason
+            )
             logger.error(msg)
-            return status_code, None
-        return status_code, page_source
+            return None
+        return html
 
     def process(self, task: Task):
         logger.info(f'<{self.thread_name}> - request - {task["url"]}')
-        status_code, page_source = self.fetch_page(task)
-        task['status_code'] = status_code
-
-        if page_source is None:
-            # should domains that failed to load be added to the filter?
-            self.push_remove(task)
-            return False
-
-        predict_res = predict_data(page_source, task)  # bidding prediction result
-        if predict_res['predict']:
-            task['domain'] = extract_domain(task['url'])
-            task['base_url'] = extract_host(task['url'])
-            task['name'] = extract_page_title(page_source)
-            self.same_origin_strategy(page_source, task)
-            self.non_origin_strategy(page_source, task)
-        return True
+        status = False
+        # pre-process the domain and the base_url
+        task['domain'] = extract_domain(task['url'])
+        task['base_url'] = extract_host(task['url'])
+        html = self.fetch_page(task)  # send the request
+        if html is not None:
+            predict_res = predict_data(html, task)  # bidding prediction result
+            if predict_res['predict']:
+                task['name'] = extract_page_title(html)
+                self.same_origin_strategy(html, task)
+                self.non_origin_strategy(html, task)
+                status = True
+        # add processed tasks to the filter to prevent duplicate processing
+        self.push_remove(task)
+        return status
 
     def excavate(self):
         logger.info(f'starting thread - <{self.thread_name}>')
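
The reworked fetch_page stashes the HTTP status on the task and returns only the page HTML, or None on failure; inside DataExcavate.process the call pattern is now roughly as follows (a hedged fragment, not runnable on its own):

html = self.fetch_page(task)  # also sets task['status_code'] as a side effect
if html is None:
    # request failed or the page was empty; task['err_reason'] holds the reason
    ...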

+ 34 - 25
find_source/crawler/services/sync_data.py

@@ -7,7 +7,7 @@ from settings import (
     MGO_URLS,
     MGO_ORGS,
     MGO_COMPETING_GOODS,
-    MGO_DATA_GARBAGE,
+    MGO_GARBAGE,
     MGO_LUA_SPIDERS
 )
 
@@ -28,7 +28,6 @@ class SyncData(BasicService):
 
     def sync_keywords(self):
         """同步搜索词数据"""
-        logger.info(f'[加载数据]搜索词表')
         words = self.keywords_table()
         # normalize keyword format and push to the task queue
         words = [str(word).replace('&nbsp;', '').strip() for word in words]
@@ -46,7 +45,6 @@ class SyncData(BasicService):
 
     def sync_orgs(self):
         """同步组织单位数据"""
-        logger.info(f'[加载数据]单位组织表')
         items = self.orgs_table()
         # normalize organization names and push to the task queue
         orgs = []
@@ -73,11 +71,10 @@ class SyncData(BasicService):
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[organization table] loaded {len(items)} organizations into the task queue')
+        logger.info(f'[org URL lookup] loaded {len(items)} organizations into the task queue')
 
     def sync_seed_urls(self):
         """同步网址数据"""
-        logger.info(f'[加载数据]种子网址表')
         items = self.seed_urls_table()
         lst = []
         for item in items:
@@ -97,11 +94,10 @@ class SyncData(BasicService):
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[seed URL table] loaded {len(items)} seed URLs into the task queue')
+        logger.info(f'[seed sourcing] loaded {len(items)} seed URLs into the task queue')
 
     def sync_competing_goods(self):
         """同步竞品urls"""
-        logger.info(f'[加载数据]竞品网址表')
         items = self.competing_goods_table()
         # normalize competitor URLs and push to the task queue
         lst = []
@@ -127,13 +123,12 @@ class SyncData(BasicService):
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[competitor URL table] loaded {len(items)} URLs into the task queue')
+        logger.info(f'[competitor sourcing] loaded {len(items)} URLs into the task queue')
 
     def data_collector(self):
-        """收录器存放新发现和已拥有的网址域名"""
+        """收录器 - 存放新发现和已拥有的网址域名"""
         if self._init_collector:
-            logger.info(f'[loading data] collector')
-            count = 0
+            domains = []
             q = {"param_common.11": {'$exists': True}}
             projection = {'param_common': 1}
             cursor = MGO_LUA_SPIDERS.find(q, projection=projection)
@@ -142,37 +137,47 @@ class SyncData(BasicService):
                 if not is_url(url):
                     continue
                 domain = extract_domain(url)
-                logger.debug(f'[collector]{domain}')
+                logger.debug(f'[collector] pulled recorded domain feature: {domain}')
+                if domain not in domains:
+                    domains.append(domain)
+
+            count = 0
+            for domain in domains:
+                logger.debug(f'[collector] updating recorded domain feature: {domain}')
                 if not self.collector.data(domain):
+                    logger.debug(f'[collector] adding recorded domain feature: {domain}')
                     self.collector.add_data(domain)
                     count += 1
-            logger.info(f'[collector] read {count} URL domains')
+            logger.info(f'[collector] loaded {count} recorded URL features')
 
     def data_validator(self):
-        """垃圾池:存放寻源过程中垃圾网址和没有相关信息的网址"""
+        """过滤器 - 存放寻源过程中垃圾网址和没有招投标相关信息的网站"""
         if self._init_validator:
-            logger.info(f'[loading data] filter')
             count = 0
-            cursor = MGO_DATA_GARBAGE.find(projection={'domain': 1})
+            q = {
+                "source": {"$exists": False},  # 来源
+                "domain": {"$type": "string"}
+            }
+            cursor = MGO_GARBAGE.find(q, projection={'domain': 1})
             for item in cursor.sort(self.sort):
-                try:
-                    domain = item['domain']
-                    if not isinstance(domain, str):
-                        MGO_DATA_GARBAGE.delete_one({'_id': item['_id']})
-                        continue
-                except IndexError:
-                    continue
-                logger.debug(f'[filter]{domain}')
+                domain = item['domain']
+                logger.debug(f'[filter] pulled filter URL feature: {domain}')
                 if not self.validator.data(domain):
+                    logger.debug(f'[filter] updating filter URL feature: {domain}')
                     self.validator.add_data(domain)
+                    MGO_GARBAGE.update_one(
+                        {'_id': item['_id']},
+                        {'$set': {"source": "other"}}
+                    )
                     count += 1
-            logger.info(f'[filter] read {count} dedup features')
+            logger.info(f'[filter] loaded {count} dedup features')
 
     def start(self):
         """程序入口"""
 
         def _validate():
             """数据过滤"""
+            logger.info('[自动寻源]加载任务过滤模块')
             while True:
                 try:
                     self.data_collector()
@@ -185,6 +190,7 @@ class SyncData(BasicService):
 
         def _keywords():
             """搜索词"""
+            logger.info('[自动寻源]加载搜索词模块')
             while True:
                 if self._allow_load_data:
                     try:
@@ -197,6 +203,7 @@ class SyncData(BasicService):
 
         def _competing_goods():
             """竞品列表"""
+            logger.info('[自动寻源]加载竞品寻源模块')
             while True:
                 if self._allow_load_data:
                     try:
@@ -209,6 +216,7 @@ class SyncData(BasicService):
 
         def _seed_urls():
             """种子url"""
+            logger.info('[自动寻源]加载种子寻源模块')
             while True:
                 if self._allow_load_data:
                     try:
@@ -221,6 +229,7 @@ class SyncData(BasicService):
 
         def _orgs():
             """单位组织"""
+            logger.info('[自动寻源]加载单位网址查询模块')
             while True:
                 if self._allow_load_data:
                     try:
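
Putting the garbage-table changes together: rows written by push_remove arrive tagged source='system', so the validator loader only imports legacy rows that lack a source field and stamps them 'other' once loaded. A hedged sketch (the query shape comes from the diff; the validator handle is illustrative):

from settings import MGO_GARBAGE

q = {"source": {"$exists": False}, "domain": {"$type": "string"}}
for doc in MGO_GARBAGE.find(q, projection={'domain': 1}):
    if not validator.data(doc['domain']):   # validator: the service's dedup filter
        validator.add_data(doc['domain'])
        MGO_GARBAGE.update_one({'_id': doc['_id']}, {'$set': {'source': 'other'}})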

+ 4 - 4
find_source/settings.py

@@ -6,13 +6,13 @@ MGO_DATABASE = 'shujuziyuan'
 '''Bidding data prediction results'''
 Dzr = mongo_table(db=MGO_DATABASE, name='predict_results')
 '''Garbage table'''
-MGO_DATA_GARBAGE = mongo_table(db=MGO_DATABASE, name='data_garbage')
-'''Sourcing results table'''
-MGO_DOMAIN = mongo_table(db=MGO_DATABASE, name='data_domains')
+MGO_GARBAGE = mongo_table(db=MGO_DATABASE, name='garbage')
+'''Data excavation results table'''
+MGO_DOMAIN = mongo_table(db=MGO_DATABASE, name='data_excavate')
 '''Query results table'''
 MGO_QUERY = mongo_table(db=MGO_DATABASE, name='data_query')
 '''Data collection records table'''
-MGO_RECORDS = mongo_table(db=MGO_DATABASE, name='excavate_records')
+MGO_RECORDS = mongo_table(db=MGO_DATABASE, name='crawl_records')
 '''Organization/unit table'''
 MGO_ORGS = mongo_table(db=MGO_DATABASE, name='retrieve_orgs')
 '''Search keywords'''