# coding: utf-8
"""Qianlima (千里马) list-page crawler.

Search entry: https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=0
"Search - 2.0"

Search category codes (``type_`` / keys of ``REQUEST_DATA_MAP``):
    1 = tender notices      (招标信息)
    2 = award notices       (中标信息)
    3 = proposed projects   (拟在建项目)
    4 = approval projects   (审批项目)
"""
import datetime
import json
import math
import random
import time

import requests

from utils.config_parms import *
from utils.databases import mongo_table, redis_client
from utils.log import logger
from utils.sessions_521 import http_session_521
from utils.tools import sha1

qlm = mongo_table('qlm', 'data_merge')
r = redis_client()
redis_key = "qianlima_2024"
session = requests.session()
# Example SOCKS5 proxy config, kept for reference:
# proxies = {
#     'http': 'socks5://123.101.64.83:8861',
#     'https': 'socks5://123.101.64.83:8861'
# }
proxies = None


class AccountLoginExpirationError(Exception):
    """Raised when the crawler account is logged out, banned, or IP-blocked."""
    pass


def send_wechat_warning(msg, send=True):
    """Push a markdown alert to the WeChat Work webhook robot.

    :param msg: detail text appended to the alert body.
    :param send: when False, only log the message locally instead of posting.
    """
    markdown = '千里马列表页采集异常,请相关同事注意。'
    markdown += f'\n>异常详情:**{msg}**'
    if not send:
        logger.info(markdown)
        return

    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown}}
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())


def get_today_of_day(offset, fmt="%Y-%m-%d"):
    """Return today's date shifted by ``offset`` days, formatted with ``fmt``."""
    date = datetime.datetime.now() + datetime.timedelta(days=int(offset))
    return date.strftime(fmt)


def crawl_request(url, data, retries=5):
    """POST ``data`` to ``url`` with retries and HTTP-521 anti-crawler handling.

    Uses the module-level ``session``/``cookies``/``proxies`` (``headers`` and
    ``cookies`` come from ``utils.config_parms``); a solved 521 challenge
    refreshes the global ``cookies``.

    :param url: endpoint to POST to.
    :param data: request body (already JSON-encoded string).
    :param retries: max attempts for both the request and the 521 solver.
    :return: ``(response, msg)`` — ``response`` is None when every attempt
        raised; ``msg`` is the last error description ('' on success).
    """
    global session, cookies, proxies
    resp, msg = None, ''
    usages, usages_521 = 0, 1
    while usages < retries:
        request_params = {
            'data': data,
            'headers': headers,
            'cookies': cookies,
            'proxies': proxies,
            'timeout': 60,
        }
        try:
            resp = session.post(url, **request_params)
            if resp.status_code == 521:
                # JS anti-crawler challenge: solve it to refresh the cookies,
                # then fall through and retry the original request.
                while usages_521 < retries:
                    success, _, cookies = http_session_521(
                        session, url, headers, cookies, data=data, proxies=proxies)
                    if success:
                        break
                    msg = f"反爬破解失败,次数:{usages_521}"
                    logger.warning(msg)
                    time.sleep(1)
                    usages_521 += 1
                usages += 1
            elif resp.status_code in [401, 403, 404]:
                # Account session expired or account suspended — not retryable.
                msg = f"账号登录已失效或封停,异常状态码:{resp.status_code}"
                logger.error(msg)
                break
            elif str(resp.status_code).startswith('4'):
                # Any other 4xx: the public IP has been blocked — not retryable.
                msg = f"公网IP被封禁,异常状态码:{resp.status_code}"
                logger.error(msg)
                break
            else:
                break
        except requests.RequestException as e:
            msg = f"访问失败,原因:{e.__class__.__name__}"
            logger.error(msg)
            usages += 1
    return resp, msg


