
Add new spiders

dongzhaorui 1 year ago
parent commit de24b8d370
2 changed files with 511 additions and 0 deletions

  1. zgztb_cookie/detail_m.py (+285, -0)
  2. zgztb_cookie/zgzbtb_spider_d.py (+226, -0)

+ 285 - 0
zgztb_cookie/detail_m.py

@@ -0,0 +1,285 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2021-12-13 13:25:15
+---------
+@summary: Firefox spider - does not handle the Aliyun slider captcha
+---------
+@author: 马国鹏
+"""
+import io
+import time
+from enum import Enum
+
+import execjs
+import feapder
+import feapder.utils.tools as tools
+from feapder import Item
+from feapder.db.mongodb import MongoDB
+from feapder.network.proxy_pool import swordfish_proxy
+from feapder.network.request import requests
+from feapder.network.response import Response
+from feapder.utils.cleaner import cleaner
+from feapder.utils.log import log
+from tqdm import tqdm
+
+from encode_info import encode_info
+
+# megabytes: number of bytes per MB
+MEGABYTES = 1024 * 1024  # 1 MB
+
+
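+# showDetails.js (presumably taken from the site's own front end) exposes getHtml(),
+# which renders the detail JSON back into an HTML fragment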
+def show_html(business_name, response):
+    path = 'showDetails.js'
+    with open(path, encoding='utf-8') as rp:
+        js_script = rp.read()
+        ctx = execjs.compile(js_script)
+        html = ctx.call('getHtml', business_name, response)
+        return html
+
+
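+# acw_sc_v2.js solves the Aliyun WAF cookie challenge: a blocked static page embeds
+# an arg1 value, and l(arg1) derives the matching acw_sc__v2 cookie (see parse below)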
+def get_acw_sc_v2(param):
+    path = 'acw_sc_v2.js'
+    with open(path, encoding='utf-8') as rp:
+        js_script = rp.read()
+        ctx = execjs.compile(js_script)
+        result = ctx.call('l', param)
+        log.info(f"acw_sc_v2 >>> {result}")
+        return result
+
+
+class DataStreamReadStatus(Enum):
+    """Data stream read status"""
+    NORMAL = 2  # data received normally
+    NULL = 3  # no detail data available yet
+    NOMATCH = 4  # no matching data
+    EMPTY = 5  # unstructured content is empty
+    LOSE = 10086  # file content is incomplete
+
+
+class DetailSpider(feapder.AirSpider):
+
+    __custom_setting__ = dict(
+        ITEM_FILTER_ENABLE=False,
+        SPIDER_MAX_RETRY_TIMES=100
+    )
+
+    _to_db = None
+
+    @property
+    def to_db(self):
+        if not self._to_db:
+            self._to_db = MongoDB()
+        return self._to_db
+
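+    # note: every read of self.proxy fetches a fresh proxy from the swordfish pool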
+    @property
+    def proxy(self):
+        return swordfish_proxy()
+
+    def get_response(self, request, response):
+        """
+            接收响应体,并设置响应体大小["content-length"]。若超过数据上限则熔断接收流程
+        """
+        title = request.item['title']
+        content_length = 0  # in bytes
+        limit = self._data_transmission_limit * MEGABYTES  # receive cap in bytes (the limit itself is set in MB)
+        obj = io.BytesIO()
+        with tqdm(desc=title, total=limit, unit='iB', unit_scale=True, unit_divisor=1024) as bar:
+            for r in response.iter_content(chunk_size=MEGABYTES):  # chunk_size is in bytes
+                n = obj.write(r)
+                content_length += n
+                bar.update(n)
+                if content_length >= limit:
+                    # circuit breaker: payloads over the cap break downstream processing, so stop receiving
+                    break
+
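+        # rebuild the Response so headers["content-length"] carries the byte count that
+        # was actually received, stored as an int for the size check in extract_html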
+        response = Response.from_dict(
+            {
+                "url": request.url,
+                "cookies": response.cookies,
+                "_content": obj.getvalue(),
+                "status_code": response.status_code,
+                "elapsed": response.elapsed.microseconds,
+                "headers": {
+                    **response.headers,
+                    "content-length": content_length
+                }
+            }
+        )
+        return response
+
+    def unpack_large_content(self, response):
+        if self._data_transmission_limit <= 3:
+            # the larger the content (above ~3 MB), the longer the first decode takes;
+            # decoding also turns unrecognizable characters into replacement characters
+            text = response.text
+        else:
+            text = response.content.decode(errors='ignore')
+        return text
+
+    def unpack_large_json(self, response):
+        if self._data_transmission_limit <= 3:
+            resp_json = response.json
+        else:
+            body = response.content.decode(errors='ignore')
+            resp_json = tools.get_json(body)
+        return resp_json
+
+    def extract_html(self, request, response):
+        business_keyword = request.item['businessKeyWord']
+        content_length_limit = self._data_transmission_limit * MEGABYTES
+        over_limit = response.headers['content-length'] > content_length_limit
+        if not over_limit:
+            # case 2.1: structured data, extract it directly
+            resp_json = self.unpack_large_json(response)
+            try:
+                data_lst = resp_json['object'][business_keyword]
+                if isinstance(data_lst, list) and len(data_lst) == 0:
+                    # No detail data yet: the JS templating raises a TypeError because the
+                    # category name key is missing from the JSON. Blanking the category name
+                    # is safe; the JS script classifies the content by itself
+                    business_keyword = ''
+            except Exception:
+                # a change to the project can rename the category, or there is simply no matching data
+                pass
+        else:
+            # case 2.2: unstructured data
+            if business_keyword == 'openBidRecord':
+                return None, DataStreamReadStatus.LOSE
+
+            html = self.unpack_large_content(response)
+            # fuzzy-match the result; the returned data is ordered by time, descending
+            content = tools.get_info(html, r'"object":({.*?}),', fetch_one=True)
+            content = ":".join(content.split(':')[1:])[1:]  # '{"key":[{...}' -> '[{...}' -> '{...}'
+            if not content:
+                return None, DataStreamReadStatus.NULL
+            elif not content.endswith('}'):
+                # raise EOFError('content does not end with "}"; file is incomplete, discard it')
+                return None, DataStreamReadStatus.LOSE
+            else:
+                ret = tools.repair_json(content)
+                resp_json = {
+                    "message": "",
+                    "success": True,
+                    "object": {business_keyword: [ret]}
+                }
+
+        html = show_html(business_keyword, resp_json)
+        return html, DataStreamReadStatus.NORMAL
+
+    def start_callback(self):
+        self._data_transmission_limit = 1  # receive cap for response bodies, in MB; keeping it within 3 MB is recommended
+        self._proxies = None  # global proxy
+        self._cookies = None  # global browser state
+        self._coll_name = 'data_bak'  # production collection
+        self._task_coll_name = 'zgzb_list'  # task collection
+        self._ignore = ['_id', 'type', 'businessKeyWord', 'rowGuid']
+
+    def start_requests(self):
+        self._proxies = self.proxy
+        q = {"type": "0", "timeout": None}
+        task_lst = self.to_db.find(self._task_coll_name, q, sort={"_id": -1}, limit=100)
+        for item in task_lst:
+            schemaversion = item.pop("schemaVersion")
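+            # href was written by the list spider as "businessId&tenderProjectCode&platfCode"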
+            try:
+                businessid, tenderprojectcode, _ = item['href'].split("&")
+            except ValueError:
+                continue
+
+            data = {
+                "schemaVersion": schemaversion,
+                "businessKeyWord": item['businessKeyWord'],
+                "tenderProjectCode": encode_info(tenderprojectcode),
+                "businessObjectName": item['title'],
+                "businessId": encode_info(businessid),
+            }
+            meta = {
+                'uuid': businessid + tenderprojectcode,
+                '_id': item['_id'],
+                'msg': f"{item['channel']} - {item['title']} - {item['publishtime']}"
+            }
+            yield feapder.Request(data=data, meta=meta, item=item, count=0)
+
+    def download_midware(self, request):
+        request.url = 'http://www.cebpubservice.com/ctpsp_iiss/SecondaryAction/findDetails.do'
+        request.proxies = self._proxies
+        request.method = "POST"
+        request.timeout = 60
+        request.use_session = True
+        request.headers = {
+            'Host': 'www.cebpubservice.com',
+            'Accept': 'application/json, text/javascript, */*; q=0.01',
+            'X-Requested-With': 'XMLHttpRequest',
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
+            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
+            'Origin': 'http://www.cebpubservice.com',
+            'Referer': 'http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/showDetails.do',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
+        }
+
+    def exception_request(self, request, response, e):
+        err_types = (requests.exceptions.ConnectionError,
+                     requests.exceptions.ChunkedEncodingError,
+                     requests.exceptions.Timeout)
+        if isinstance(e, err_types):
+            self._proxies = self.proxy
+            if request._session is not None:
+                request.session.cookies.clear_session_cookies()
+
+    def parse(self, request, response):
+        task_item = request.item
+        meta = request.meta
+        msg = meta['msg']
+        href = f"http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/showDetails.do#uuid={meta['uuid']}"
+        task_item["href"] = href
+        request = request.copy()  # copy the request instance so the session is reused
+        response = self.get_response(request, response)  # receive the data stream
+        if response.is_html:
+            html = response.text
+            # case 1: static page, handle the anti-crawl challenge
+            arg1 = tools.get_info(html, r"arg1='(\w+)';", fetch_one=True)
+            if arg1 != '':
+                # case 1.1: acw_sc__v2 challenge
+                acw_sc_v2 = get_acw_sc_v2(arg1)
+                request.session.cookies.update({'acw_sc__v2': acw_sc_v2})
+                self._cookies = request.session.cookies.get_dict()
+                time.sleep(1)  # wait out the server-side cache
+                request.count = 0
+                yield request
+            else:
+                # clear cookies and switch proxy
+                request.session.cookies.clear_session_cookies()
+                self._proxies = self.proxy
+                yield request
+
+        else:
+            contenthtml, state = self.extract_html(request, response)
+            # strip images and base64 blobs from the page
+            contenthtml = tools.re.sub('data:image/(.*?)["|\']', '', contenthtml) if contenthtml else ''
+            task_item["contenthtml"] = contenthtml
+            # clean the raw HTML
+            special = {
+                r'<\!\[cdata[^>]*>|<?cdata [^>]*>': '',
+                r'</body[^>]*>|]]>': '',
+            }
+            detail = cleaner(contenthtml, special=special)
+            task_item["detail"] = detail
+            # Chinese character count check
+            flag = "false" if tools.chinese_character(detail).total >= 20 else "true"
+            task_item["sendflag"] = flag
+            # save to data_bak
+            data = {k: v for k, v in task_item.items() if k not in self._ignore}
+            data['comeintime'] = tools.ensure_int64(tools.get_current_timestamp())
+            item = Item(**data)
+            item.table_name = self._coll_name
+            yield item
+            log.info(f"{msg} -- upload succeeded, status: {state.value}")
+            # update the task collection
+            item = Item(timeout=state.value, _id=meta['_id'])
+            item.table_name = self._task_coll_name
+            yield item.to_UpdateItem()
+            log.debug(f"{msg} -- crawl finished")
+
+
+if __name__ == "__main__":
+    while True:
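+        # each pass pulls up to 100 pending tasks (type="0", timeout=None) from zgzb_list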
+        spider = DetailSpider(thread_count=1)
+        spider.start()
+        spider.join()

+ 226 - 0
zgztb_cookie/zgzbtb_spider_d.py

@@ -0,0 +1,226 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-08-11
+---------
+@summary: 中国招标投标公共服务平台 (China Tendering & Bidding Public Service Platform) - list pages - backfill crawl
+---------
+@author: dzr
+"""
+import datetime
+from collections import namedtuple
+
+import feapder
+import feapder.utils.tools as tools
+from feapder import Item
+from feapder.network.proxy_pool import swordfish_proxy
+from feapder.network.request import requests
+from feapder.utils.log import log as logger
+from feapder.utils.tools import json
+
+Menu = namedtuple('Menu', ['channel', 'code', 'type', 'businessKeyWord'])
+
+
+class ListPageSpider(feapder.AirSpider):
+
+    @property
+    def proxy(self):
+        return swordfish_proxy()
+
+    @staticmethod
+    def extract_address(region):
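+        # e.g. "湖北省 武汉市" -> ("湖北", "武汉市"); an empty region falls back to ("全国", "")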
+        if region:
+            args = region.split(' ')
+            if len(args) == 2:
+                area, city = args
+            elif len(args) == 1:
+                area, city = args[0], ''
+            else:
+                area, city, *argi = args
+        else:
+            area, city = '全国', ''
+
+        area = area.strip().replace('省', '').replace('市', '')
+        city = city.strip()
+        return area, city
+
+    def start_callback(self):
+        self._task_coll_name = 'zgzb_list'
+        self._proxies = None
+
+    def visit_day_lst(self, start_ts, days):
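+        """Build ascending (start_day, end_day) string pairs covering the `days` days
+        before start_ts, e.g. a start_ts on 2023-08-11 with days=3 yields
+        [("2023-08-08", "2023-08-09"), ("2023-08-09", "2023-08-10"), ("2023-08-10", "2023-08-11")]
+        """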
+        day_lst = []
+        for diff_v in range(1, days + 1):
+            start_dt = datetime.date.fromtimestamp(start_ts)
+            sdt = (start_dt - datetime.timedelta(days=diff_v))
+            edt = (start_dt - datetime.timedelta(days=diff_v - 1))
+            day_lst.append((sdt.strftime("%Y-%m-%d"), edt.strftime("%Y-%m-%d")))
+
+        # sort the date pairs ascending
+        day_lst.sort(key=lambda x: x[0])
+        return day_lst
+
+    def start_requests(self):
+        self._proxies = self.proxy
+        task_menus = [
+            Menu('未按数据规范-招标公告', 'a_zgzbtbggfwpt_wasjgf_zbgg', '招标公告', 'tenderBulletin'),
+            Menu('未按数据规范-开标记录', 'a_zgzbtbggfwpt_wasjgf_kbjl', '开标记录', 'openBidRecord'),
+            Menu('未按数据规范-评标公示', 'a_zgzbtbggfwpt_wasjgf_pbgs', '评标公示', 'winCandidateBulletin'),
+            Menu('未按数据规范-中标公告', 'a_zgzbtbggfwpt_wasjgf_zhbgg', '中标公告', 'winBidBulletin'),
+        ]
+        page_size = 1000
+        start_ts = tools.get_current_timestamp() - 24 * 3600  # start timestamp (24 hours ago)
+        days = 3  # number of days to backfill
+
+        for start_day, end_day in self.visit_day_lst(start_ts, days):
+            for menu in task_menus:
+                business_type = menu.type
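+                # paginate: at most 9 pages of 1000 rows per menu per day window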
+                for page in range(1, 10):
+                    data = {
+                        'searchName': '',
+                        'searchArea': '',
+                        'searchIndustry': '',
+                        'centerPlat': '',
+                        'businessType': business_type,
+                        'searchTimeStart': '',  # start time
+                        'searchTimeStop': '',  # end time
+                        'timeTypeParam': '',
+                        'bulletinIssnTime': '',
+                        'bulletinIssnTimeStart': '',  # start time
+                        'bulletinIssnTimeStop': '',  # end time
+                        'pageNo': page,
+                        'row': page_size,
+                    }
+                    if business_type in ['开标记录', '评标公示', '中标公告']:
+                        data['searchTimeStart'] = start_day
+                        data['searchTimeStop'] = end_day
+                    else:
+                        data['bulletinIssnTimeStart'] = start_day
+                        data['bulletinIssnTimeStop'] = end_day
+
+                    meta = {
+                        'msg': f'{business_type} - page {page}',
+                        'day': start_day,
+                        'interval': 1,  # delay before switching proxy, in seconds
+                    }
+                    yield feapder.Request(data=data, meta=meta, menu=menu)
+
+    def download_midware(self, request):
+        request.url = 'http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getStringMethod.do'
+        request.proxies = self._proxies
+        request.method = 'POST'
+        request.timeout = 5
+        request.headers = {
+            'Accept': 'application/json, text/javascript, */*; q=0.01',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,sq;q=0.7',
+            'Cache-Control': 'no-cache',
+            'Connection': 'keep-alive',
+            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
+            'Origin': 'http://www.cebpubservice.com',
+            'Pragma': 'no-cache',
+            'X-Requested-With': 'XMLHttpRequest',
+            "Referer": "http://www.cebpubservice.com/ctpsp_iiss/searchbusinesstypebeforedooraction/getSearch.do?tabledivIds=searchTabLi2",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3877.400 QQBrowser/10.8.4506.400"
+        }
+
+    def validate(self, request, response):
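+        # by feapder convention, returning False discards this response without a retry,
+        # while raising an exception would schedule a retry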
+        day = request.meta['day']
+        msg = request.meta['msg']
+        menu = request.menu
+        resp_json = response.json  # check whether the data API request succeeded
+        if not resp_json:
+            logger.info(f'{day}-{msg}-no list data')
+            return False
+
+        # build the summary message
+        data = resp_json.get("object")
+        return_list = data.get("returnlist")
+
+        page = data.get("page")
+        total_page = page.get("totalPage", 0)
+        total_count = page.get("totalCount", 0)
+        page_no = page.get("pageNo", 0)
+        row = page.get("row", 0)
+
+        msg = f"{day}-{menu.channel}, past the maximum page number"
+        if page_no <= total_page:
+            tips = [
+                day,
+                menu.channel,
+                f'{total_page} pages, {total_count}/{len(return_list)} records in total',
+                f'page {page_no}, {row} rows',
+            ]
+            msg = "-".join(tips)
+
+        logger.info(msg)
+        return True
+
+    def exception_request(self, request, response, e):
+        msg = request.meta['msg']
+        proxy_errors = (
+            json.decoder.JSONDecodeError,
+            requests.exceptions.ConnectionError
+        )
+        if isinstance(e, proxy_errors):
+            interval = request.meta['interval']
+            logger.warning(f"{msg} -- proxy failed, retrying in {interval}s...")
+            tools.delay_time(interval)
+            self._proxies = self.proxy
+        else:
+            logger.error(f"{msg} -- request failed")
+            logger.exception(f'cause: {e}')
+
+    def parse(self, request, response):
+        menu = request.menu
+        resp_json = response.json
+        items = resp_json.get("object").get("returnlist")
+        for jtme in items:
+            title = jtme.get("businessObjectName")
+            if not title:
+                continue
+
+            businessid = jtme.get("businessId")
+            tenderprojectcode = jtme.get("tenderProjectCode")
+            platf_code = jtme.get("transactionPlatfCode")
+            href = "&".join([businessid, tenderprojectcode, platf_code])
+            publish_time = jtme.get("receiveTime")
+            l_np_publishtime = tools.date_to_timestamp(publish_time, '%Y-%m-%d')
+            region = jtme.get('regionName', '') or ''
+            area, city = self.extract_address(region)
+            item = Item(**{
+                "schemaVersion": jtme.get("schemaVersion"),
+                "type": jtme.get("type"),
+                "businessKeyWord": menu.businessKeyWord,
+                "rowGuid": jtme.get("rowGuid"),
+                "title": title,
+                "href": href,
+                "site": "中国招标投标公共服务平台",
+                "channel": menu.channel,
+                "spidercode": menu.code,
+                "area": area,
+                "city": city,
+                "district": "",
+                "comeintime": tools.ensure_int64(tools.get_current_timestamp()),
+                "publishtime": publish_time,
+                "l_np_publishtime": tools.ensure_int64(l_np_publishtime),
+                "list_comeintime": tools.ensure_int64(tools.get_current_timestamp()),
+                "is_mixed": False,
+                "detail": "",
+                "contenthtml": "",
+                "T": "bidding",
+                "sendflag": "false",
+                "iscompete": True,
+                "_d": "comeintime",
+                "publishdept": "",
+                "infoformat": 1
+            })
+            item.table_name = self._task_coll_name
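+            # feapder dedupes on unique_key, so one task per href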
+            item.unique_key = ('href',)
+            yield item
+            logger.info(f"采集成功--{menu.channel}-{item['title']}-{publish_time}")
+
+    def end_callback(self):
+        logger.info("列表页采集结束")
+
+
+if __name__ == '__main__':
+    ListPageSpider(thread_count=5).start()