
Refactor the collection logic

dongzhaorui 2 years ago
commit c4105750a0
1 changed file with 89 additions and 116 deletions

+ 89 - 116
zgztb_cookie/detail_firefox.py

@@ -11,6 +11,7 @@ sys.path.append('/mnt/FworkSpider')
 
 import io
 import time
+from enum import Enum
 
 import execjs
 from tqdm import tqdm
@@ -22,7 +23,9 @@ from encode_info import encode_info
 from feapder.db.mongodb import MongoDB
 from feapder.network.proxy_pool import swordfish_proxy
 from feapder.network.request import requests
+from feapder.network.response import Response
 from feapder.utils.cleaner import cleaner
+from feapder.utils.data_process import get_json
 from utils.log import logger as log
 
 # megabytes, unit: M
@@ -48,6 +51,15 @@ def get_acw_sc_v2(param):
         return result
 
 
+class DataStreamReadStatus(Enum):
+    """Data-stream read status"""
+    NORMAL = 2  # data received normally
+    NULL = 3  # no detail data available
+    NOMATCH = 4  # no matching data
+    EMPTY = 5  # unstructured data content is empty
+    LOSE = 10086  # file content incomplete
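+    # These numeric values are written back to the task's "timeout" field in
+    # MongoDB once the crawl result is recorded (see the status update in the
+    # parse logic below).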
+
+
 class DetailSpider(feapder.AirSpider):
     cookie_pool = WebCookiePool(redis_key='zgztbcookie',
                                 page_url="http://www.cebpubservice.com/ctpsp_iiss/SecondaryAction/findDetails.do")
@@ -65,22 +77,13 @@ class DetailSpider(feapder.AirSpider):
         return swordfish_proxy()
 
     def start_callback(self):
-        self._data_transmission_limit = 2  # upper limit on transferred content size, unit: M
+        self._data_transmission_limit = 15  # upper limit on received content, unit: M; keeping it at 3M or less is recommended
         self._proxies = None  # global proxy
         self._cookies = None  # global browser info
 
-        # status flags for collection-task results
-        self.extract_state = {
-            0: (2, "数据正常"),
-            1: (3, "暂无详情数据"),
-            2: (4, "没有符合的数据"),
-            3: (5, "非结构化数据内容为空"),
-            4: (10086, "文件内容不全"),
-        }
-
     def start_requests(self):
         task_lst = self.to_db.find(self.db_name,
-                                   {"type": "0", "timeout": None, 'title':'(公告名称)红原县绿色产业园牦牛现代农业园区基础设施建设项目 (二期)'},
+                                   {"type": "0", "timeout": None},
                                    sort={"_id": -1},
                                    limit=100)
         self._proxies = self.proxy
@@ -135,12 +138,10 @@ class DetailSpider(feapder.AirSpider):
         msg = meta['msg']
         href = f"http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/showDetails.do#uuid={meta['uuid']}"
         item["href"] = href
-        # copy the request object to reuse the session
-        request = request.copy()
-        # receive the data segment
-        self.receiving_response_body(request, response)
-        html = response.text  # the larger the content (10M and up), the longer decoding takes; occasionally unrecognized characters are converted into replacement characters
+        request = request.copy()  # copy the request instance to reuse the session
+        response = self.get_response(request, response)  # receive the data
         if response.is_html:
+            html = response.text
             # Case 1: static page, handle anti-crawling
             arg1 = tools.get_info(html, "arg1='(\w+)';", fetch_one=True)
             if arg1 != '':
@@ -155,33 +156,30 @@ class DetailSpider(feapder.AirSpider):
                 # Case 1.2: acw_3
                 self.ali_robots(request)
                 if request.count > 4:
-                    log.info(f'阿里人机验证失败,尝试次数:{request.count}')
+                    log.error(f'阿里人机验证失败,尝试次数:{request.count}')
                     return
                 request.count += 1
                 yield request
         else:
-            sendflag = "true"
-            # Case 2: json, structured-data processing
-            resp_json, ret = self.get_json_data(request, response)
-            contenthtml, state = self.extract_html(request, resp_json, ret)
-            detail = None
-            if contenthtml:
-                special = {
-                    '<\!\[cdata[^>]*>|<?cdata [^>]*>': '',
-                    '</body[^>]*>|]]>': '',
-                }
-                detail = cleaner(contenthtml, special=special)
-            # check the Chinese-character count
-            if tools.chinese_character(detail).total >= 20:
-                sendflag = "false"
-                state = 0
-            # structure the data
-            item["sendflag"] = sendflag
+            contenthtml, state = self.extract_html(request, response)
             item["contenthtml"] = contenthtml or ''
+
+            # clean the raw source html
+            special = {
+                '<\!\[cdata[^>]*>|<?cdata [^>]*>': '',
+                '</body[^>]*>|]]>': '',
+            }
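+            # strip CDATA wrappers and stray closing tags before the general cleaning pass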
+            detail = cleaner(contenthtml, special=special) if contenthtml else None
             item["detail"] = detail or ''
+
+            # check the Chinese-character count
+            flag = "false" if tools.chinese_character(detail).total >= 20 else "true"
+            item["sendflag"] = flag
+
             # update the collection-task status
-            update_data = {"timeout": self.extract_state[state][0]}
+            update_data = {"timeout": state.value}
             self.to_db.update(self.db_name, update_data, {"_id": meta['_id']})
+
             # push the data to the production database
             ignore = ['_id', 'type', 'businessKeyWord']
             insert = {k: v for k, v in item.items() if k not in ignore}
@@ -189,6 +187,38 @@ class DetailSpider(feapder.AirSpider):
             self.to_db.add("data_bak", insert)
             log.info(f"{msg}--采集成功")
 
+    def get_response(self, request, response):
+        """
+            Receive the response body and record its size in headers["content-length"].
+            If the data limit is exceeded, break off the receiving process.
+        """
+        title = request.item['title']
+        content_length = 0  # unit: bytes
+        limit = self._data_transmission_limit * MEGABYTES  # receiving cap, converted to bytes
+        obj = io.BytesIO()
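+        # stream the body in 1 MB chunks with a progress bar, stopping once the cap is reached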
+        with tqdm(desc=title, total=limit, unit='iB', unit_scale=True, unit_divisor=1024) as bar:
+            for r in response.iter_content(chunk_size=MEGABYTES):  # chunk_size is in bytes
+                n = obj.write(r)
+                content_length += n
+                bar.update(n)
+                if content_length >= limit:
+                    # content beyond the limit disrupts downstream processing, so break off receiving here
+                    break
+
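+        # Rebuild a feapder Response around the (possibly truncated) body; the actual
+        # number of bytes received is stored in "content-length" so downstream code
+        # can tell whether the cap was hit.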
+        response = Response.from_dict(
+            {
+                "url": request.url,
+                "cookies": response.cookies,
+                "_content": obj.getvalue(),
+                "status_code": response.status_code,
+                "elapsed": response.elapsed.microseconds,
+                "headers": {
+                    **response.headers,
+                    "content-length": content_length
+                }
+            }
+        )
+        return response
+
     def ali_robots(self, request, limit=3):
         """
         Alibaba invisible bot verification
@@ -212,96 +242,39 @@ class DetailSpider(feapder.AirSpider):
             self._proxies = proxies  # global proxy
             break
 
-    def receiving_response_body(self, request, response):
-        """
-            Receive the response body and record its size in headers["content-length"].
-            If the data limit is exceeded, break off the receiving process.
-        """
-        title = request.item['title']
-        content_length = 0  # unit: bytes
-        total = self._data_transmission_limit * MEGABYTES  # receiving cap, converted to bytes
-        obj = io.BytesIO()
-        kw = dict(desc=title, total=total, unit='iB', unit_scale=True, unit_divisor=1024)
-        with tqdm(**kw) as bar:
-            for r in response.iter_content(chunk_size=MEGABYTES): # chunk_size is in bytes
-                n = obj.write(r)
-                bar.update(n)
-                content_length += n
-                if content_length >= total:
-                    # content beyond the limit disrupts downstream processing, so break off receiving here
-                    break
-
-        response.content = obj.getvalue()
-        response.headers['content-length'] = content_length
-
-    def get_json_data(self, request, response):
+    def extract_html(self, request, response):
         business_keyword = request.item['businessKeyWord']
         content_length_limit = self._data_transmission_limit * MEGABYTES
-        is_overload = response.headers['content-length'] > content_length_limit
-        resp_json = {}
-        if not is_overload:
-            # Case 2.1: not a large json data stream, extract the data directly
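+        # "content-length" holds the int byte count recorded by get_response (not the
+        # raw header string), so this comparison is numeric.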
+        upper_limit = response.headers['content-length'] > content_length_limit
+        if not upper_limit:
+            # Case 2.1: structured data, extract it directly
             resp_json = response.json
-            data = resp_json.get("object")
         else:
-            # Case 2.2: large json data stream; clean and process the unstructured data into structured data
-            html = response.text
-            # content = tools.get_info(html, '\"object\":({.*?}),', fetch_one=True)
-            # content = tools.get_info(html, '\"object\":({.*?})(,| )', fetch_one=True)
-            content = tools.get_info(html, '\"object\":({.*| |(?:[^{}])*})', fetch_one=True)
+            # Case 2.2: unstructured data
+            if self._data_transmission_limit <= 3:
+                # the larger the content (above 3M), the longer the first decode takes,
+                # and undecodable characters get converted into replacement characters
+                html = response.text
+            else:
+                html = response.content.decode(errors='ignore')
+
+            # fuzzy-query results; the returned data is ordered by time, descending
+            content = tools.get_info(html, '\"object\":({.*?}),', fetch_one=True)
             content = ":".join(content.split(':')[1:])[1:]  # [{...} -> {...}
             if not content:
-                data = "无数据"
+                return None, DataStreamReadStatus.NULL
             elif not content.endswith('}'):
-                data = "丢弃"
                 # raise EOFError('content does not end with "}"; file content incomplete, discarding')
+                return None, DataStreamReadStatus.LOSE
             else:
-                literal_ret = tools.literal_eval(content)
-                data = {business_keyword: [literal_ret]}
-        return resp_json, data
-
-    def unpack(self, details, data):
-        """对单个字典解包"""
-        if details and data is None:
-            # an unknown data-type keyword exists -> amendBulletin
-            # {'message': '', 'success': True, 'object': {'amendBulletin': []}}
-            (key, value), = details.items()
-            data = details[key]
-        return data
-
-    def extract_html(self, request, json_data, data):
-        business_keyword = request.item['businessKeyWord']
-        state = 0
-        resp_json = json_data
-        if not data:
-            state = 1
-            contenthtml = splicing(business_keyword, resp_json)
-        elif data == '没有符合的数据':
-            state = 2
-            contenthtml = None
-        elif data == '无数据':
-            state = 3
-            contenthtml = None
-        elif data == '丢弃':
-            state = 4
-            contenthtml = None
-        else:
-            # get the json detail info
-            details = data.get(business_keyword)
-            details = self.unpack(data, details)
-            if business_keyword == "tenderProject":
-                #  response.json = {'message': '', 'success': True, 'object': {'tenderProject': []}}
-                #  data = {'tenderProject': []}
-                contenthtml = splicing(business_keyword, resp_json)
-            elif business_keyword == "openBidRecord":
-                contenthtml = splicing(business_keyword, resp_json)
-            else:
-                if not details:
-                    state = 1
-                    detail_items = {}
-                else:
-                    detail_items = details[0]
-                contenthtml = detail_items.get("bulletinContent")
-        return contenthtml, state
+                ret = get_json(content)
+                resp_json = {
+                    "message": "",
+                    "success": True,
+                    "object": {business_keyword: [ret]}
+                }
+
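+        # splicing() renders the structured payload into the display html that becomes contenthtml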
+        html = splicing(business_keyword, resp_json)
+        return html, DataStreamReadStatus.NORMAL
 
 
 if __name__ == "__main__":