def spider(area: str, type_: int, page: int, **kwargs):
    """Fetch one result page, dedup via Redis, and store new rows in Mongo.

    :param area: region code (key of ``city_dict``).
    :param type_: search category, see module docstring.
    :param page: 1-based page number.
    :keyword begin_time: start date, format ``YYYY-MM-DD`` (required).
    :keyword end_time: end date, format ``YYYY-MM-DD`` (required).
    :keyword max_per_page: page size (default 20).
    :return: ``(request_status, err)`` where ``request_status`` is one of
        success / failure / stop / disable / server_error /
        method_not_allowed / client_ip_disable.
    """
    request_status = 'failure'  # 资源请求结果状态, 成功=success 失败=failure 停止=stop 封停=disable
    results = []
    begin_time = kwargs.pop('begin_time')
    end_time = kwargs.pop('end_time')
    max_per_page = kwargs.pop('max_per_page', 20)
    url = "https://search.vip.qianlima.com/rest/service/website/search/solr"
    # Copy the shared request template: the original code mutated the
    # module-level REQUEST_DATA_MAP entry in place on every call.
    data = dict(REQUEST_DATA_MAP[type_])
    data['newAreas'] = area             # region filter
    data['currentPage'] = page          # page number
    data['numPerPage'] = max_per_page   # rows per page
    data['timeType'] = 4                # 4 = custom date range
    data['beginTime'] = begin_time      # start date, YYYY-MM-DD
    data['endTime'] = end_time          # end date, YYYY-MM-DD
    data = json.dumps(data)

    response, err = crawl_request(url, data)
    if response is None:
        request_status = 'server_error'
        return request_status, err

    row_count = 0
    if response.status_code == 200:
        resp_json = response.json()
        if resp_json['code'] == 200:
            row_count = resp_json["data"]["rowCount"]
            items = resp_json["data"]["data"]
            for item in items:
                cid = sha1(str(item["contentid"]))
                # Redis hash acts as a seen-set: only brand-new items are kept.
                if not r.hexists(redis_key, cid):
                    r.hset(redis_key, cid, '')
                    if "popTitle" in item:
                        item["title"] = item["popTitle"]
                    else:
                        item["title"] = item["showTitle"]
                    addr = str(item["areaName"]).split('-')
                    _area = addr[0] if len(addr) > 0 else ''
                    _city = addr[1] if len(addr) > 1 else ''
                    if "国土" in item.get('progName', ''):
                        channel = item['progName']
                    else:
                        channel = (item['noticeSegmentTypeName'] or item['progName'])
                    results.append({
                        'site': '千里马',
                        'channel': channel,
                        'area': _area,
                        'city': _city,
                        'title': item["title"],
                        'publishtime': item['updateTime'],
                        'href': item.get('url', '')
                    })
            request_status = 'success'
            if len(items) < max_per_page:
                # A short page means there is no further data: stop paging.
                request_status = 'stop'
        else:
            # Rate-limit style error payload, e.g.:
            # {
            #     "code": 200520,
            #     "msg": "抱歉,您在单位时间内的搜索次数已达上限,请联系客服购买会员!咨询电话:400-688-2000",
            #     "data": null
            # }
            err = resp_json['msg']
            logger.info(err)
    elif response.status_code in [401, 403, 404]:
        request_status = 'disable'
    elif response.status_code == 405:
        request_status = 'method_not_allowed'
    elif str(response.status_code).startswith('4'):
        request_status = 'client_ip_disable'

    if len(results) > 0:
        qlm.insert_many(results)

    if request_status in ['stop', 'success']:
        logger.info("{}-{}-{}-共{}条-第{}页,成功上传{}条数据".format(
            begin_time,
            city_dict.get(int(area)),
            channel_dict.get(type_),
            row_count,
            page,
            len(results))
        )
    return request_status, err


def downloader(area="", type_=0, **kwargs):
    """Page through one (area, category) search, retrying and alerting on errors.

    Raises :class:`AccountLoginExpirationError` when the account/IP is blocked
    so the caller can abort the whole crawl.
    """
    close_spider = False
    send_warning = False
    reason = ''
    retry_times, max_retries = 0, 3
    # qlm 目前仅支持查看前 10000 条数据 (100 pages x 100 per page).
    pages = list(range(1, 101))
    while len(pages) > 0:
        if close_spider:
            if send_warning:
                send_wechat_warning(reason)
            break

        if send_warning and retry_times > max_retries:
            # Alerting hook: mail or WeChat-robot notification of the failure.
            send_wechat_warning(reason)
            break

        page = pages.pop(0)
        logger.info(f"访问-{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据")
        while True:
            status, reason = spider(area, type_, page, **kwargs)
            if status in ['server_error', 'client_ip_disable']:
                close_spider = True
                send_warning = True
            elif status == 'failure':
                # Transient failure (e.g. rate limit): back off and retry
                # the same page indefinitely.
                interval = math.log(random.randint(100, 2400), 2)
                logger.debug(f'异常重试,等待{interval}s')
                time.sleep(interval)
                continue
            elif status == 'disable':
                logger.warning(f"账号被禁止访问-{city_dict.get(int(area))}-第{page}页数据")
                retry_times += 1
                send_warning = True
            elif status == 'method_not_allowed':
                logger.warning("服务器禁止使用当前 HTTP 方法的请求")
                retry_times += 1
                send_warning = True
            elif status == 'stop':
                # Short page: no more data for this query — finish cleanly.
                close_spider = True
            else:
                logger.info(f"{city_dict.get(int(area))}-{channel_dict.get(type_)}-第{page}页数据采集成功")
                time.sleep(math.log(random.randint(100, 2400), 2))
            break

    if send_warning:
        raise AccountLoginExpirationError


def select_types(date: str, area: str, prov: str):
    """Crawl all four search categories for one city on one date."""
    for type_ in [1, 2, 3, 4]:
        downloader(
            area=area,
            type_=type_,
            begin_time=date,
            end_time=date,
            max_per_page=100
        )
        tips = [
            f'{date}',
            f'{channel_dict.get(type_)}',
            f'{province_dict.get(int(prov))}',
            '完成采集'
        ]
        logger.info(f"+++ {' && '.join(tips)} +++ ")


def select_area(date: str):
    """Crawl every city of every province for ``date``.

    :return: True on full success, False when the account login expired.
    """
    logger.info(f" +++ 开始采集 - {date} +++")
    try:
        for province in range(1, 32):
            for city_ in area_dict.get(province):
                select_types(date, area=str(city_), prov=str(province))
        return True
    except AccountLoginExpirationError:
        return False
    finally:
        logger.info(f" +++ 采集结束 - {date} +++")


def history(date_lst: list):
    """Crawl a list of historical dates, stopping at the first login failure."""
    for date in date_lst:
        success = select_area(date)
        if not success:
            break


def start():
    """Daily entry point: crawl yesterday's data."""
    date = get_today_of_day(-1)
    select_area(date)


if __name__ == '__main__':
    start()