采集列表页.py

# -*- coding: utf-8 -*-
"""
Created on 2024-06-01
---------
@summary: Qianlima (千里马) list page collection
---------
@author: Dzr
"""
import json
import math
import random
import time
from pathlib import Path

import requests
from loguru import logger
from pybloom_live import BloomFilter
from pymongo import MongoClient

from login import auto_login, account_pool

_cookies = None
_headers = None
_proxies = None


def send_wechat_warning(msg, send=True):
    markdown = '采集异常中断,请切换d模式处理。'
    markdown += f'\n>异常详情:<font color="warning">**{msg}**</font>'
    if not send:
        logger.info(markdown)
        return

    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown, 'mentioned_mobile_list': ['17610673271']}}
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())
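

# Usage note (illustrative, not from the original file): with send=False the alert
# is only written to the local log instead of being posted to the WeCom webhook,
# which is convenient for a dry run, e.g.:
#   send_wechat_warning('示例异常信息', send=False)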


def setup_cfg(username):
    global _cookies, _headers, _proxies
    file = (Path(__file__).parent / f'account/{username}.json').absolute()
    with open(file, encoding='utf-8') as rp:
        json_data = json.load(rp)

    _cookies = json_data['cookies']
    _headers = json_data['headers']
    _proxies = json_data['proxies']
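

# Expected layout of account/<username>.json read by setup_cfg() above.
# This is a hypothetical sketch inferred from the keys accessed ('cookies',
# 'headers', 'proxies'); the real files are produced by the login module and
# may contain additional fields.
#
# {
#     "cookies": {"<cookie-name>": "<cookie-value>"},
#     "headers": {"User-Agent": "<ua string>"},
#     "proxies": {"http": "<proxy url>", "https": "<proxy url>"}
# }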


def launch_filter():
    """Create the Bloom filter used for URL deduplication."""
    logger.debug('创建布隆过滤器...')
    backup = (Path(__file__).parent / 'backup')
    if not backup.exists():
        backup.mkdir(exist_ok=True)

    file = (backup / 'bloomfilter.f')
    if not file.exists():
        file.touch()  # create the backing file on first run
        # expected capacity 1,000,000 items, error rate 0.1%
        bf = BloomFilter(capacity=1000000, error_rate=0.001)
    else:
        if file.stat().st_size == 0:
            bf = BloomFilter(capacity=1000000, error_rate=0.001)
        else:
            bf = BloomFilter.fromfile(file.open('rb'))

    return file, bf


def fetch(collection, username, page, page_size, keywords, bf):
    # rest/service/website/search/solr -> cookies
    global _cookies, _headers, _proxies
    response = None
    try:
        json_data = {
            'keywords': keywords,
            'timeType': 4,
            'beginTime': '2024-09-01',
            'endTime': '2024-09-30',
            'filtermode': '8',
            'searchMode': 1,
            'currentPage': page,
            'numPerPage': page_size,
            'sortType': '1',
            'allType': -1,
            'beginAmount': '',
            'endAmount': '',
            'purchasingUnitIdList': '',
            'threeClassifyTagStr': '',
            'fourLevelCategoryIdListStr': '',
            'threeLevelCategoryIdListStr': '',
            'levelId': '',
            'tab': 2,
            'types': '-1',
            'searchDataType': 1,
            'showContent': 1,
            'hasLinkName': '',
            'newAreas': '',
            'hasChooseSortType': 1,
            'progIdAndNoticeSegmentTypeMaps': {
                '3': [],
                '4': [11, 12],
                '5': [],
            },
            'summaryType': 1,
        }
        response = requests.post(
            'https://search.vip.qianlima.com/rest/service/website/search/solr',
            cookies=_cookies,
            headers=_headers,
            json=json_data,
            proxies=_proxies,
            timeout=60
        )
        assert response.status_code == 200
        result = response.json()
        try:
            total = result['data']['rowCount']
            if total > 500:
                # too many hits, discard this keyword
                return True, total, page_size
        except TypeError:
            return False, -1, 0

        data = result['data']['data']
        dedup_count = 0
        count = 0
        insert_lst = []
        for item in data:
            href = item.get('url')
            if href is None or href in bf:
                dedup_count += 1
                # logger.debug(f'重复数据[{href}]')
                continue

            insert_lst.append(item)
            if len(insert_lst) == page_size:
                collection.insert_many(insert_lst, ordered=False)
                count += len(insert_lst)
                insert_lst = []

            bf.add(href)

        if len(insert_lst) > 0:
            collection.insert_many(insert_lst, ordered=False)
            count += len(insert_lst)

        logger.info(f'自动翻页|第{page}页|{keywords}|入库{count}条|重复{dedup_count}条')
        return True, total, len(data)
    except AssertionError:
        logger.error(f'{username}|账号异常|请求失败')
        # send_wechat_warning(msg=response.content.decode())
        return False, -2, 0
    except requests.exceptions.RequestException as e:
        logger.exception(f'网络请求错误, 原因:{e}')
        return False, -3, 0
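

# Return contract of fetch(), as consumed by spider() below:
#   (True,  total, count)      -- page fetched; `count` rows processed, `total` is rowCount
#   (True,  total, page_size)  -- total > 500, too many hits; the caller skips the keyword
#   (False, -1, 0)             -- response body had no usable data (bad request parameters)
#   (False, -2, 0)             -- non-200 status, treated as an account/rate-limit problem
#   (False, -3, 0)             -- network-level requests exception, the caller retries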


def spider(username, tasks, bf, to_data_lst, coll):
    setup_cfg(username)
    while tasks:
        page = 1
        page_size = 100
        # pagination state
        state = 1
        retries = 0
        isdownload = True
        _id, keywords = tasks.pop()
        while True:
            ok, total, count = fetch(coll, username, page, page_size, keywords, bf)
            if ok is False:
                state = total
                if state == -1:
                    logger.info(f'{username}|请求参数错误|修改参数')
                    return False
                elif state == -2:
                    logger.info(f'{username}|访问频繁|3秒后切换账号')
                    time.sleep(3)
                    return
                else:
                    logger.error(f'{username}|网络异常|准备重试~{retries}')
                    if retries > 3:
                        return
                    else:
                        retries += 1
                        continue

            # time.sleep(math.log(random.randint(100, 2400), 2))
            time.sleep(.5)
            if ok is True and total >= 500:
                logger.error(f'采集完成|{keywords}|疑似模糊匹配|跳过采集')
                isdownload = False
                break

            if ok is True and count < page_size:
                logger.info(f'采集完成|{keywords}|保存{total}条')
                break
            else:
                page += 1

        # update the task status
        if state >= 0:
            to_data_lst.update_one(
                {'_id': _id},
                {
                    '$set': {
                        'b_isdownload': isdownload,
                        'i_total': total,
                        'i_pages': page,
                        'i_state': state,
                        'i_updatetime': int(time.time())
                    }
                }
            )

    return True


def main():
    f, bf = launch_filter()  # Bloom filter: expected 1,000,000 inserts, 0.1% error rate
    client = MongoClient('192.168.3.182', 27017)
    to_data_lst = client['31zg_poc']['keyword_company']
    coll = client['31zg_poc']['qlm_data_lst']
    try:
        while True:
            q = {'b_isdownload': None}
            p = {'s_keyword': 1, '_id': 1}
            with to_data_lst.find(q, projection=p, limit=50) as cursor:
                tasks = [(item['_id'], item['s_keyword']) for item in cursor]

            username, password = account_pool.pop(0)
            auto_login(username, password, proxy=True, headless=True, auto_quit=True)
            state = spider(username, tasks, bf, to_data_lst, coll)
            if state is True:
                account_pool.append((username, password))

            if state is False:
                break

            if not to_data_lst.count_documents(q):
                break
    except KeyboardInterrupt:
        pass
    finally:
        bf.tofile(f.open('wb'))  # persist the Bloom filter to disk
        logger.info('采集结束')


if __name__ == '__main__':
    main()
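

# A minimal, hypothetical sketch of how the keyword_company task collection that
# main() consumes could be seeded. Field names follow the queries above
# (s_keyword, b_isdownload); the actual seeding script is not part of this file.
#
# from pymongo import MongoClient
# client = MongoClient('192.168.3.182', 27017)
# client['31zg_poc']['keyword_company'].insert_many([
#     {'s_keyword': kw, 'b_isdownload': None} for kw in ['关键词A', '关键词B']
# ])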