Browse Source

新增历史数据补采爬虫

dongzhaorui 1 năm trước
mục cha
commit
03343aa15b
1 tập tin đã thay đổi với 225 bổ sung0 xóa
  1. 225 0
      zgztb_cookie/zgzbtb_spider_history.py

+ 225 - 0
zgztb_cookie/zgzbtb_spider_history.py

@@ -0,0 +1,225 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-08-11
+---------
+@summary: 中国招标投标公共服务平台-列表页
+---------
+@author: dzr
+"""
+import datetime
+from collections import namedtuple
+
+import feapder
+import feapder.utils.tools as tools
+from feapder import Item
+from feapder.network.proxy_pool import swordfish_proxy
+from feapder.network.request import requests
+from feapder.utils.log import log as logger
+from feapder.utils.tools import json
+
# One crawl task definition: display channel name, spider code, the
# business-type label sent to the search API (Chinese), and the API keyword.
Menu = namedtuple('Menu', ['channel', 'code', 'type', 'businessKeyWord'])
+
+
class ListPageSpider(feapder.AirSpider):
    """List-page backfill spider for cebpubservice.com (中国招标投标公共服务平台).

    Walks one-day windows from a fixed start timestamp up to today and, for
    each of four bulletin channels, pages through the platform's search API,
    yielding one ``Item`` per list entry into the ``zgzb_list`` collection.
    """

    @property
    def proxy(self):
        """Fetch a fresh proxy mapping from the swordfish proxy pool."""
        return swordfish_proxy()

    @staticmethod
    def extract_address(region):
        """Split a space-separated "province city" string into ``(area, city)``.

        ``region`` comes from the API's ``regionName`` field. An empty value
        maps to ``('全国', '')``; tokens beyond the first two are ignored.
        The 省/市 suffixes are stripped from the area part.
        """
        if region:
            parts = region.split(' ')
            area = parts[0]
            city = parts[1] if len(parts) > 1 else ''
        else:
            area, city = '全国', ''

        area = area.strip().replace('省', '').replace('市', '')
        return area, city.strip()

    def start_callback(self):
        """Initialise spider-wide state before requests are generated."""
        self._task_coll_name = 'zgzb_list'  # target collection for yielded items
        self._proxies = None  # shared proxy, rotated in exception_request

    def visit_day_lst(self, start_ts):
        """Return ascending one-day ``(start_day, end_day)`` windows.

        Windows cover every day from ``start_ts`` (unix seconds) up to
        yesterday, formatted as ``%Y-%m-%d`` strings.
        """
        # Hoist "today" so a midnight rollover mid-loop cannot skew windows.
        today = datetime.date.today()
        total_days = (today - datetime.date.fromtimestamp(start_ts)).days
        day_lst = []
        # Build oldest-first directly instead of building newest-first and
        # sorting afterwards.
        for offset in range(total_days, 0, -1):
            sdt = today - datetime.timedelta(days=offset)
            edt = today - datetime.timedelta(days=offset - 1)
            day_lst.append((sdt.strftime("%Y-%m-%d"), edt.strftime("%Y-%m-%d")))
        return day_lst

    def start_requests(self):
        """Generate one POST request per (day window, channel, page)."""
        self._proxies = self.proxy
        task_menus = [
            Menu('未按数据规范-招标公告', 'a_zgzbtbggfwpt_wasjgf_zbgg', '招标公告', 'tenderBulletin'),
            Menu('未按数据规范-开标记录', 'a_zgzbtbggfwpt_wasjgf_kbjl', '开标记录', 'openBidRecord'),
            Menu('未按数据规范-评标公示', 'a_zgzbtbggfwpt_wasjgf_pbgs', '评标公示', 'winCandidateBulletin'),
            Menu('未按数据规范-中标公告', 'a_zgzbtbggfwpt_wasjgf_zhbgg', '中标公告', 'winBidBulletin'),
        ]
        page_size = 1000
        start_ts = 1672502400  # backfill start time (2023-01-01 00:00 CST)

        for start_day, end_day in self.visit_day_lst(start_ts):
            for menu in task_menus:
                business_type = menu.type
                # At most 9 pages x 1000 rows per day/channel.
                for page in range(1, 10):
                    data = {
                        'searchName': '',
                        'searchArea': '',
                        'searchIndustry': '',
                        'centerPlat': '',
                        'businessType': business_type,
                        'searchTimeStart': '',  # window start
                        'searchTimeStop': '',  # window end
                        'timeTypeParam': '',
                        'bulletinIssnTime': '',
                        'bulletinIssnTimeStart': '',  # window start
                        'bulletinIssnTimeStop': '',  # window end
                        'pageNo': page,
                        'row': page_size,
                    }
                    # The API filters 招标公告 by issue time and the other
                    # three channels by search time.
                    if business_type in ['开标记录', '评标公示', '中标公告']:
                        data['searchTimeStart'] = start_day
                        data['searchTimeStop'] = end_day
                    else:
                        data['bulletinIssnTimeStart'] = start_day
                        data['bulletinIssnTimeStop'] = end_day

                    meta = {
                        'msg': f'{business_type}-第{page}页',
                        'day': start_day,
                        'interval': 1,  # delay (seconds) before proxy rotation
                    }
                    yield feapder.Request(data=data, meta=meta, menu=menu)

    def download_midware(self, request):
        """Attach URL, method, proxy and browser-like headers to each request."""
        request.url = 'http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getStringMethod.do'
        request.proxies = self._proxies
        request.method = 'POST'
        request.timeout = 5
        request.headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Origin': 'http://www.cebpubservice.com',
            'Pragma': 'no-cache',
            'X-Requested-With': 'XMLHttpRequest',
            "Referer": "http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getSearch.do?tabledivIds=searchTabLi2",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3877.400 QQBrowser/10.8.4506.400"
        }

    def validate(self, request, response):
        """Check the JSON response; return False to drop, True to parse.

        Defensive against partially-formed payloads: a missing ``object``,
        ``page`` or ``returnlist`` no longer raises, it just logs.
        """
        day = request.meta['day']
        msg = request.meta['msg']
        menu = request.menu
        resp_json = response.json  # raises upstream if the body is not JSON
        if not resp_json:
            logger.info(f'{day}-{msg}-无列表数据')
            return False

        # Summarise pagination info; tolerate absent keys instead of crashing.
        data = resp_json.get("object") or {}
        return_list = data.get("returnlist") or []

        page = data.get("page") or {}
        total_page = page.get("totalPage", 0)
        total_count = page.get("totalCount", 0)
        page_no = page.get("pageNo", 0)
        row = page.get("row", 0)

        msg = f"{day}-{menu.channel},超出最大采集页码"
        if page_no <= total_page:
            tips = [
                day,
                menu.channel,
                f'共{total_page}页{total_count}/{len(return_list)}条',
                f'第{page_no}页{row}条',
            ]
            msg = "-".join(tips)

        logger.info(msg)
        return True

    def exception_request(self, request, response, e):
        """On proxy-type errors rotate the shared proxy; otherwise log."""
        msg = request.meta['msg']
        proxy_errors = (
            json.decoder.JSONDecodeError,
            requests.exceptions.ConnectionError
        )
        if isinstance(e, proxy_errors):
            interval = request.meta['interval']
            logger.warning(f"{msg}--代理失效,{interval}s后重试...")
            tools.delay_time(interval)
            self._proxies = self.proxy
        else:
            logger.error(f"{msg}--请求失败")
            logger.exception(f'异常原因:{e}')

    def parse(self, request, response):
        """Yield one Item per list entry of a validated response."""
        menu = request.menu
        resp_json = response.json
        # Defensive: validate() guarantees JSON, not the nested structure.
        items = (resp_json.get("object") or {}).get("returnlist") or []
        for jtme in items:
            title = jtme.get("businessObjectName")
            if not title:
                continue

            # Default missing id fragments to '' so the join cannot raise
            # TypeError on a None value.
            businessid = jtme.get("businessId") or ''
            tenderprojectcode = jtme.get("tenderProjectCode") or ''
            platf_code = jtme.get("transactionPlatfCode") or ''
            href = "&".join([businessid, tenderprojectcode, platf_code])
            publish_time = jtme.get("receiveTime")
            l_np_publishtime = tools.date_to_timestamp(publish_time, '%Y-%m-%d')
            region = jtme.get('regionName', '') or ''
            area, city = self.extract_address(region)
            item = Item(**{
                "schemaVersion": jtme.get("schemaVersion"),
                "type": jtme.get("type"),
                "businessKeyWord": menu.businessKeyWord,
                "rowGuid": jtme.get("rowGuid"),
                "title": title,
                "href": href,
                "site": "中国招标投标公共服务平台",
                "channel": menu.channel,
                "spidercode": menu.code,
                "area": area,
                "city": city,
                "district": "",
                "comeintime": tools.ensure_int64(tools.get_current_timestamp()),
                "publishtime": publish_time,
                "l_np_publishtime": tools.ensure_int64(l_np_publishtime),
                "list_comeintime": tools.ensure_int64(tools.get_current_timestamp()),
                "is_mixed": False,
                "detail": "",
                "contenthtml": "",
                "T": "bidding",
                "sendflag": "false",
                "iscompete": True,
                "_d": "comeintime",
                "publishdept": "",
                "infoformat": 1
            })
            item.table_name = self._task_coll_name
            item.unique_key = ('href',)
            yield item
            logger.info(f"采集成功--{menu.channel}-{item['title']}-{publish_time}")

    def end_callback(self):
        """Log completion once all list pages are processed."""
        logger.info("列表页采集结束")
+
+
if __name__ == '__main__':
    # Run the backfill spider with 20 worker threads.
    spider = ListPageSpider(thread_count=20)
    spider.start()