# -*- coding: utf-8 -*-
import json
import math
import random
import time

import requests

from db.RedisDB import RedisFilter
from utils.config_parms import (
    account_pool,
    area_dict,
    city_dict,
    province_dict,
    channel_dict,
    REQUEST_DATA_MAP
)
from utils.databases import mongo_table
from utils.log import logger
from utils.sessions_521 import http_session_521
from utils.tools import sha1, get_today_of_day

'''
https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=0
Search 2.0 category ids:
    1 = tender notices (招标信息)
    2 = award notices (中标信息)
    3 = proposed construction projects (拟在建项目)
    4 = approval projects (审批项目)
'''

qlm = mongo_table('qlm', 'data_merge')
dedup = RedisFilter('redis://:k5ZJR5KV4q7DRZ92DQ@172.17.189.142:7361/2')
session = requests.session()


class AccountViolationRiskError(Exception):
    """Raised when continuing to crawl would risk getting an account banned."""
    pass


def send_wechat_warning(msg, send=True):
    markdown = 'Qianlima list-page crawl error, please investigate.'
    markdown += f'\n>Details: **{msg}**'
    if not send:
        logger.info(markdown)
        return

    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown}}
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())


def get_account(area):
    """Return the first account whose ``follow`` list covers ``area``, else None."""
    return next((p for p in account_pool if area in p['follow']), None)


def disrupt_account_pool():
    """Return a shuffled copy of the account pool."""
    results = list(account_pool)
    random.shuffle(results)
    return results


def request(url, data, account, retries=5):
    global session
    ip, _ = str(account['proxies']['http']).replace('socks5://', '').split(':')
    phone = account['phone']
    resp, msg = None, ''
    usages, usages_521 = 0, 1
    while usages < retries:
        proxies = account['proxies']
        request_params = {
            'data': data,
            'headers': account['headers'],
            'cookies': account['cookies'],
            'proxies': proxies,
            'timeout': 60,
        }
        try:
            resp = session.post(url, **request_params)
            if resp.status_code == 521:
                # 521 signals the JavaScript anti-bot challenge; solve it, then retry.
                while usages_521 < retries:
                    success, _, cookies = http_session_521(
                        session,
                        url,
                        headers=account['headers'],
                        cookies=account['cookies'],
                        data=data,
                        proxies=account['proxies'])
                    if success:
                        account['cookies'] = cookies  # keep the challenge cookies for later requests
                        break
                    msg = f'Account [{phone}] failed the anti-bot challenge, attempt {usages_521}'
                    logger.warning(msg)
                    time.sleep(1)
                    usages_521 += 1
                usages += 1
            elif resp.status_code == 429:
                msg = f'Account [{phone}] rate-limited, captcha required, status code: {resp.status_code}'
                logger.error(msg)
                logger.warning(resp.content.decode())
                break
            elif resp.status_code in [401, 403, 404]:
                msg = f'Account [{phone}] session expired or banned, status code: {resp.status_code}'
                logger.error(msg)
                break
            elif str(resp.status_code).startswith('4'):
                msg = f'Proxy IP [{ip}] blocked, status code: {resp.status_code}'
                logger.error(msg)
                break
            else:
                break
        except requests.RequestException as e:
            if not isinstance(e, requests.ConnectTimeout):
                usages += 1
                msg = f'Request failed, reason: {e.__class__.__name__}'
            else:
                usages = retries
                msg = f'Request failed, reason: proxy timeout [{proxies["https"]}]'
            logger.error(msg)
    return resp, msg
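
# The expected shape of an ``account`` entry, inferred from the accesses above
# (all values below are placeholders, not real credentials):
#
#     account = {
#         'phone': '13800000000',
#         'follow': [2, 3],                 # area ids this account is assigned to
#         'headers': {'User-Agent': '...'},
#         'cookies': {'gsid': '...'},
#         'proxies': {'http': 'socks5://host:port',
#                     'https': 'socks5://host:port'},
#     }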

def downloader(begin_date, end_date, category, address, page, page_size, account):
    """
    :param str begin_date: start date, format: yyyy-mm-dd
    :param str end_date: end date, format: yyyy-mm-dd
    :param int category: channel id
    :param int address: area id
    :param int page: page number
    :param int page_size: number of results per page
    :param dict account: crawler account
    """
    url = 'https://search.vip.qianlima.com/rest/service/website/search/solr'
    data = dict(REQUEST_DATA_MAP[category])  # shallow copy so the shared template is not mutated
    data['newAreas'] = str(address)  # target area
    data['timeType'] = 4  # 4 = custom time range
    data['beginTime'] = begin_date
    data['endTime'] = end_date
    data['currentPage'] = page
    data['numPerPage'] = page_size
    data = json.dumps(data)

    # Custom request status: success / failure / stop / disable (account banned), etc.
    request_status = 'failure'
    response, err = request(url, data, account)
    if response is None:
        request_status = 'server_error'
        return request_status, err

    results = []
    row_count = 0
    if response.status_code == 200:
        resp_json = response.json()
        if resp_json['code'] == 200:
            row_count = resp_json['data']['rowCount']
            items = resp_json['data']['data']
            for item in items:
                cid = sha1(str(item['contentid']))  # dedup key
                if not dedup.get(cid):
                    dedup.add(cid)
                    item['title'] = item['popTitle'] if 'popTitle' in item else item['showTitle']
                    addr = str(item['areaName']).split('-')
                    area_ = addr[0]
                    city_ = addr[1] if len(addr) > 1 else ''
                    if '国土' in item.get('progName', ''):
                        # land & resources notices keep progName as the channel
                        channel = item['progName']
                    else:
                        channel = (item['noticeSegmentTypeName'] or item['progName'])
                    results.append({
                        'site': '千里马',
                        'channel': channel,
                        'area': area_,
                        'city': city_,
                        'title': item['title'],
                        'publishtime': item['updateTime'],
                        'href': item.get('url', '')
                    })
            if len(results) > 0:
                qlm.insert_many(results, ordered=False)
                request_status = 'success'
            if len(items) < page_size or len(results) == 0:
                request_status = 'stop'
        else:
            '''
            {
                "code": 200520,
                "msg": "抱歉,您在单位时间内的搜索次数已达上限,请联系客服购买会员!咨询电话:400-688-2000",
                "data": null
            }
            '''
            err = resp_json['msg']
            logger.info(err)
    elif response.status_code in [401, 403, 404]:
        request_status = 'disable'
    elif response.status_code in [405]:
        request_status = 'method_not_allowed'
    elif response.status_code in [429]:
        request_status = 'captcha_required'
    elif str(response.status_code).startswith('4'):
        request_status = 'client_ip_disable'

    if request_status in ['stop', 'success']:
        if page == 1:
            logger.info(f'Qianlima lists {row_count} records for {begin_date}')
        logger.info(f'Saved {len(results)} records')
    return request_status, err
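
# A successful search response, as implied by the parsing above; field values
# are illustrative only:
#
#     {
#         "code": 200,
#         "data": {
#             "rowCount": 1234,
#             "data": [
#                 {
#                     "contentid": 123456789,
#                     "showTitle": "...",
#                     "popTitle": "...",               # optional, preferred over showTitle
#                     "areaName": "省-市",             # split on '-'
#                     "progName": "...",
#                     "noticeSegmentTypeName": "...",
#                     "updateTime": "2024-01-01 00:00:00",
#                     "url": "https://..."
#                 }
#             ]
#         }
#     }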

def automatic_pagination(**kwargs):
    reason = ''  # failure reason, reported when the crawl aborts
    close_spider = False
    send_warning = False
    retry_times, max_retries = 0, 3
    pages = list(range(1, 101))  # qlm currently only exposes the first 10,000 results
    while len(pages) > 0:
        if close_spider:
            if send_warning:
                send_wechat_warning(reason)
            break
        if send_warning and retry_times > max_retries:
            send_wechat_warning(reason)
            break

        page = pages.pop(0)
        logger.info(f'Downloading page {page}')
        while True:
            status, reason = downloader(page=page, **kwargs)
            if status in ['server_error', 'client_ip_disable', 'captcha_required']:
                close_spider = True
                send_warning = True
            elif status == 'failure':
                interval = math.log(random.randint(100, 2400), 2)  # ~6.6 to ~11.2 seconds
                logger.debug(f'Waiting {interval}s before retrying...')
                time.sleep(interval)
                continue  # retry the same page
            elif status == 'disable':
                logger.warning('Account access denied')
                retry_times += 1
                send_warning = True
            elif status == 'method_not_allowed':
                logger.warning('Server rejected the HTTP method')
                retry_times += 1
                send_warning = True
            elif status == 'stop':
                time.sleep(math.log(random.randint(100, 2400), 2))
                close_spider = True
            else:
                time.sleep(math.log(random.randint(100, 2400), 2))
            break

    if send_warning:
        raise AccountViolationRiskError(reason)


def core(date: str, category: int, address: int, account, page_size=20):
    try:
        automatic_pagination(
            begin_date=date,
            end_date=date,
            category=category,
            address=address,
            page_size=page_size,  # max results per page
            account=account
        )
        return True
    except AccountViolationRiskError:
        return False


def spider(date, page_size=40):
    logger.info('+++ Crawl started +++')
    dates = [date] if not isinstance(date, list) else date
    try:
        for date in dates:
            for category, category_name in channel_dict.items():
                for area, cities in area_dict.items():
                    account = get_account(area)
                    if not account:
                        # raise ValueError('A crawler account is required!')
                        logger.warning('No crawler account/proxy available!')
                        continue
                    for city in cities:
                        logger.info(' && '.join([
                            date,
                            category_name,
                            province_dict[area],
                            city_dict[city]
                        ]))
                        if len(cities) == 1:
                            # Qianlima dropped district-level splits for
                            # municipalities; crawl the province-level area directly.
                            city = area
                        yield core(date, category, city, account, page_size=page_size)
    except Exception as e:
        logger.error(e)
    except KeyboardInterrupt:
        pass
    finally:
        logger.info('+++ Crawl finished +++')


def history(date_lst: list):
    for result in spider(date_lst):
        if not result:
            break


def start():
    date = get_today_of_day(-1)  # yesterday
    for result in spider(date, page_size=100):
        if not result:
            break


if __name__ == '__main__':
    start()
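
# Usage sketch (dates are illustrative):
#
#     start()                                  # crawl yesterday's listings
#     history(['2024-01-01', '2024-01-02'])    # backfill specific dates;
#                                              # both stop early if an account is flagged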