# -*- coding: utf-8 -*-
import json
import math
import random
import time

import requests

from db.RedisDB import RedisFilter
from utils.config_parms import (
    account_pool,
    area_dict,
    city_dict,
    province_dict,
    channel_dict,
    REQUEST_DATA_MAP
)
from utils.databases import mongo_table
from utils.log import logger
from utils.sessions_521 import http_session_521
from utils.tools import sha1, get_today_of_day

'''
https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=0
Search - 2.0
Category codes:
    1 = tender notices (招标信息)
    2 = award notices (中标信息)
    3 = proposed construction projects (拟在建项目)
    4 = approval projects (审批项目)
'''
qlm = mongo_table('qlm', 'data_merge')
dedup = RedisFilter('redis://:k5ZJR5KV4q7DRZ92DQ@172.17.189.142:7361/2')
session = requests.session()

class AccountViolationRiskError(Exception):
    """Raised when continuing to crawl would risk getting an account banned."""

def send_wechat_warning(msg, send=True):
    markdown = 'Qianlima list-page crawl error, please take a look.'
    markdown += f'\n>Details: <font color="warning">**{msg}**</font>'
    if not send:
        logger.info(markdown)
        return
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown}}
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())
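
# Dry-run example: with send=False the alert is only written to the log, which
# is handy when testing locally without posting to the WeChat group:
#     send_wechat_warning('account [138****0000] blocked', send=False)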

def get_account(area):
    # Return the first pooled account whose `follow` list covers this area.
    return next((p for p in account_pool if area in p['follow']), None)

def disrupt_account_pool():
    # Return a randomly shuffled copy of the pool; the pool itself is untouched.
    return random.sample(account_pool, len(account_pool))

def request(url, data, account, retries=5):
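    """
    POST `data` to `url` with the account's headers, cookies, and proxy.

    Returns (resp, msg): `resp` is the last requests.Response, or None if
    every attempt raised; `msg` is empty on success, otherwise a description
    of the last failure.
    """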
    global session
    ip, _ = str(account['proxies']['http']).replace('socks5://', '').split(':')
    phone = account['phone']
    resp, msg = None, ''
    usages, usages_521 = 0, 1
    while usages < retries:
        proxies = account['proxies']
        request_params = {
            'data': data,
            'headers': account['headers'],
            'cookies': account['cookies'],
            'proxies': proxies,
            'timeout': 60,
        }
        try:
            resp = session.post(url, **request_params)
            if resp.status_code == 521:
                # 521 means the JS anti-bot challenge fired; solve it to refresh cookies.
                while usages_521 < retries:
                    success, _, cookies = http_session_521(session,
                                                           url,
                                                           headers=account['headers'],
                                                           cookies=account['cookies'],
                                                           data=data,
                                                           proxies=account['proxies'])
                    if success:
                        break
                    msg = f'Account [{phone}] failed the anti-bot challenge, attempt {usages_521}'
                    logger.warning(msg)
                    time.sleep(1)
                    usages_521 += 1
                usages += 1
            elif resp.status_code == 429:
                msg = f'Account [{phone}] rate-limited, captcha required, status code: {resp.status_code}'
                logger.error(msg)
                logger.warning(resp.content.decode())
                break
            elif resp.status_code in [401, 403, 404]:
                msg = f'Account [{phone}] session expired or account suspended, status code: {resp.status_code}'
                logger.error(msg)
                break
            elif str(resp.status_code).startswith('4'):
                msg = f'Public proxy IP [{ip}] is blocked, status code: {resp.status_code}'
                logger.error(msg)
                break
            else:
                break
        except requests.RequestException as e:
            if not isinstance(e, requests.ConnectTimeout):
                usages += 1
                msg = f'Request failed: {e.__class__.__name__}'
            else:
                # A proxy connect timeout is unlikely to recover; stop retrying.
                usages = retries
                msg = f'Request failed: proxy timed out [{proxies["https"]}]'
            logger.error(msg)
    return resp, msg
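
# REQUEST_DATA_MAP (from utils.config_parms) is assumed to map each category
# code to a JSON-serializable search-payload template; downloader() below only
# overrides the area/time/paging fields, so an entry presumably looks roughly
# like this (any field beyond the ones set below is a guess):
#     REQUEST_DATA_MAP = {
#         1: {'newAreas': '', 'timeType': 4, 'beginTime': '', 'endTime': '',
#             'currentPage': 1, 'numPerPage': 20, ...},
#         ...
#     }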

def downloader(begin_date, end_date, category, address, page, page_size, account):
    """
    :param str begin_date: start date, format: xxxx-xx-xx
    :param str end_date: end date, format: xxxx-xx-xx
    :param int category: category code
    :param int address: area code
    :param int page: page number
    :param int page_size: records per page
    :param dict account: crawl account
    """
    url = 'https://search.vip.qianlima.com/rest/service/website/search/solr'
    data = dict(REQUEST_DATA_MAP[category])  # copy, so the shared template is not mutated
    data['newAreas'] = str(address)  # target area
    data['timeType'] = 4  # 4 = custom time range
    data['beginTime'] = begin_date
    data['endTime'] = end_date
    data['currentPage'] = page
    data['numPerPage'] = page_size
    data = json.dumps(data)
    # Custom status for this fetch: success / failure / stop / disable (account banned), etc.
    request_status = 'failure'
    response, err = request(url, data, account)
    if response is None:
        request_status = 'server_error'
        return request_status, err
    results = []
    row_count = 0
    if response.status_code == 200:
        resp_json = response.json()
        if resp_json['code'] == 200:
            row_count = resp_json['data']['rowCount']
            items = resp_json['data']['data']
            for item in items:
                # Key the Redis dedup filter on the sha1 of the content id.
                cid = sha1(str(item['contentid']))
                if not dedup.get(cid):
                    dedup.add(cid)
                    if 'popTitle' in item:
                        item['title'] = item['popTitle']
                    else:
                        item['title'] = item['showTitle']
                    addr = str(item['areaName']).split('-')
                    area_ = addr[0] if len(addr) > 0 else ''
                    city_ = addr[1] if len(addr) > 1 else ''
                    # Land-resources (国土) notices keep their program name as the channel.
                    if '国土' in item.get('progName', ''):
                        channel = item['progName']
                    else:
                        channel = (item['noticeSegmentTypeName'] or item['progName'])
                    results.append({
                        'site': '千里马',
                        'channel': channel,
                        'area': area_,
                        'city': city_,
                        'title': item['title'],
                        'publishtime': item['updateTime'],
                        'href': item.get('url', '')
                    })
            if len(results) > 0:
                qlm.insert_many(results, ordered=False)
            request_status = 'success'
            if len(items) < page_size or len(results) == 0:
                request_status = 'stop'
        else:
            # Over-quota responses look like:
            # {
            #     "code": 200520,
            #     "msg": "抱歉,您在单位时间内的搜索次数已达上限,请联系客服购买会员!咨询电话:400-688-2000",
            #     "data": null
            # }
            err = resp_json['msg']
            logger.info(err)
    elif response.status_code in [401, 403, 404]:
        request_status = 'disable'
    elif response.status_code in [405]:
        request_status = 'method_not_allowed'
    elif response.status_code in [429]:
        request_status = 'captcha_required'
    elif str(response.status_code).startswith('4'):
        request_status = 'client_ip_disable'
    if request_status in ['stop', 'success']:
        if page == 1:
            logger.info(f'Qianlima published {row_count} records on {begin_date}')
        logger.info(f'Saved {len(results)} records')
    return request_status, err
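
# Illustrative single-page call (the area code and dates are placeholders):
#     account = get_account(area)  # any area code the pool follows
#     status, err = downloader('2024-01-01', '2024-01-01', category=1,
#                              address=area, page=1, page_size=20,
#                              account=account)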

def automatic_pagination(**kwargs):
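    """
    Page through one (date, category, area) search, backing off and retrying
    as needed; raises AccountViolationRiskError after an alert has been sent.

    `kwargs` are forwarded to downloader() (begin_date, end_date, category,
    address, page_size, account).
    """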
    reason = ''  # description of the most recent failure
    close_spider = False
    send_warning = False
    retry_times, max_retries = 0, 3
    pages = list(range(1, 101))  # qlm currently exposes only the first 10,000 records
    while len(pages) > 0:
        if close_spider:
            if send_warning:
                send_wechat_warning(reason)
            break
        if send_warning and retry_times > max_retries:
            send_wechat_warning(reason)
            break
        page = pages.pop(0)
        logger.info(f'Downloading page {page}')
        while True:
            err, reason = downloader(page=page, **kwargs)
            if err in ['server_error', 'client_ip_disable', 'captcha_required']:
                close_spider = True
                send_warning = True
            elif err == 'failure':
                # Jittered backoff: log2 of 100..2400 is roughly 6.6-11.2 s.
                interval = math.log(random.randint(100, 2400), 2)
                logger.debug(f'Waiting {interval}s, retrying after error...')
                time.sleep(interval)
                continue
            elif err == 'disable':
                logger.warning('Account is blocked from access')
                retry_times += 1
                send_warning = True
            elif err == 'method_not_allowed':
                logger.warning('Server rejected the HTTP method of the request')
                retry_times += 1
                send_warning = True
            elif err == 'stop':
                time.sleep(math.log(random.randint(100, 2400), 2))
                close_spider = True
            else:
                time.sleep(math.log(random.randint(100, 2400), 2))
            break
    if send_warning:
        raise AccountViolationRiskError
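
# Status handling summary:
#     failure                      -> back off and retry the same page
#     stop                         -> last page reached, stop paginating
#     disable / method_not_allowed -> count a retry, alert after max_retries
#     server_error / captcha_required / client_ip_disable -> stop and alert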

def core(date: str, category: int, address: int, account, page_size=20):
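    """Crawl every page for one date/category/area; False means the account is at risk."""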
    try:
        automatic_pagination(
            begin_date=date,
            end_date=date,
            category=category,
            address=address,
            page_size=page_size,  # maximum records per page
            account=account
        )
        return True
    except AccountViolationRiskError:
        return False

def spider(date, page_size=40):
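    """Iterate dates x categories x areas, yielding core()'s result per city."""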
    logger.info('+++ crawl started +++')
    dates = [date] if not isinstance(date, list) else date
    try:
        for date in dates:
            for category, category_name in channel_dict.items():
                for area, cities in area_dict.items():
                    account = get_account(area)
                    if not account:
                        # raise ValueError('crawl account must not be empty!')
                        logger.warning('No crawl account or proxy available!')
                        continue
                    for city in cities:
                        logger.info(' && '.join([
                            date,
                            category_name,
                            province_dict[area],
                            city_dict[city]
                        ]))
                        if len(cities) == 1:
                            # Qianlima dropped district splits for municipalities;
                            # crawl the whole province-level area directly.
                            city = area
                        yield core(date, category, city, account, page_size=page_size)
    except Exception as e:
        logger.error(e)
    except KeyboardInterrupt:
        pass
    finally:
        logger.info('+++ crawl finished +++')

def history(date_lst: list):
    # Backfill: crawl each listed date until an account trips the risk guard.
    for result in spider(date_lst):
        if not result:
            break

def start():
    # Daily incremental run: crawl yesterday's data with a larger page size.
    date = get_today_of_day(-1)
    for result in spider(date, page_size=100):
        if not result:
            break
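
# To backfill specific days instead of the daily job (dates are examples):
#     history(['2024-01-01', '2024-01-02'])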

if __name__ == '__main__':
    start()