Jelajahi Sumber

使用feapder重构列表页爬虫

dongzhaorui 2 tahun lalu
induk
melakukan
d5708c757f
1 mengubah file dengan 153 tambahan dan 150 penghapusan
  1. +153 −150
      zgztb_cookie/zgzbtb_spider.py

+ 153 - 150
zgztb_cookie/zgzbtb_spider.py

@@ -1,109 +1,165 @@
 # -*- coding: utf-8 -*-
-#  中国招标投标公共服务平台
-#  @CreatDate    : 4/11/2021 上午 10:04
-#  @Author  : 马国鹏
-import json
-import time
+"""
+Created on 2023-08-11
+---------
+@summary: 中国招标投标公共服务平台-列表页
+---------
+@author: dzr
+"""
 from collections import namedtuple
 
-import requests
-
-from utils.databases import mongo_table, int2long, redis_client
-from utils.log import logger
-from utils.tools import redis_exists, redis_set
+import feapder
+import feapder.utils.tools as tools
+from feapder import Item
+from feapder.network.proxy_pool import swordfish_proxy
+from feapder.network.request import requests
+from feapder.utils.log import log as logger
+from feapder.utils.tools import json
 
 Menu = namedtuple('Menu', ['channel', 'code', 'type', 'businessKeyWord'])
 
 
-def socks_proxy():
-    """剑鱼代理"""
-    url = 'http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch'
-    headers = {"Authorization": 'Basic amlhbnl1MDAxOjEyM3F3ZSFB'}
-    proxy = requests.get(url, headers=headers).json()
-    proxies = proxy.get('data')
-    logger.info(f"切换代理:{proxies}")
-    return proxies
-
-
-def date_to_timestamp(date, time_format="%Y-%m-%d %H:%M:%S"):
-    """
-    @summary:
-    ---------
-    @param date:将"2011-09-28 10:00:00"时间格式转化为时间戳
-    @param time_format:时间格式
-    ---------
-    @result: 返回时间戳
-    """
+class ListPageSpider(feapder.AirSpider):
 
-    timestamp = time.mktime(time.strptime(date, time_format))
-    return int(timestamp)
+    __custom_setting__ = dict(
+        ITEM_FILTER_ENABLE=True,
+        ITEM_FILTER_SETTING=dict(
+            filter_type=5
+        )
+    )
 
+    @property
+    def proxy(self):
+        return swordfish_proxy()
 
-class CebPubServiceListPageSpider:
+    def start_callback(self):
+        self._task_coll_name = 'zgzb_list'
+        self._proxies = None
 
