dongzhaorui 1 year ago
parent
commit
c9c4bfc347
1 changed file with 116 additions and 94 deletions
  1. +116 -94 qlm/source_qianlima.py

+ 116 - 94
qlm/source_qianlima.py

@@ -1,4 +1,5 @@
-# coding: utf-8
+# -*- coding: utf-8 -*-
+
 import datetime
 import json
 import math
@@ -15,14 +16,13 @@ from utils.tools import sha1
 
 qlm = mongo_table('qlm', 'data_merge')
 r = redis_client()
-redis_key = "qianlima_2024"
+redis_key = 'qianlima_2024'
 
 session = requests.session()
-# proxies = {
-#     'http': 'socks5://123.101.64.83:8861',
-#     'https': 'socks5://123.101.64.83:8861'
-# }
-proxies = None
+proxies = {
+    'http': 'socks5://119.3.159.234:8860',
+    'https': 'socks5://119.3.159.234:8860',
+}
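+# NOTE: socks5:// proxy URLs require the PySocks extra: pip install "requests[socks]"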
 
 '''
 https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=0
@@ -34,7 +34,7 @@ https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=
 '''
 
 
-class AccountLoginExpirationError(Exception):
+class AccountViolationRiskError(Exception):
     pass
 
 
@@ -54,12 +54,12 @@ def send_wechat_warning(msg, send=True):
     logger.info(response.json())
 
 
-def get_today_of_day(offset, fmt="%Y-%m-%d"):
+def get_today_of_day(offset, fmt='%Y-%m-%d'):
     date = datetime.datetime.now() + datetime.timedelta(days=int(offset))
     return date.strftime(fmt)
 
 
-def crawl_request(url, data, retries=5):
+def request(url, data, retries=5):
     global session, cookies, proxies
     resp, msg = None, ''
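     # usages: overall request attempts; usages_521: HTTP 521 anti-crawl bypass attempts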
     usages, usages_521 = 0, 1
@@ -77,69 +77,80 @@ def crawl_request(url, data, retries=5):
                     success, _, cookies = http_session_521(session, url, headers, cookies, data=data, proxies=proxies)
                     if success:
                         break
-                    msg = f"反爬破解失败,次数:{usages_521}"
+                    msg = f'反爬破解失败,次数:{usages_521}'
                     logger.warning(msg)
                     time.sleep(1)
                     usages_521 += 1
                 usages += 1
             elif resp.status_code in [401, 403, 404]:
-                msg = f"账号登录已失效或封停,异常状态码:{resp.status_code}"
+                msg = f'账号登录已失效或封停,异常状态码:{resp.status_code}'
+                logger.error(msg)
+                break
+            elif resp.status_code in [429]:
+                msg = f'Captcha challenge triggered, unexpected status code: {resp.status_code}'
                 logger.error(msg)
+                logger.warning(resp.content.decode())
                 break
             elif str(resp.status_code).startswith('4'):
-                msg = f"公网IP被封禁,异常状态码:{resp.status_code}"
+                msg = f'公网IP被封禁,异常状态码:{resp.status_code}'
                 logger.error(msg)
                 break
             else:
                 break
         except requests.RequestException as e:
-            msg = f"访问失败,原因:{e.__class__.__name__}"
+            msg = f'访问失败,原因:{e.__class__.__name__}'
             logger.error(msg)
             usages += 1
     return resp, msg
 
 
-def spider(area: str, type_: int, page: int, **kwargs):
-    request_status = 'failure'  # request result status: success / failure / stop / disable
+def downloader(begin_date, end_date, category, address, page, page_size):
+    """
 
-    results = []
-    begin_time = kwargs.pop('begin_time')
-    end_time = kwargs.pop('end_time')
-    max_per_page = kwargs.pop('max_per_page', 20)
-
-    url = "https://search.vip.qianlima.com/rest/service/website/search/solr"
-    data = REQUEST_DATA_MAP[type_]
-    data['newAreas'] = area  # set region
-    data['currentPage'] = page  # page number
-    data['numPerPage'] = max_per_page  # items per page
+    :param str begin_date: start date, format: YYYY-MM-DD
+    :param str end_date: end date, format: YYYY-MM-DD
+    :param int category: channel (column) code
+    :param int address: area code
+    :param int page: page number
+    :param int page_size: number of items per page
+
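+    Usage sketch (the address code below is a hypothetical example)::
+
+        status, err = downloader('2024-01-01', '2024-01-01',
+                                 category=1, address=2, page=1, page_size=100)
+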
+    """
+    url = 'https://search.vip.qianlima.com/rest/service/website/search/solr'
+    data = REQUEST_DATA_MAP[category]
+    data['newAreas'] = str(address)  # set region
     data['timeType'] = 4  # custom time-range mode
-    data['beginTime'] = begin_time  # start date, format: YYYY-MM-DD
-    data['endTime'] = end_time  # end date, format: YYYY-MM-DD
+    data['beginTime'] = begin_date
+    data['endTime'] = end_date
+    data['currentPage'] = page
+    data['numPerPage'] = page_size
     data = json.dumps(data)
-    response, err = crawl_request(url, data)
+
+    response, err = request(url, data)
+    request_status = 'failure'  # request result status: success / failure / stop / disable
     if response is None:
         request_status = 'server_error'
         return request_status, err
 
+    results = []
     row_count = 0
     if response.status_code == 200:
         resp_json = response.json()
         if resp_json['code'] == 200:
-            row_count = resp_json["data"]["rowCount"]
-            items = resp_json["data"]["data"]
+            row_count = resp_json['data']['rowCount']
+            items = resp_json['data']['data']
             for item in items:
-                cid = sha1(str(item["contentid"]))
+                cid = sha1(str(item['contentid']))
                 if not r.hexists(redis_key, cid):
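                     # first sighting of this contentid: record it so later runs skip duplicates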
                     r.hset(redis_key, cid, '')
-                    if "popTitle" in item:
-                        item["title"] = item["popTitle"]
+                    if 'popTitle' in item:
+                        item['title'] = item['popTitle']
                     else:
-                        item["title"] = item["showTitle"]
+                        item['title'] = item['showTitle']
 
-                    addr = str(item["areaName"]).split('-')
-                    _area = addr[0] if len(addr) > 0 else ''
-                    _city = addr[1] if len(addr) > 1 else ''
-                    if "国土" in item.get('progName', ''):
+                    addr = str(item['areaName']).split('-')
+                    area_ = addr[0] if len(addr) > 0 else ''
+                    city_ = addr[1] if len(addr) > 1 else ''
+                    if '国土' in item.get('progName', ''):
                         channel = item['progName']
                     else:
                         channel = (item['noticeSegmentTypeName'] or item['progName'])
@@ -147,15 +158,18 @@ def spider(area: str, type_: int, page: int, **kwargs):
                     results.append({
                         'site': '千里马',
                         'channel': channel,
-                        'area': _area,
-                        'city': _city,
-                        'title': item["title"],
+                        'area': area_,
+                        'city': city_,
+                        'title': item['title'],
                         'publishtime': item['updateTime'],
                         'href': item.get('url', '')
                     })
 
+            if len(results) > 0:
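+                # ordered=False lets MongoDB keep inserting the remaining documents if one write fails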
+                qlm.insert_many(results, ordered=False)
+
             request_status = 'success'
-            if len(items) < max_per_page:
+            if len(items) < page_size or len(results) == 0:
                 request_status = 'stop'
         else:
             '''
@@ -169,30 +183,26 @@ def spider(area: str, type_: int, page: int, **kwargs):
             logger.info(err)
     elif response.status_code in [401, 403, 404]:
         request_status = 'disable'
-    elif response.status_code == 405:
+    elif response.status_code in [405]:
         request_status = 'method_not_allowed'
+    elif response.status_code in [429]:
+        request_status = 'captcha_required'
     elif str(response.status_code).startswith('4'):
         request_status = 'client_ip_disable'
 
-    if len(results) > 0:
-        qlm.insert_many(results)
-
     if request_status in ['stop', 'success']:
-        logger.info("{}-{}-{}-共{}条-第{}页,成功上传{}条数据".format(
-            begin_time,
-            city_dict.get(int(area)),
-            channel_dict.get(type_),
-            row_count,
-            page,
-            len(results))
-        )
+        if page == 1:
+            logger.info(f'Qianlima published {row_count} records on {begin_date}')
+        logger.info(f'Inserted {len(results)} records')
+
     return request_status, err
 
 
-def downloader(area="", type_=0, **kwargs):
+def automatic_pagination(**kwargs):
+    reason = ''  # reason recorded when collection fails
+
     close_spider = False
     send_warning = False
-    reason = ''
 
     retry_times, max_retries = 0, 3
     pages = list(range(1, 101))  # qlm currently only exposes the first 10000 records
@@ -203,82 +213,94 @@ def downloader(area="", type_=0, **kwargs):
             break
 
         if send_warning and retry_times > max_retries:
-            # a notification email or WeChat Work bot hook could be added here to report collection errors
             send_wechat_warning(reason)
             break
 
         page = pages.pop(0)
-        logger.info(f"访问-{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据")
+        logger.info(f'下载第{page}页数据')
         while True:
-            err, reason = spider(area, type_, page, **kwargs)
-            if err in ['server_error', 'client_ip_disable']:
+            err, reason = downloader(page=page, **kwargs)
+            if err in ['server_error', 'client_ip_disable', 'captcha_required']:
                 close_spider = True
                 send_warning = True
             elif err == 'failure':
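                 # randomized back-off: log2 of uniform(100, 2400), roughly 6.6-11.2 seconds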
                 interval = math.log(random.randint(100, 2400), 2)
-                logger.debug(f'Retrying after error, waiting {interval}s')
+                logger.debug(f'Waiting {interval}s before retrying...')
                 time.sleep(interval)
                 continue
             elif err == 'disable':
-                logger.warning(f"账号被禁止访问-{city_dict.get(int(area))}-第{page}页数据")
+                logger.warning('账号被禁止访问')
                 retry_times += 1
                 send_warning = True
             elif err == 'method_not_allowed':
-                logger.warning("服务器禁止使用当前 HTTP 方法的请求")
+                logger.warning('服务器禁止使用当前 HTTP 方法的请求')
                 retry_times += 1
                 send_warning = True
             elif err == 'stop':
                 close_spider = True
             else:
-                logger.info(f"{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据采集成功")
                 time.sleep(math.log(random.randint(100, 2400), 2))
             break
 
     if send_warning:
-        raise AccountLoginExpirationError
+        raise AccountViolationRiskError
 
 
-def select_types(date: str, area: str, prov: str):
-    for type_ in [1, 2, 3, 4]:
-        downloader(
-            area=area,
-            type_=type_,
-            begin_time=date,
-            end_time=date,
-            max_per_page=100
+def core(date: str, category: int, address: int, page_size=20):
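+    """Collect one (date, category, address) slice; True on success, False if the account trips a risk ban."""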
+    try:
+        automatic_pagination(
+            begin_date=date,
+            end_date=date,
+            category=category,
+            address=address,
+            page_size=page_size  # maximum items per page
         )
-        tips = [
-            f'{date}',
-            f'{channel_dict.get(type_)}',
-            f'{province_dict.get(int(prov))}',
-            'collection finished'
-        ]
-        logger.info(f"+++ {' && '.join(tips)} +++ ")
+        return True
+    except AccountViolationRiskError:
+        return False
 
 
-def select_area(date: str):
-    logger.info(f" +++ 开始采集 - {date} +++")
+def spider(date, page_size=40):
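+    """Generator over dates x channels x areas x cities; yields the True/False result of each core() run."""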
+    logger.info('+++ Collection started +++')
+    dates = [date] if not isinstance(date, list) else date
     try:
-        for province in range(1, 32):
-            for city_ in area_dict.get(province):
-                select_types(date, area=str(city_), prov=str(province))
-        return True
-    except AccountLoginExpirationError:
-        return False
+        for date in dates:
+            for category, category_name in channel_dict.items():
+                for area, cities in area_dict.items():
+                    for city in cities:
+                        logger.info(' && '.join([
+                            date,
+                            category_name,
+                            province_dict[area],
+                            city_dict[city]
+                        ]))
+
+                        if len(cities) == 1:
+                            city = area  # Qianlima dropped district splits for municipalities; collect the province-level region directly
+
+                        yield core(date, category, city, page_size=page_size)
+
+    except Exception as e:
+        logger.error(e)
+
+    except KeyboardInterrupt:
+        pass
+
     finally:
-        logger.info(f" +++ 采集结束 - {date} +++")
+        logger.info('+++ 采集结束 +++')
 
 
 def history(date_lst: list):
-    for date in date_lst:
-        success = select_area(date)
-        if not success:
+    for result in spider(date_lst):
+        if not result:
             break
 
 
 def start():
     date = get_today_of_day(-1)
-    select_area(date)
+    for result in spider(date, page_size=100):
+        if not result:
+            break
 
 
 if __name__ == '__main__':