
Merge branch 'master' of https://jygit.jydev.jianyu360.cn/data_spider/match_spider

lizongze 1 year ago
commit c788869f7e

+ 118 - 50
qlm/source_qianlima.py

@@ -6,6 +6,7 @@ import random
 import time
 
 import requests
+
 from utils.config_parms import *
 from utils.databases import mongo_table, redis_client
 from utils.log import logger
@@ -17,6 +18,11 @@ r = redis_client()
 redis_key = "qianlima_2024"
 
 session = requests.session()
+# proxies = {
+#     'http': 'socks5://123.101.64.83:8861',
+#     'https': 'socks5://123.101.64.83:8861'
+# }
+proxies = None
 
 '''
 https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=0
@@ -28,53 +34,80 @@ https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=
 '''
 
 
-def delay_by_day(days, fmt="%Y-%m-%d"):
-    """按天延时"""
-    _days = int(days)
-    _current_now = datetime.datetime.now()
-    return (_current_now + datetime.timedelta(days=_days)).strftime(fmt)
+class AccountLoginExpirationError(Exception):
+    pass
+
+
+def send_wechat_warning(msg, send=True):
+    markdown = f'千里马列表页采集异常,请相关同事注意。'
+    markdown += f'\n>异常详情:<font color=\"warning\">**{msg}**</font>'
+
+    if not send:
+        logger.info(markdown)
+        return
+
+    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
+    headers_ = {'Content-Type': 'application/json'}
+    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown}}
+    request_params = dict(headers=headers_, json=json_data, timeout=10)
+    response = requests.post(url, **request_params)
+    logger.info(response.json())
+
+
+def get_today_of_day(offset, fmt="%Y-%m-%d"):
+    date = datetime.datetime.now() + datetime.timedelta(days=int(offset))
+    return date.strftime(fmt)
 
 
 def crawl_request(url, data, retries=5):
-    global session, cookies
-    resp = None
+    global session, cookies, proxies
+    resp, msg = None, ''
     usages, usages_521 = 0, 1
     while usages < retries:
         request_params = {}
         request_params.setdefault('data', data)
         request_params.setdefault('headers', headers)
         request_params.setdefault('cookies', cookies)
+        request_params.setdefault('proxies', proxies)
         request_params.setdefault('timeout', 60)
         try:
             resp = session.post(url, **request_params)
             if resp.status_code == 521:
                 while usages_521 < retries:
-                    success, _, cookies = http_session_521(session, url, headers, cookies, data=data)
+                    success, _, cookies = http_session_521(session, url, headers, cookies, data=data, proxies=proxies)
                     if success:
                         break
-                    logger.warning(f"反爬破解失败,次数:{usages_521}")
+                    msg = f"反爬破解失败,次数:{usages_521}"
+                    logger.warning(msg)
                     time.sleep(1)
                     usages_521 += 1
                 usages += 1
             elif resp.status_code in [401, 403, 404]:
-                logger.error(f"账号登录已失效或封停,异常状态码:{resp.status_code}")
+                msg = f"账号登录已失效或封停,异常状态码:{resp.status_code}"
+                logger.error(msg)
+                break
+            elif str(resp.status_code).startswith('4'):
+                msg = f"公网IP被封禁,异常状态码:{resp.status_code}"
+                logger.error(msg)
                 break
             else:
                 break
         except requests.RequestException as e:
-            logger.error(f"访问失败,失败原因:{e.__class__.__name__}")
+            msg = f"访问失败,原因:{e.__class__.__name__}"
+            logger.error(msg)
             usages += 1
-    return resp
+    return resp, msg
 
 
-def crawl_spider(area: str, type_: int, page: int, **kwargs):
-    results = []
-    request_status = 'failure'  # 资源请求结果, 成功=success 失败=failure 停止=stop 账号封停=disable
+def spider(area: str, type_: int, page: int, **kwargs):
+    request_status = 'failure'  # 资源请求结果状态, 成功=success 失败=failure 停止=stop 封停=disable
 
-    curr_date = delay_by_day(0)
-    begin_time = kwargs.pop('begin_time', curr_date)
-    end_time = kwargs.pop('end_time', curr_date)
+    results = []
+    begin_time = kwargs.pop('begin_time')
+    end_time = kwargs.pop('end_time')
     max_per_page = kwargs.pop('max_per_page', 20)
+
+    url = "https://search.vip.qianlima.com/rest/service/website/search/solr"
     data = REQUEST_DATA_MAP[type_]
     data['newAreas'] = area  # 设置地区
     data['currentPage'] = page  # 页码
@@ -83,10 +116,13 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
     data['beginTime'] = begin_time  # 开始时间,格式:xxxx-xx-xxx
     data['endTime'] = end_time  # 结束时间,格式:xxxx-xx-xxx
     data = json.dumps(data)
-    url = "https://search.vip.qianlima.com/rest/service/website/search/solr"
-    response = crawl_request(url, data)
+    response, err = crawl_request(url, data)
+    if response is None:
+        request_status = 'server_error'
+        return request_status, err
+
     row_count = 0
-    if response is not None and response.status_code == 200:
+    if response.status_code == 200:
         resp_json = response.json()
         if resp_json['code'] == 200:
             row_count = resp_json["data"]["rowCount"]
@@ -103,11 +139,12 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
                     addr = str(item["areaName"]).split('-')
                     _area = addr[0] if len(addr) > 0 else ''
                     _city = addr[1] if len(addr) > 1 else ''
-                    if "国土" in item.get('progName',''):
+                    if "国土" in item.get('progName', ''):
                         channel = item['progName']
                     else:
                         channel = (item['noticeSegmentTypeName'] or item['progName'])
-                    res = {
+
+                    results.append({
                         'site': '千里马',
                         'channel': channel,
                         'area': _area,
@@ -115,10 +152,9 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
                         'title': item["title"],
                         'publishtime': item['updateTime'],
                         'href': item.get('url', '')
-                    }
-                    results.append(res)
-            request_status = 'success'
+                    })
 
+            request_status = 'success'
             if len(items) < max_per_page:
                 request_status = 'stop'
         else:
@@ -129,11 +165,14 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
                 "data": null
             }
             '''
-            logger.info(resp_json['msg'])
-    elif response is not None and response.status_code in [401, 403, 404]:
+            err = resp_json['msg']
+            logger.info(err)
+    elif response.status_code in [401, 403, 404]:
         request_status = 'disable'
-    elif response is not None and response.status_code == 405:
+    elif response.status_code == 405:
         request_status = 'method_not_allowed'
+    elif str(response.status_code).startswith('4'):
+        request_status = 'client_ip_disable'
 
     if len(results) > 0:
         qlm.insert_many(results)
@@ -147,70 +186,99 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
             page,
             len(results))
         )
-    return request_status
+    return request_status, err
 
 
-def by_area_crawl_data(area="", type_=0, **kwargs):
+def downloader(area="", type_=0, **kwargs):
     close_spider = False
-    disable_page, max_disable_page = 0, 3
-    pages = list(range(1, 101))  # 目前仅支持前10000数据的搜索
+    send_warning = False
+    reason = ''
+
+    retry_times, max_retries = 0, 3
+    pages = list(range(1, 101))  # 目前qlm仅支持查看前10000数据
     while len(pages) > 0:
         if close_spider:
+            if send_warning:
+                send_wechat_warning(reason)
             break
-        elif disable_page > max_disable_page:
+
+        if send_warning and retry_times > max_retries:
             # 此处可以添加通知邮件或者企业微信机器人接口,通知采集异常信息
+            send_wechat_warning(reason)
             break
 
         page = pages.pop(0)
         logger.info(f"访问-{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据")
         while True:
-            success = crawl_spider(area, type_, page, **kwargs)
-            if success == 'failure':
+            err, reason = spider(area, type_, page, **kwargs)
+            if err in ['server_error', 'client_ip_disable']:
+                close_spider = True
+                send_warning = True
+            elif err == 'failure':
                 interval = math.log(random.randint(100, 2400), 2)
                 logger.debug(f'异常重试,等待{interval}s')
                 time.sleep(interval)
                 continue
-            elif success == 'disable':
+            elif err == 'disable':
                 logger.warning(f"账号被禁止访问-{city_dict.get(int(area))}-第{page}页数据")
-                disable_page += 1
-            elif success == 'method_not_allowed':
+                retry_times += 1
+                send_warning = True
+            elif err == 'method_not_allowed':
                 logger.warning("服务器禁止使用当前 HTTP 方法的请求")
-                disable_page += 1
-            elif success == 'stop':
+                retry_times += 1
+                send_warning = True
+            elif err == 'stop':
                 close_spider = True
             else:
                 logger.info(f"{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据采集成功")
                 time.sleep(math.log(random.randint(100, 2400), 2))
             break
 
+    if send_warning:
+        raise AccountLoginExpirationError
+
 
 def select_types(date: str, area: str, prov: str):
     for type_ in [1, 2, 3, 4]:
-        by_area_crawl_data(
+        downloader(
             area=area,
             type_=type_,
             begin_time=date,
             end_time=date,
             max_per_page=100
         )
-    logger.info(f"{date}-{province_dict.get(int(prov))}地区-{channel_dict.get(type_)}采集结束")
+        tips = [
+            f'{date}',
+            f'{channel_dict.get(type_)}',
+            f'{province_dict.get(int(prov))}',
+            '完成采集'
+        ]
+        logger.info(f"+++ {' && '.join(tips)} +++ ")
 
 
 def select_area(date: str):
-    for province in range(1, 32):
-        for city_ in area_dict.get(province):
-            select_types(date, area=str(city_), prov=str(province))
-    logger.info(f"任务结束")
+    logger.info(f" +++ 开始采集 - {date} +++")
+    try:
+        for province in range(1, 32):
+            for city_ in area_dict.get(province):
+                select_types(date, area=str(city_), prov=str(province))
+        return True
+    except AccountLoginExpirationError:
+        return False
+    finally:
+        logger.info(f" +++ 采集结束 - {date} +++")
 
 
 def history(date_lst: list):
     for date in date_lst:
-        select_area(date)
+        success = select_area(date)
+        if not success:
+            break
 
 
 def start():
-    date_str = delay_by_day(-1)
-    select_area(date_str)
+    date = get_today_of_day(-1)
+    select_area(date)
 
 
 if __name__ == '__main__':
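
The central change to qlm/source_qianlima.py threads an optional proxies mapping through crawl_request() and http_session_521(), with a SOCKS5 configuration left commented out and proxies = None as the default. Below is a minimal sketch, not part of the commit, of what enabling that proxy would look like; it assumes the PySocks extra for requests (pip install "requests[socks]") and reuses the placeholder host and port from the commented-out block rather than a known working endpoint.

# Minimal sketch (not part of this commit): enabling the SOCKS5 proxy that the
# diff leaves commented out. requests only speaks SOCKS when the PySocks extra
# is installed (pip install "requests[socks]"). The host/port below are the
# placeholder values from the commented-out block, not a known working proxy.
import requests

proxies = {
    'http': 'socks5://123.101.64.83:8861',
    'https': 'socks5://123.101.64.83:8861',
}

session = requests.session()
# crawl_request() forwards this mapping on every call via
# request_params.setdefault('proxies', proxies); a plain GET through the same
# session is a quick connectivity check before the spider starts.
resp = session.get('https://search.vip.qianlima.com', proxies=proxies, timeout=10)
print(resp.status_code)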

+ 26 - 214
qlm/source_qianlima_history.py

@@ -1,217 +1,29 @@
-# coding: utf-8
-import datetime
-import json
-import math
-import random
-import time
-
-import requests
-from utils.config_parms import *
-from utils.databases import mongo_table, redis_client
-from utils.log import logger
-from utils.sessions_521 import http_session_521
-from utils.tools import sha1
-
-qlm = mongo_table('qlm', 'data_merge')
-r = redis_client()
-redis_key = "qianlima_2024"
-
-session = requests.session()
-
-'''
-https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=0
-搜索-2.0
-1 = 招标信息
-2 = 中标信息
-3 = 拟在建项目
-4 = 审批项目
-'''
-
-
-def delay_by_day(days, fmt="%Y-%m-%d"):
-    """按天延时"""
-    _days = int(days)
-    _current_now = datetime.datetime.now()
-    return (_current_now + datetime.timedelta(days=_days)).strftime(fmt)
-
-
-def crawl_request(url, data, retries=5):
-    global session, cookies1
-    resp = None
-    usages, usages_521 = 0, 1
-    while usages < retries:
-        request_params = {}
-        request_params.setdefault('data', data)
-        request_params.setdefault('headers', headers1)
-        request_params.setdefault('cookies', cookies1)
-        request_params.setdefault('timeout', 60)
-        try:
-            resp = session.post(url, **request_params)
-            if resp.status_code == 521:
-                while usages_521 < retries:
-                    success, _, cookies1 = http_session_521(session, url, headers1, cookies1, data=data)
-                    if success:
-                        break
-                    logger.warning(f"反爬破解失败,次数:{usages_521}")
-                    time.sleep(1)
-                    usages_521 += 1
-                usages += 1
-            elif resp.status_code in [401, 403, 404]:
-                logger.error(f"账号登录已失效或封停,异常状态码:{resp.status_code}")
-                break
-            else:
-                break
-        except requests.RequestException as e:
-            logger.error(f"访问失败,失败原因:{e.__class__.__name__}")
-            usages += 1
-    return resp
-
-
-def crawl_spider(area: str, type_: int, page: int, **kwargs):
-    results = []
-    request_status = 'failure'  # 资源请求结果, 成功=success 失败=failure 停止=stop 账号封停=disable
-
-    curr_date = delay_by_day(0)
-    begin_time = kwargs.pop('begin_time', curr_date)
-    end_time = kwargs.pop('end_time', curr_date)
-    max_per_page = kwargs.pop('max_per_page', 20)
-    data = REQUEST_DATA_MAP[type_]
-    data['newAreas'] = area  # 设置地区
-    data['currentPage'] = page  # 页码
-    data['numPerPage'] = max_per_page  # 每页的条目数
-    data['timeType'] = 4  # 自定义时间参数
-    data['beginTime'] = begin_time  # 开始时间,格式:xxxx-xx-xxx
-    data['endTime'] = end_time  # 结束时间,格式:xxxx-xx-xxx
-    data = json.dumps(data)
-    url = "https://search.vip.qianlima.com/rest/service/website/search/solr"
-    response = crawl_request(url, data)
-    row_count = 0
-    if response is not None and response.status_code == 200:
-        resp_json = response.json()
-        if resp_json['code'] == 200:
-            row_count = resp_json["data"]["rowCount"]
-            items = resp_json["data"]["data"]
-            for item in items:
-                cid = sha1(str(item["contentid"]))
-                if not r.hexists(redis_key, cid):
-                    r.hset(redis_key, cid, '')
-                    if "popTitle" in item:
-                        item["title"] = item["popTitle"]
-                    else:
-                        item["title"] = item["showTitle"]
-
-                    addr = str(item["areaName"]).split('-')
-                    _area = addr[0] if len(addr) > 0 else ''
-                    _city = addr[1] if len(addr) > 1 else ''
-                    if "国土" in item.get('progName', ''):
-                        channel = item['progName']
-                    else:
-                        channel = (item['noticeSegmentTypeName'] or item['progName'])
-                    res = {
-                        'site': '千里马',
-                        'channel': channel,
-                        'area': _area,
-                        'city': _city,
-                        'title': item["title"],
-                        'publishtime': item['updateTime'],
-                        'href': item.get('url', '')
-                    }
-                    results.append(res)
-            request_status = 'success'
-
-            if len(items) < max_per_page:
-                request_status = 'stop'
-        else:
-            '''
-            {
-                "code": 200520,
-                "msg": "抱歉,您在单位时间内的搜索次数已达上限,请联系客服购买会员!咨询电话:400-688-2000",
-                "data": null
-            }
-            '''
-            logger.info(resp_json['msg'])
-    elif response is not None and response.status_code in [401, 403, 404]:
-        request_status = 'disable'
-    elif response is not None and response.status_code == 405:
-        request_status = 'method_not_allowed'
-
-    if len(results) > 0:
-        qlm.insert_many(results)
-
-    if request_status in ['stop', 'success']:
-        logger.info("{}-{}-{}-共{}条-第{}页,成功上传{}条数据".format(
-            begin_time,
-            city_dict.get(int(area)),
-            channel_dict.get(type_),
-            row_count,
-            page,
-            len(results))
-        )
-    return request_status
-
-
-def by_area_crawl_data(area="", type_=0, **kwargs):
-    close_spider = False
-    disable_page, max_disable_page = 0, 3
-    pages = list(range(1, 101))  # 目前仅支持前10000数据的搜索
-    while len(pages) > 0:
-        if close_spider:
-            break
-        elif disable_page > max_disable_page:
-            # 此处可以添加通知邮件或者企业微信机器人接口,通知采集异常信息
-            break
-
-        page = pages.pop(0)
-        logger.info(f"访问-{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据")
-        while True:
-            success = crawl_spider(area, type_, page, **kwargs)
-            if success == 'failure':
-                interval = math.log(random.randint(100, 2400), 2)
-                logger.debug(f'异常重试,等待{interval}s')
-                time.sleep(interval)
-                continue
-            elif success == 'disable':
-                logger.warning(f"账号被禁止访问-{city_dict.get(int(area))}-第{page}页数据")
-                disable_page += 1
-            elif success == 'method_not_allowed':
-                logger.warning("服务器禁止使用当前 HTTP 方法的请求")
-                disable_page += 1
-            elif success == 'stop':
-                close_spider = True
-            else:
-                logger.info(f"{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据采集成功")
-                time.sleep(math.log(random.randint(100, 2400), 2))
-            break
-
-
-def select_types(date: str, area: str, prov: str):
-    for type_ in [1, 2, 3, 4]:
-        by_area_crawl_data(
-            area=area,
-            type_=type_,
-            begin_time=date,
-            end_time=date,
-            max_per_page=100
-        )
-    logger.info(f"{date}-{province_dict.get(int(prov))}地区-{channel_dict.get(type_)}采集结束")
-
-
-def select_area(date: str):
-    for province in range(1, 32):
-        for city_ in area_dict.get(province):
-            select_types(date, area=str(city_), prov=str(province))
-    logger.info(f"任务结束")
-
-
-def history(date_lst: list):
-    for date in date_lst:
-        select_area(date)
-
-
-def start():
-    date_str = "2023-09-25"
-    select_area(date_str)
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-27
+---------
+@summary: 千里马历史补采
+---------
+@author: Lzz
+"""
+from source_qianlima import history
+
+
+def main():
+    history(
+        date_lst=[
+            '2024-06-26',
+            '2024-06-27',
+            '2024-06-28',
+            '2024-06-29',
+            '2024-06-30',
+            '2024-07-01',
+            '2024-07-02',
+            '2024-07-03',
+            '2024-07-04',
+        ]
+    )
 
 
 if __name__ == '__main__':
-    start()
+    main()
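
After the rewrite, qlm/source_qianlima_history.py is reduced to a thin entry point that imports history() from source_qianlima and feeds it a hard-coded list of dates. If the backfill window grows, the list could be generated instead of typed out; the helper below is a hypothetical sketch using only the standard library and is not part of the commit.

# Hypothetical helper (not in the commit): build the consecutive date list that
# main() passes to history(), instead of writing each day by hand.
import datetime


def date_range(start, end, fmt="%Y-%m-%d"):
    """Every date from start to end inclusive, formatted the way the spider expects."""
    begin = datetime.datetime.strptime(start, fmt)
    stop = datetime.datetime.strptime(end, fmt)
    return [
        (begin + datetime.timedelta(days=i)).strftime(fmt)
        for i in range((stop - begin).days + 1)
    ]


# Equivalent to the hard-coded list in main():
print(date_range('2024-06-26', '2024-07-04'))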

+ 3 - 12
qlm/utils/config_parms.py

File diff suppressed because it is too large


+ 2 - 2
qlm/utils/log.py

@@ -3,10 +3,10 @@ from pathlib import Path
 from loguru import logger
 
 _absolute = Path(__file__).absolute().parent.parent
-_log_path = (_absolute / 'logs/crawl-{time:YYYY-MM-DD}.log').resolve()
+_log_path = (_absolute / 'logs/{time:YYYYMMDD}.log').resolve()
 logger.add(
     _log_path,
-    format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}',
+    format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {thread.name} - {name}:{function}:{line} - {message}',
     level='INFO',
     rotation='00:00',
     retention='1 week',
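
The qlm/utils/log.py hunk renames the log files from crawl-{time:YYYY-MM-DD}.log to {time:YYYYMMDD}.log and adds the thread name and call site ({name}:{function}:{line}) to every record. A standalone sketch of the same loguru sink is shown below; the sample output line in the final comment is illustrative, not captured from the spider.

# Standalone sketch mirroring the new loguru sink from the diff; the sample
# line at the bottom is illustrative only.
from loguru import logger

logger.add(
    'logs/{time:YYYYMMDD}.log',
    format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {thread.name} - {name}:{function}:{line} - {message}',
    level='INFO',
    rotation='00:00',    # roll over to a new file at midnight
    retention='1 week',  # keep one week of rotated files
)

logger.info('page fetched')
# Roughly: 2024-07-05 10:15:02 - INFO - MainThread - __main__:<module>:14 - page fetched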

+ 1 - 1
zbytb/Dockerfile

@@ -21,7 +21,7 @@ RUN grep -qxF 'set encoding=utf8' /etc/virc || echo 'set encoding=utf8' >> /etc/
 
 WORKDIR /opt
 # 安装node, 更换npm源
-RUN curl -fsSL https://rpm.nodesource.com/setup_14.x | bash && yum -y install nodejs && npm config set registry https://registry.npm.taobao.org
+RUN curl -fsSL https://rpm.nodesource.com/setup_14.x | bash && yum -y install nodejs && npm config set registry http://registry.npmmirror.com
 
 # 安装 python3.8.10 gcc相关配置
 RUN yum --exclude=kernel* update -y && yum groupinstall -y 'Development Tools' && yum install -y gcc openssl-devel bzip2-devel libffi-devel gtk3 libXt glibc-common

Some files were not shown because too many files changed in this diff