source_qianlima.py

# -*- coding: utf-8 -*-
import json
import math
import random
import time

import requests

from utils.config_parms import (
    account_pool,
    area_dict,
    city_dict,
    province_dict,
    channel_dict,
    REQUEST_DATA_MAP
)
from utils.databases import mongo_table, redis_client
from utils.log import logger
from utils.sessions_521 import http_session_521
from utils.tools import sha1, get_today_of_day

'''
https://search.vip.qianlima.com/index.html#?sortType=6&isSearchWord=1&tab_index=0
搜索-2.0 (Search 2.0) category codes:
1 = 招标信息 (tender announcements)
2 = 中标信息 (award announcements)
3 = 拟在建项目 (proposed construction projects)
4 = 审批项目 (approval-stage projects)
'''

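# Shared module-level state: the MongoDB collection receiving merged
# list-page rows, a Redis hash used to de-duplicate records by content id,
# and one requests session reused across calls so cookies obtained during
# the anti-bot handshake persist between requests.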
qlm = mongo_table('qlm', 'data_merge')
r = redis_client()
redis_key = 'qianlima_2024'
session = requests.session()


class AccountViolationRiskError(Exception):
    pass


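# Push a markdown alert to a WeChat Work group chat through its incoming
# webhook; with send=False the message is only written to the local log.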
def send_wechat_warning(msg, send=True):
    markdown = '千里马列表页采集异常,请相关同事注意。'
    markdown += f'\n>异常详情:<font color="warning">**{msg}**</font>'
    if not send:
        logger.info(markdown)
        return
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown}}
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())


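# Account-pool helpers: get_account returns the first account whose
# 'follow' list covers the given area (None if nobody does);
# disrupt_account_pool returns a randomly shuffled copy of the pool.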
def get_account(area):
    return next((p for p in account_pool if area in p['follow']), None)


def disrupt_account_pool():
    results = []
    copy_account_pool = list(account_pool)
    while copy_account_pool:
        idx = random.randint(0, len(copy_account_pool) - 1)
        results.append(copy_account_pool.pop(idx))
    return results


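# POST through the account's proxy with its headers/cookies, retrying up to
# `retries` times. A 521 status is treated as the JS-cookie anti-bot
# challenge and handed to http_session_521 to refresh the shared session;
# 429/401/403/404 and other 4xx codes are logged and abort the retry loop.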
def request(url, data, account, retries=5):
    global session
    ip, _ = str(account['proxies']['http']).replace('socks5://', '').split(':')
    phone = account['phone']
    resp, msg = None, ''
    usages, usages_521 = 0, 1
    while usages < retries:
        request_params = {}
        request_params.setdefault('data', data)
        request_params.setdefault('headers', account['headers'])
        request_params.setdefault('cookies', account['cookies'])
        request_params.setdefault('proxies', account['proxies'])
        request_params.setdefault('timeout', 60)
        try:
            resp = session.post(url, **request_params)
            if resp.status_code == 521:
                while usages_521 < retries:
                    success, _, cookies = http_session_521(
                        session,
                        url,
                        headers=account['headers'],
                        cookies=account['cookies'],
                        data=data,
                        proxies=account['proxies']
                    )
                    if success:
                        break
                    msg = f'账号[{phone}]反爬破解失败,次数:{usages_521}'
                    logger.warning(msg)
                    time.sleep(1)
                    usages_521 += 1
                usages += 1
            elif resp.status_code == 429:
                msg = f'账号[{phone}]访问频繁,图形验证,异常状态码:{resp.status_code}'
                logger.error(msg)
                logger.warning(resp.content.decode())
                break
            elif resp.status_code in [401, 403, 404]:
                msg = f'账号[{phone}]登录已失效或封停,异常状态码:{resp.status_code}'
                logger.error(msg)
                break
            elif str(resp.status_code).startswith('4'):
                msg = f'公网代理IP[{ip}]被封禁,异常状态码:{resp.status_code}'
                logger.error(msg)
                break
            else:
                break
        except requests.RequestException as e:
            msg = f'访问失败,原因:{e.__class__.__name__}'
            logger.error(msg)
            usages += 1
    return resp, msg


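# Fetch one page of search results, de-duplicate items against the Redis
# hash, insert new rows into MongoDB, and translate the HTTP outcome into a
# custom status string ('success', 'failure', 'stop', 'disable', ...) that
# automatic_pagination acts on.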
def downloader(begin_date, end_date, category, address, page, page_size, account):
    """
    :param str begin_date: start date, format: xxxx-xx-xx
    :param str end_date: end date, format: xxxx-xx-xx
    :param int category: channel (栏目) code
    :param int address: area code
    :param int page: page number
    :param int page_size: rows per page
    :param dict account: crawl account
    """
    url = 'https://search.vip.qianlima.com/rest/service/website/search/solr'
    data = dict(REQUEST_DATA_MAP[category])  # copy so the shared template is not mutated
    data['newAreas'] = str(address)  # target area
    data['timeType'] = 4  # custom time-range mode
    data['beginTime'] = begin_date
    data['endTime'] = end_date
    data['currentPage'] = page
    data['numPerPage'] = page_size
    data = json.dumps(data)
    # Custom request status: success / failure / stop / disable (IP or account banned), etc.
    request_status = 'failure'
    response, err = request(url, data, account)
    if response is None:
        request_status = 'server_error'
        return request_status, err
    results = []
    row_count = 0
    if response.status_code == 200:
        resp_json = response.json()
        if resp_json['code'] == 200:
            row_count = resp_json['data']['rowCount']
            items = resp_json['data']['data']
            for item in items:
                cid = sha1(str(item['contentid']))
                if not r.hexists(redis_key, cid):
                    r.hset(redis_key, cid, '')
                    if 'popTitle' in item:
                        item['title'] = item['popTitle']
                    else:
                        item['title'] = item['showTitle']
                    addr = str(item['areaName']).split('-')
                    area_ = addr[0] if len(addr) > 0 else ''
                    city_ = addr[1] if len(addr) > 1 else ''
                    if '国土' in item.get('progName', ''):
                        channel = item['progName']
                    else:
                        channel = (item['noticeSegmentTypeName'] or item['progName'])
                    results.append({
                        'site': '千里马',
                        'channel': channel,
                        'area': area_,
                        'city': city_,
                        'title': item['title'],
                        'publishtime': item['updateTime'],
                        'href': item.get('url', '')
                    })
            if len(results) > 0:
                qlm.insert_many(results, ordered=False)
                request_status = 'success'
            if len(items) < page_size or len(results) == 0:
                request_status = 'stop'
        else:
            '''
            {
                "code": 200520,
                "msg": "抱歉,您在单位时间内的搜索次数已达上限,请联系客服购买会员!咨询电话:400-688-2000",
                "data": null
            }
            '''
            err = resp_json['msg']
            logger.info(err)
    elif response.status_code in [401, 403, 404]:
        request_status = 'disable'
    elif response.status_code == 405:
        request_status = 'method_not_allowed'
    elif response.status_code == 429:
        request_status = 'captcha_required'
    elif str(response.status_code).startswith('4'):
        request_status = 'client_ip_disable'
    if request_status in ['stop', 'success']:
        if page == 1:
            logger.info(f'千里马 {begin_date} 网站发布 {row_count} 条数据')
        logger.info(f'入库 {len(results)} 条')
    return request_status, err


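# Page through results for one (date, category, area) query. Each downloader
# status decides whether to retry with backoff, advance to the next page, or
# shut down; fatal statuses trigger a WeChat warning and raise
# AccountViolationRiskError so the caller can retire the account.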
def automatic_pagination(**kwargs):
    reason = ''  # reason reported when a fetch fails
    close_spider = False
    send_warning = False
    retry_times, max_retries = 0, 3
    pages = list(range(1, 101))  # qlm currently only exposes the first 10,000 rows
    while len(pages) > 0:
        if close_spider:
            if send_warning:
                send_wechat_warning(reason)
            break
        if send_warning and retry_times > max_retries:
            send_wechat_warning(reason)
            break
        page = pages.pop(0)
        logger.info(f'下载第{page}页数据')
        while True:
            status, reason = downloader(page=page, **kwargs)
            if status in ['server_error', 'client_ip_disable', 'captcha_required']:
                close_spider = True
                send_warning = True
            elif status == 'failure':
                interval = math.log(random.randint(100, 2400), 2)
                logger.debug(f'等待{interval}s,异常重试...')
                time.sleep(interval)
                continue
            elif status == 'disable':
                logger.warning('账号被禁止访问')
                retry_times += 1
                send_warning = True
            elif status == 'method_not_allowed':
                logger.warning('服务器禁止使用当前 HTTP 方法的请求')
                retry_times += 1
                send_warning = True
            elif status == 'stop':
                time.sleep(math.log(random.randint(100, 2400), 2))
                close_spider = True
            else:
                time.sleep(math.log(random.randint(100, 2400), 2))
            break
    if send_warning:
        raise AccountViolationRiskError


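# Run one paginated query and report whether the account survived it:
# False means automatic_pagination raised AccountViolationRiskError.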
def core(date: str, category: int, address: int, account, page_size=20):
    try:
        automatic_pagination(
            begin_date=date,
            end_date=date,
            category=category,
            address=address,
            page_size=page_size,  # max rows per page
            account=account
        )
        return True
    except AccountViolationRiskError:
        return False


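# Generator walking date x channel x province x city; picks an account per
# province and yields core()'s success flag so callers can stop early.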
def spider(date, page_size=40):
    logger.info('+++ 采集开始 +++')
    dates = [date] if not isinstance(date, list) else date
    try:
        for date in dates:
            for category, category_name in channel_dict.items():
                for area, cities in area_dict.items():
                    account = get_account(area)
                    if not account:
                        # raise ValueError('采集账号不能为空!')
                        logger.warning('暂无可用采集账号与代理!')
                        continue
                    for city in cities:
                        logger.info(' && '.join([
                            date,
                            category_name,
                            province_dict[area],
                            city_dict[city]
                        ]))
                        if len(cities) == 1:
                            # qianlima removed sub-districts for municipalities;
                            # crawl the whole province-level area directly
                            city = area
                        yield core(date, category, city, account, page_size=page_size)
    except Exception as e:
        logger.error(e)
    except KeyboardInterrupt:
        pass
    finally:
        logger.info('+++ 采集结束 +++')


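# Entry points: history() replays a list of past dates; start() crawls
# yesterday (get_today_of_day(-1)) with a larger page size. Both stop at
# the first account failure.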
def history(date_lst: list):
    for result in spider(date_lst):
        if not result:
            break
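
# Example backfill with history() over hypothetical dates:
#   history(['2024-05-01', '2024-05-02'])

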
def start():
    date = get_today_of_day(-1)
    for result in spider(date, page_size=100):
        if not result:
            break


if __name__ == '__main__':
    start()