采集列表页(地域).py

# -*- coding: utf-8 -*-
"""
Created on 2024-06-01
---------
@summary: Qianlima list-page crawler (by region)
---------
@author: Dzr
"""
import json
import math
import random
import time
from pathlib import Path

import requests
from loguru import logger
from pybloom_live import BloomFilter
from pymongo import MongoClient

from login import auto_login, account_pool

_cookies = None
_headers = None
_proxies = None
def send_wechat_warning(msg, send=True):
    markdown = 'Collection interrupted by an exception, please switch to d mode to handle it.'
    markdown += f'\n>Exception detail: <font color="warning">**{msg}**</font>'
    if not send:
        logger.info(markdown)
        return

    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown, 'mentioned_mobile_list': ['17610673271']}}
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())
def setup_cfg(username):
    global _cookies, _headers, _proxies

    file = (Path(__file__).parent / f'account/{username}.json').absolute()
    with open(file, encoding='utf-8') as rp:
        json_data = json.load(rp)
        _cookies = json_data['cookies']
        _headers = json_data['headers']
        _proxies = json_data['proxies']
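# Illustrative sketch of the account/{username}.json file that setup_cfg()
# expects. Only the three top-level keys are certain from the code above; the
# nested shapes are assumptions based on the usual `requests` formats and on
# what login.auto_login would plausibly save:
#
#   {
#       "cookies": {"<cookie name>": "<cookie value>"},
#       "headers": {"User-Agent": "...", "Referer": "..."},
#       "proxies": {"http": "<proxy url>", "https": "<proxy url>"}
#   }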
def launch_filter():
    """Create (or reload) the Bloom filter used for URL deduplication."""
    logger.debug('Creating Bloom filter...')
    backup = (Path(__file__).parent / 'backup')
    if not backup.exists():
        backup.mkdir(exist_ok=True)

    file = (backup / 'bloomfilter.f')
    if not file.exists():
        file.touch()  # create the backing file on first run
        # new filter: expect up to 1,000,000 elements at a 0.1% false-positive rate
        bf = BloomFilter(capacity=1000000, error_rate=0.001)
    else:
        if file.stat().st_size == 0:
            bf = BloomFilter(capacity=1000000, error_rate=0.001)
        else:
            with file.open('rb') as fp:
                bf = BloomFilter.fromfile(fp)

    return file, bf
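# Persistence note: the filter returned here is written back to
# backup/bloomfilter.f by main() via bf.tofile(), so deduplication state
# carries over between runs. The capacity/error_rate values above are the
# script's original settings, not tuned recommendations.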
def fetch(collection, username, page, page_size, channel, bf):
    """Request one list page and insert new rows into MongoDB.

    Returns (ok, total, count):
      ok    - True on success, False otherwise
      total - rowCount from the response on success; on failure a state code:
              -1 payload missing data, -2 abnormal/blocked account, -3 network error
      count - number of rows the interface returned for this page
    """
    # rest/service/website/search/solr -> cookies
    global _cookies, _headers, _proxies
    response = None
    try:
        if channel == '中标信息':  # award (winning-bid) notices
            json_data = {
                'keywords': '',
                'timeType': 4,
                'beginTime': '2024-09-01',
                'endTime': '2024-09-30',
                'filtermode': '8',
                'searchMode': 1,
                'currentPage': page,
                'numPerPage': page_size,
                'sortType': 1,
                'allType': 3,
                'beginAmount': '',
                'endAmount': '',
                'purchasingUnitIdList': '',
                'threeClassifyTagStr': '',
                'fourLevelCategoryIdListStr': '',
                'threeLevelCategoryIdListStr': '',
                'levelId': '',
                'tab': 0,
                'searchDataType': 1,
                'types': 3,
                'showContent': 1,
                'newAreas': '1744',
                'hasChooseSortType': 1,
                'progIdAndNoticeSegmentTypeMaps': {
                    '3': [],
                },
                'summaryType': 0,
            }
        elif channel == '招标信息':  # tender notices
            json_data = {
                'keywords': '',
                'timeType': 4,
                'beginTime': '2024-09-01',
                'endTime': '2024-09-30',
                'filtermode': '8',
                'searchMode': 1,
                'currentPage': page,
                'numPerPage': page_size,
                'sortType': 1,
                'allType': 0,
                'beginAmount': '',
                'endAmount': '',
                'purchasingUnitIdList': '',
                'threeClassifyTagStr': '',
                'fourLevelCategoryIdListStr': '',
                'threeLevelCategoryIdListStr': '',
                'levelId': '',
                'tab': 0,
                'searchDataType': 1,
                'types': -1,
                'showContent': 1,
                'newAreas': '1744',
                'hasChooseSortType': 1,
                'progIdAndNoticeSegmentTypeMaps': {
                    '0': [],
                    '1': [],
                },
                'summaryType': 0,
            }
        else:
            # fail fast instead of hitting an undefined json_data below
            raise ValueError(f'unsupported channel: {channel}')
        response = requests.post(
            'https://search.vip.qianlima.com/rest/service/website/search/solr',
            cookies=_cookies,
            headers=_headers,
            json=json_data,
            proxies=_proxies,
            timeout=60
        )
        assert response.status_code == 200
        result = response.json()
        try:
            total = result['data']['rowCount']
        except TypeError:
            return False, -1, 0

        dedup_count = 0
        count = 0
        insert_lst = []
        data = result['data']['data']
        for item in data:
            href = item.get('url')
            if href is None or href in bf:
                dedup_count += 1
                # logger.debug(f'duplicate row [{href}]')
                continue

            item['channel'] = channel
            insert_lst.append(item)
            if len(insert_lst) == page_size:
                collection.insert_many(insert_lst, ordered=False)
                count += len(insert_lst)
                insert_lst = []

            bf.add(href)

        if len(insert_lst) > 0:
            collection.insert_many(insert_lst, ordered=False)
            count += len(insert_lst)

        logger.info(f'auto paging|page {page}|inserted {count} rows|{dedup_count} duplicates')
        return True, total, len(data)
    except AssertionError:
        logger.error(f'{username}|abnormal account|request failed')
        # send_wechat_warning(msg=response.content.decode())
        return False, -2, 0
    except requests.exceptions.RequestException as e:
        logger.exception(f'network request error, reason: {e}')
        return False, -3, 0
def spider(username, bf, coll, channel):
    setup_cfg(username)

    page = 1
    page_size = 100

    # paging loop
    retries = 0
    while True:
        ok, total, count = fetch(coll, username, page, page_size, channel, bf)
        if ok is False:
            state = total
            if state == -1:
                logger.info(f'{username}|bad request parameters|adjust the payload')
                return False
            elif state == -2:
                logger.info(f'{username}|requests too frequent|switching account in 3 seconds')
                time.sleep(3)
                return
            else:
                logger.error(f'{username}|network error|retrying~{retries}')
                if retries > 3:
                    return
                else:
                    retries += 1
                    continue

        # time.sleep(math.log(random.randint(100, 2400), 2))
        time.sleep(.5)
        if ok is True and count < page_size:
            logger.info(f'collection finished|saved {total} rows')
            break
        else:
            page += 1

    return True
def main():
    f, bf = launch_filter()  # Bloom filter: up to 1,000,000 elements, 0.1% false-positive rate
    client = MongoClient('192.168.3.182', 27017)
    coll = client['sdlt_poc']['qlm_data_lst']

    # channel = '招标信息'  # tender notices
    channel = '中标信息'  # award (winning-bid) notices
    try:
        username, password = account_pool.pop(0)
        auto_login(username, password, proxy=True, headless=True, auto_quit=True)
        spider(username, bf, coll, channel)
    except KeyboardInterrupt:
        pass
    finally:
        with f.open('wb') as fp:
            bf.tofile(fp)  # persist the Bloom filter to disk
        logger.info('collection finished')


if __name__ == '__main__':
    main()
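# Illustrative follow-up check (assumption: run against the same MongoDB
# instance that main() writes to) -- count how many list rows a run stored
# for the active channel:
#
#   from pymongo import MongoClient
#   coll = MongoClient('192.168.3.182', 27017)['sdlt_poc']['qlm_data_lst']
#   print(coll.count_documents({'channel': '中标信息'}))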