
Add bot alerting

dongzhaorui 1 year ago
parent
commit
4441615da0
1 changed file with 74 additions and 32 deletions
  1. qlm/source_qianlima.py  +74 -32
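This commit wires a WeCom (企业微信) group-bot webhook into the Qianlima list-page crawler so that account bans and repeated anti-crawl failures raise an alert instead of failing silently. A rough, self-contained sketch of the webhook call that the new send_wechat_warning() helper performs (the webhook key is the one added in the diff; the helper name post_alert and the sample detail text are only for illustration):

    import requests

    WEBHOOK = ('https://qyapi.weixin.qq.com/cgi-bin/webhook/send'
               '?key=079193d8-1856-443e-9f6d-ecc5c883bf11')

    def post_alert(detail: str) -> dict:
        # WeCom group bots accept a JSON body carrying a msgtype plus the markdown content.
        payload = {
            'msgtype': 'markdown',
            'markdown': {
                'content': ('Qianlima list-page crawl exception, please take note.\n'
                            f'>Details: <font color="warning">**{detail}**</font>')
            }
        }
        resp = requests.post(WEBHOOK, json=payload, timeout=10)
        return resp.json()  # typically {'errcode': 0, 'errmsg': 'ok'} on success

    # post_alert('account login expired, status code: 401')  # illustrative call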

+ 74 - 32
qlm/source_qianlima.py

@@ -6,6 +6,7 @@ import random
 import time
 
 import requests
+
 from utils.config_parms import *
 from utils.databases import mongo_table, redis_client
 from utils.log import logger
@@ -28,16 +29,34 @@ https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=
 '''
 
 
-def delay_by_day(days, fmt="%Y-%m-%d"):
-    """Shift the current date by the given number of days."""
-    _days = int(days)
-    _current_now = datetime.datetime.now()
-    return (_current_now + datetime.timedelta(days=_days)).strftime(fmt)
+class AccountLoginExpirationError(Exception):
+    pass
+
+
+def send_wechat_warning(msg, send=True):
+    markdown = 'Qianlima list-page crawl exception, relevant colleagues please take note.'
+    markdown += f'\n>Exception details: <font color=\"warning\">**{msg}**</font>'
+
+    if not send:
+        logger.info(markdown)
+        return
+
+    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
+    headers_ = {'Content-Type': 'application/json'}
+    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown}}
+    request_params = dict(headers=headers_, json=json_data, timeout=10)
+    response = requests.post(url, **request_params)
+    logger.info(response.json())
+
+
+def get_today_of_day(offset, fmt="%Y-%m-%d"):
+    date = datetime.datetime.now() + datetime.timedelta(days=int(offset))
+    return date.strftime(fmt)
 
 
 def crawl_request(url, data, retries=5):
     global session, cookies
-    resp = None
+    resp, msg = None, ''
     usages, usages_521 = 0, 1
     while usages < retries:
         request_params = {}
@@ -52,29 +71,31 @@ def crawl_request(url, data, retries=5):
                     success, _, cookies = http_session_521(session, url, headers, cookies, data=data)
                     if success:
                         break
-                    logger.warning(f"Anti-crawl bypass failed, attempt: {usages_521}")
+                    msg = f"Anti-crawl bypass failed, attempt: {usages_521}"
+                    logger.warning(msg)
                     time.sleep(1)
                     usages_521 += 1
                 usages += 1
             elif resp.status_code in [401, 403, 404]:
-                logger.error(f"Account login expired or suspended, status code: {resp.status_code}")
+                msg = f"Account login expired or suspended, status code: {resp.status_code}"
+                logger.error(msg)
                 break
             else:
                 break
         except requests.RequestException as e:
-            logger.error(f"Request failed, reason: {e.__class__.__name__}")
+            msg = f"Request failed, reason: {e.__class__.__name__}"
+            logger.error(msg)
             usages += 1
-    return resp
+    return resp, msg
 
 
 def crawl_spider(area: str, type_: int, page: int, **kwargs):
     results = []
     request_status = 'failure'  # request result: success / failure / stop / disable (account banned)
-
-    curr_date = delay_by_day(0)
-    begin_time = kwargs.pop('begin_time', curr_date)
-    end_time = kwargs.pop('end_time', curr_date)
+    begin_time = kwargs.pop('begin_time')
+    end_time = kwargs.pop('end_time')
     max_per_page = kwargs.pop('max_per_page', 20)
+
     data = REQUEST_DATA_MAP[type_]
     data['newAreas'] = area  # set the region
     data['currentPage'] = page  # page number
@@ -84,7 +105,7 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
     data['endTime'] = end_time  # end time, format: YYYY-MM-DD
     data = json.dumps(data)
     url = "https://search.vip.qianlima.com/rest/service/website/search/solr"
-    response = crawl_request(url, data)
+    response, err = crawl_request(url, data)
     row_count = 0
     if response is not None and response.status_code == 200:
         resp_json = response.json()
@@ -103,11 +124,12 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
                     addr = str(item["areaName"]).split('-')
                     _area = addr[0] if len(addr) > 0 else ''
                     _city = addr[1] if len(addr) > 1 else ''
-                    if "国土" in item.get('progName',''):
+                    if "国土" in item.get('progName', ''):
                         channel = item['progName']
                     else:
                         channel = (item['noticeSegmentTypeName'] or item['progName'])
-                    res = {
+
+                    results.append({
                         'site': '千里马',
                         'channel': channel,
                         'area': _area,
@@ -115,8 +137,8 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
                         'title': item["title"],
                         'publishtime': item['updateTime'],
                         'href': item.get('url', '')
-                    }
-                    results.append(res)
+                    })
+
             request_status = 'success'
 
             if len(items) < max_per_page:
@@ -129,7 +151,8 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
                 "data": null
             }
             '''
-            logger.info(resp_json['msg'])
+            err = resp_json['msg']
+            logger.info(err)
     elif response is not None and response.status_code in [401, 403, 404]:
         request_status = 'disable'
     elif response is not None and response.status_code == 405:
@@ -147,24 +170,28 @@ def crawl_spider(area: str, type_: int, page: int, **kwargs):
             page,
             len(results))
         )
-    return request_status
+    return request_status, err
 
 
-def by_area_crawl_data(area="", type_=0, **kwargs):
+def downloader(area="", type_=0, **kwargs):
     close_spider = False
+    send_warning = False
+    msg = ''
+
     disable_page, max_disable_page = 0, 3
     pages = list(range(1, 101))  # currently only the first 10,000 records can be searched
     while len(pages) > 0:
         if close_spider:
             break
-        elif disable_page > max_disable_page:
+        elif disable_page > max_disable_page and send_warning:
             # an email or WeCom (企业微信) bot notification about the crawl failure can be sent here
+            send_wechat_warning(msg)
             break
 
         page = pages.pop(0)
         logger.info(f"Fetching - {city_dict.get(int(area))} - {channel_dict.get(type_)} - page {page}")
         while True:
-            success = crawl_spider(area, type_, page, **kwargs)
+            success, msg = crawl_spider(area, type_, page, **kwargs)
             if success == 'failure':
                 interval = math.log(random.randint(100, 2400), 2)
                 logger.debug(f'Retrying after failure, waiting {interval}s')
@@ -173,9 +200,11 @@ def by_area_crawl_data(area="", type_=0, **kwargs):
             elif success == 'disable':
                 logger.warning(f"Account blocked - {city_dict.get(int(area))} - page {page}")
                 disable_page += 1
+                send_warning = True
             elif success == 'method_not_allowed':
                 logger.warning("The server rejects requests using the current HTTP method")
                 disable_page += 1
+                send_warning = True
             elif success == 'stop':
                 close_spider = True
             else:
@@ -183,24 +212,37 @@ def by_area_crawl_data(area="", type_=0, **kwargs):
                 time.sleep(math.log(random.randint(100, 2400), 2))
             break
 
+    if send_warning:
+        raise AccountLoginExpirationError
+
 
 def select_types(date: str, area: str, prov: str):
     for type_ in [1, 2, 3, 4]:
-        by_area_crawl_data(
+        downloader(
             area=area,
             type_=type_,
             begin_time=date,
             end_time=date,
             max_per_page=100
         )
-    logger.info(f"{date}-{province_dict.get(int(prov))} region-{channel_dict.get(type_)} crawl finished")
+        tips = [
+            f'{date}',
+            f'{channel_dict.get(type_)}',
+            f'{province_dict.get(int(prov))}',
+            'crawl completed'
+        ]
+        logger.info(f"+++ {' && '.join(tips)} +++ ")
 
 
 def select_area(date: str):
-    for province in range(1, 32):
-        for city_ in area_dict.get(province):
-            select_types(date, area=str(city_), prov=str(province))
-    logger.info(f"Task finished")
+    logger.info(f" +++ Crawl started - {date} +++")
+    try:
+        for province in range(1, 32):
+            for city_ in area_dict.get(province):
+                select_types(date, area=str(city_), prov=str(province))
+    except AccountLoginExpirationError:
+        pass
+    logger.info(f" +++ Crawl finished - {date} +++")
 
 
 def history(date_lst: list):
@@ -209,8 +251,8 @@ def history(date_lst: list):
 
 
 def start():
-    date_str = delay_by_day(-1)
-    select_area(date_str)
+    date = get_today_of_day(-1)
+    select_area(date)
 
 
 if __name__ == '__main__':
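For reference, a sketch of how the reworked entry points might be driven after this change: start() crawls yesterday's data via get_today_of_day(-1), history() takes an explicit list of date strings (its body is outside this hunk, but the signature suggests one full pass per date), and downloader() covers a single region/channel. The import path, area code and dates below are illustrative assumptions, not values from the commit:

    from qlm.source_qianlima import start, history, downloader  # assumed module path

    start()                                  # yesterday's data, every province and channel
    history(['2024-01-01', '2024-01-02'])    # backfill specific days (example dates)

    # One region/channel slice; area code '2' is a placeholder:
    # downloader(area='2', type_=1, begin_time='2024-01-01',
    #            end_time='2024-01-01', max_per_page=100)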