-    def __init__(self):
-        self.menus = [
+    def start_requests(self):
+        self._proxies = self.proxy
+        start_time = '2天'
+        page_size = 1000
+        # today = datetime.date.today()
+        # end_day = today - datetime.timedelta(days=-3)
+        url = 'http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getStringMethod.do'
+        task_menus = [
             Menu('未按数据规范-招标公告', 'a_zgzbtbggfwpt_wasjgf_zbgg', '招标公告', 'tenderBulletin'),
-            Menu('未按数据规范-开标记录', 'a_zgzbtbggfwpt_wasjgf_kbjl', '开标记录', 'openBidRecord'),
-            Menu('未按数据规范-评标公示', 'a_zgzbtbggfwpt_wasjgf_pbgs', '评标公示', 'winCandidateBulletin'),
-            Menu('未按数据规范-中标公告', 'a_zgzbtbggfwpt_wasjgf_zhbgg', '中标公告', 'winBidBulletin'),
-            # Menu('未按数据规范-签约履行', 'a_zgzbtbggfwpt_wasjgf_qylx', "签约履行", "tenderBulletin"),
-            # Menu('未按数据规范-招标项目', 'a_zgzbtbggfwpt_wasjgf_zbxm', '招标项目',  'tenderProject'), # 已废除
+            # Menu('未按数据规范-开标记录', 'a_zgzbtbggfwpt_wasjgf_kbjl', '开标记录', 'openBidRecord'),
+            # Menu('未按数据规范-评标公示', 'a_zgzbtbggfwpt_wasjgf_pbgs', '评标公示', 'winCandidateBulletin'),
+            # Menu('未按数据规范-中标公告', 'a_zgzbtbggfwpt_wasjgf_zhbgg', '中标公告', 'winBidBulletin'),
         ]
-        self.crawl_list = mongo_table('py_spider', 'zgzb_list')
-        self.r = redis_client()
-        self.url = 'http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getStringMethod.do'
-
-    def request(self, data, headers, **kwargs):
-        msg = kwargs.get('msg')
-        logger.info(f"开始请求{msg}")
-        retries_int = 1
-        while True:
-            try:
-                request_param = dict(
-                    headers=headers,
-                    data=data,
-                    proxies=socks_proxy(),
-                    timeout=5,
-                )
-                response = requests.post(self.url, **request_param)
-                logger.info(f'{msg}--请求成功')
-                response.json()  # 检测数据是否请求成功
-                return response
-            except json.decoder.JSONDecodeError:
-                logger.error(f"{msg}--代理受限,重试...")
-            except requests.exceptions.ConnectionError:
-                logger.warning(f"{msg}--访问频繁,{retries_int}s后重试...")
-                time.sleep(retries_int)
-            except Exception as e:
-                logger.error(f"{msg}--请求失败")
-                logger.exception(f'异常原因:{e}')
-                return
-
-    def parse(self, items, menu):
+        for menu in task_menus:
+            business_type = menu.type
+            for page in range(1, 10):
+                data = {
+                    'searchName': '',
+                    'searchArea': '',
+                    'searchIndustry': '',
+                    'centerPlat': '',
+                    'businessType': business_type,
+                    'searchTimeStart': '',
+                    'searchTimeStop': '',
+                    'timeTypeParam': '',
+                    'bulletinIssnTime': start_time,
+                    'bulletinIssnTimeStart': '',
+                    'bulletinIssnTimeStop': '',
+                    'pageNo': page,
+                    'row': page_size,
+                }
+                meta = {
+                    'msg': f'{business_type}-第{page}页',
+                    'interval': 1,  # 切换代理间隔时长
+                }
+                yield feapder.Request(url, timeout=5, data=data, meta=meta,
+                                      menu=menu)
+
+    def download_midware(self, request):
+        request.proxies = self._proxies
+        request.method = 'POST'
+        request.headers = {
+            'Accept': 'application/json, text/javascript, */*; q=0.01',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
+            'Cache-Control': 'no-cache',
+            'Connection': 'keep-alive',
+            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
+            'Origin': 'http://www.cebpubservice.com',
+            'Pragma': 'no-cache',
+            'X-Requested-With': 'XMLHttpRequest',
+            "Referer": "http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getSearch.do?tabledivIds=searchTabLi2",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3877.400 QQBrowser/10.8.4506.400"
+        }
+
+    def validate(self, request, response):
+        msg = request.meta['msg']
+        menu = request.menu
+        resp_json = response.json  # 检测数据接口是否请求成功
+        if not resp_json:
+            logger.info(f'{msg}-无列表数据')
+            return False
+
+        # 汇总消息
+        data = resp_json.get("object")
+        total_page = data.get("page").get("totalPage", 0)
+        total_count = data.get("page").get("totalCount", 0)
+        page_no = data.get("page").get("pageNo", 0)
+        row = data.get("page").get("row", 0)
+        items = data.get("returnlist")
+        if page_no < total_page:
+            info = f'{menu.channel}-共{total_page}页-{total_count}条-第{page_no}页-返回{row}条-实际{len(items)}条'
+        else:
+            info = f'{menu.channel},超出最大采集页码'
+        logger.info(info)
+        return True
+
+    def exception_request(self, request, response, e):
+        msg = request.meta['msg']
+        proxy_errors = (
+            json.decoder.JSONDecodeError,
+            requests.exceptions.ConnectionError
+        )
+        if isinstance(e, proxy_errors):
+            interval = request.meta['interval']
+            logger.warning(f"{msg}--代理失效,{interval}s后重试...")
+            tools.delay_time(interval)
+            self._proxies = self.proxy
+        else:
+            logger.error(f"{msg}--请求失败")
+            logger.exception(f'异常原因:{e}')
+
+    @staticmethod
+    def extract_address(region):
+        if region:
+            args = region.split(' ')
+            if len(args) == 2:
+                area, city = args
+            elif len(args) == 1:
+                area, city = args[0], ''
+            else:
+                area, city, *argi = args
+        else:
+            area, city = '全国', ''
+        area, city = area.strip().replace('省', ''), city.strip()
+        return area, city
+
+    def parse(self, request, response):
+        menu = request.menu
+        resp_json = response.json
+        items = resp_json.get("object").get("returnlist")
         for jtme in items:
+            title = jtme.get("businessObjectName")
+            if not title:
+                continue
+
             businessid = jtme.get("businessId")
             tenderprojectcode = jtme.get("tenderProjectCode")
             platf_code = jtme.get("transactionPlatfCode")
             href = "&".join([businessid, tenderprojectcode, platf_code])
             publish_time = jtme.get("receiveTime")
-            title = jtme.get("businessObjectName")
-            if not title:
-                continue
-
+            l_np_publishtime = tools.date_to_timestamp(publish_time, '%Y-%m-%d')
             region = jtme.get('regionName', '') or ''
-            if region:
-                args = region.split(' ')
-                if len(args) == 2:
-                    area, city = args
-                elif len(args) == 1:
-                    area, city = args[0], ''
-                else:
-                    area, city, *argi = args
-            else:
-                area, city = '全国', ''
-
-            item = {
+            area, city = self.extract_address(region)
+            item = Item(**{
                 "schemaVersion": jtme.get("schemaVersion"),
                 "type": jtme.get("type"),
                 "businessKeyWord": menu.businessKeyWord,
@@ -113,12 +169,12 @@ class CebPubServiceListPageSpider:
                 "site": "中国招标投标公共服务平台",
                 "channel": menu.channel,
                 "spidercode": menu.code,
-                "area": area.strip().replace('省', ''),
-                "city": city.strip(),
+                "area": area,
+                "city": city,
                 "district": "",
-                "comeintime": int2long(int(time.time())),
+                "comeintime": tools.ensure_int64(tools.get_current_timestamp()),
                 "publishtime": publish_time,
-                "l_np_publishtime": int2long(date_to_timestamp(publish_time, '%Y-%m-%d')),
+                "l_np_publishtime": tools.ensure_int64(l_np_publishtime),
                 "detail": "",
                 "contenthtml": "",
                 "T": "bidding",
@@ -127,68 +183,15 @@ class CebPubServiceListPageSpider:
                 "_d": "comeintime",
                 "publishdept": "",
                 "infoformat": 1
-            }
-            if not redis_exists(href, self.r):
-                self.crawl_list.insert_one(item)
-                redis_set(href, self.r)
-                msg = f"{item['title']} - {publish_time}"
-                logger.info(f"{menu.channel} >>> {msg} --上传成功")
-
-    def start(self):
-        headers = {
-            'Accept': 'application/json, text/javascript, */*; q=0.01',
-            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
-            'Cache-Control': 'no-cache',
-            'Connection': 'keep-alive',
-            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
-            'Origin': 'http://www.cebpubservice.com',
-            'Pragma': 'no-cache',
-            'X-Requested-With': 'XMLHttpRequest',
-            "Referer": "http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getSearch.do?tabledivIds=searchTabLi2",
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3877.400 QQBrowser/10.8.4506.400"
-        }
-        start_time = '2天'
-        page_size = 1000
+            })
+            item.table_name = self._task_coll_name
+            item.unique_key = ('href',)
+            yield item
+            logger.info(f"采集成功--{menu.channel}-{item['title']}-{publish_time}")
 
-        # today = datetime.date.today()
-        # end_day = today - datetime.timedelta(days=-3)
-        for menu in self.menus:
-            business_type = menu.type
-            for page in range(1, 10):
-                msg = f'{business_type}-第{page}页'
-                data = {
-                    'searchName': '',
-                    'searchArea': '',
-                    'searchIndustry': '',
-                    'centerPlat': '',
-                    'businessType': business_type,
-                    'searchTimeStart': '',
-                    'searchTimeStop': '',
-                    'timeTypeParam': '',
-                    'bulletinIssnTime': start_time,
-                    'bulletinIssnTimeStart': '',
-                    'bulletinIssnTimeStop': '',
-                    'pageNo': page,
-                    'row': page_size,
-                }
-                response = self.request(data, headers, msg=msg)
-                if not response:
-                    logger.info(f'{msg}-接口无数据')
-                    break
-
-                resp_json = response.json()
-                items = resp_json.get("object")
-                if not items:
-                    logger.info(f'{msg}-无列表数据')
-                    break
-
-                total_page = items.get("page").get("totalPage", 0)
-                logger.info(f'{business_type}-共{total_page}页')
-                return_list = items.get("returnlist")
-                logger.info(f"{msg}-采集{len(return_list)}条数据")
-                self.parse(return_list, menu)
+    def end_callback(self):
         logger.info("列表页采集结束")
 
 
 if __name__ == '__main__':
-    CebPubServiceListPageSpider().start()
+    ListPageSpider(thread_count=1).start()