# -*- coding: utf-8 -*-
"""
Created on 2024-06-01
---------
@summary: Qianlima list-page scraper
---------
@author: Dzr
"""
import json
import math  # backs the commented-out randomized sleep in spider()
import random  # ditto
import time
from pathlib import Path

import requests
from loguru import logger
from pybloom_live import BloomFilter
from pymongo import MongoClient

from login import auto_login, account_pool

_cookies = None
_headers = None
_proxies = None
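
# Per-account session state. These module-level slots are filled by
# setup_cfg() from account/<username>.json before any request is made,
# so fetch() always posts with the credentials of the active account.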


def send_wechat_warning(msg, send=True):
    """Push a markdown alert to the WeChat Work webhook."""
    markdown = 'Collection aborted unexpectedly; switch to "d" mode to handle it.'
    markdown += f'\n>Details: <font color="warning">**{msg}**</font>'
    if not send:
        logger.info(markdown)
        return

    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {
        'msgtype': 'markdown',
        'markdown': {'content': markdown, 'mentioned_mobile_list': ['17610673271']},
    }
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())
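
# A minimal usage sketch (send=False only logs the markdown locally instead
# of posting to the webhook):
#
#   send_wechat_warning('cookies expired', send=False)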


def setup_cfg(username):
    """Load the cached cookies/headers/proxies for ``username`` into module state."""
    global _cookies, _headers, _proxies

    file = (Path(__file__).parent / f'account/{username}.json').absolute()
    with open(file, encoding='utf-8') as rp:
        json_data = json.load(rp)
        _cookies = json_data['cookies']
        _headers = json_data['headers']
        _proxies = json_data['proxies']
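
# Assumed shape of account/<username>.json as written by the login module
# (a sketch; the real file may carry extra keys):
#
#   {
#       "cookies": {"JSESSIONID": "..."},
#       "headers": {"User-Agent": "..."},
#       "proxies": {"http": "http://host:port", "https": "http://host:port"}
#   }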


def launch_filter():
    """Create or restore the Bloom filter used for URL deduplication."""
    logger.debug('Creating Bloom filter...')
    backup = Path(__file__).parent / 'backup'
    backup.mkdir(exist_ok=True)

    file = backup / 'bloomfilter.f'
    if not file.exists() or file.stat().st_size == 0:
        file.touch()  # create the backing file on first run
        # Expect up to 1,000,000 inserts at a 0.1% false-positive rate.
        bf = BloomFilter(capacity=1000000, error_rate=0.001)
    else:
        with file.open('rb') as fp:  # close the handle after restoring
            bf = BloomFilter.fromfile(fp)

    return file, bf
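
# Persistence round-trip: launch_filter() restores backup/bloomfilter.f on
# start-up and main() writes the filter back via bf.tofile() on exit, so URLs
# collected in earlier runs stay deduplicated across restarts.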


def fetch(collection, username, page, page_size, channel, bf):
    # rest/service/website/search/solr -> cookies
    global _cookies, _headers, _proxies

    response = None
    try:
        if channel == '中标信息':
            json_data = {
                'keywords': '',
                'timeType': 4,
                'beginTime': '2024-09-01',
                'endTime': '2024-09-30',
                'filtermode': '8',
                'searchMode': 1,
                'currentPage': page,
                'numPerPage': page_size,
                'sortType': 1,
                'allType': 3,
                'beginAmount': '',
                'endAmount': '',
                'purchasingUnitIdList': '',
                'threeClassifyTagStr': '',
                'fourLevelCategoryIdListStr': '',
                'threeLevelCategoryIdListStr': '',
                'levelId': '',
                'tab': 0,
                'searchDataType': 1,
                'types': 3,
                'showContent': 1,
                'newAreas': '1744',
                'hasChooseSortType': 1,
                'progIdAndNoticeSegmentTypeMaps': {
                    '3': [],
                },
                'summaryType': 0,
            }
        elif channel == '招标信息':
            json_data = {
                'keywords': '',
                'timeType': 4,
                'beginTime': '2024-09-01',
                'endTime': '2024-09-30',
                'filtermode': '8',
                'searchMode': 1,
                'currentPage': page,
                'numPerPage': page_size,
                'sortType': 1,
                'allType': 0,
                'beginAmount': '',
                'endAmount': '',
                'purchasingUnitIdList': '',
                'threeClassifyTagStr': '',
                'fourLevelCategoryIdListStr': '',
                'threeLevelCategoryIdListStr': '',
                'levelId': '',
                'tab': 0,
                'searchDataType': 1,
                'types': -1,
                'showContent': 1,
                'newAreas': '1744',
                'hasChooseSortType': 1,
                'progIdAndNoticeSegmentTypeMaps': {
                    '0': [],
                    '1': [],
                },
                'summaryType': 0,
            }
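            # Note: the two payloads differ only in their type filters. Awarded
            # results use allType=3 / types=3 with progId map key '3'; tender
            # notices use allType=0 / types=-1 with keys '0' and '1'. newAreas
            # appears to pin the region and beginTime/endTime the date window.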
        else:
            # Fail fast instead of falling through with json_data unbound.
            raise ValueError(f'unsupported channel: {channel}')
        response = requests.post(
            'https://search.vip.qianlima.com/rest/service/website/search/solr',
            cookies=_cookies,
            headers=_headers,
            json=json_data,
            proxies=_proxies,
            timeout=60
        )
        assert response.status_code == 200

        result = response.json()
        try:
            total = result['data']['rowCount']
        except TypeError:
            # data is null when the query parameters are rejected
            return False, -1, 0
        dedup_count = 0
        count = 0
        insert_lst = []
        data = result['data']['data']
        for item in data:
            href = item.get('url')
            if href is None or href in bf:
                dedup_count += 1
                # logger.debug(f'duplicate entry [{href}]')
                continue

            item['channel'] = channel
            insert_lst.append(item)
            if len(insert_lst) == page_size:
                collection.insert_many(insert_lst, ordered=False)
                count += len(insert_lst)
                insert_lst = []

            bf.add(href)

        if len(insert_lst) > 0:
            collection.insert_many(insert_lst, ordered=False)
            count += len(insert_lst)

        logger.info(f'Paging|page {page}|inserted {count}|duplicates {dedup_count}')
        return True, total, len(data)
    except AssertionError:
        logger.error(f'{username}|account blocked|request failed')
        # send_wechat_warning(msg=response.content.decode())
        return False, -2, 0
    except requests.exceptions.RequestException as e:
        logger.exception(f'Network request error: {e}')
        return False, -3, 0
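
# fetch() returns (ok, state_or_total, page_count): on success the second
# field is the server-reported rowCount, on failure a state code consumed by
# spider() (-1 bad parameters, -2 blocked/throttled account, -3 network error).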


def spider(username, bf, coll, channel):
    setup_cfg(username)

    page = 1
    page_size = 100

    # paginate until a short page signals the end of the result set
    retries = 0
    while True:
        ok, total, count = fetch(coll, username, page, page_size, channel, bf)
        if ok is False:
            state = total
            if state == -1:
                logger.info(f'{username}|bad request parameters|adjust and retry')
                return False
            elif state == -2:
                logger.info(f'{username}|rate limited|switching account in 3s')
                time.sleep(3)
                return
            else:
                logger.error(f'{username}|network error|retry ~{retries}')
                if retries > 3:
                    return
                retries += 1
                continue

        # time.sleep(math.log(random.randint(100, 2400), 2))
        time.sleep(.5)
        if ok is True and count < page_size:
            logger.info(f'Collection finished|{total} items in total')
            break
        else:
            page += 1

    return True
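
# Hypothetical account-rotation loop (not wired into main(), which pops a
# single account): spider() returns None to request an account switch and
# False on unrecoverable parameter errors.
#
#   while account_pool:
#       username, password = account_pool.pop(0)
#       auto_login(username, password, proxy=True, headless=True, auto_quit=True)
#       if spider(username, bf, coll, channel) is not None:
#           break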


def main():
    f, bf = launch_filter()  # restore/create the dedup Bloom filter
    client = MongoClient('192.168.3.182', 27017)
    coll = client['sdlt_poc']['qlm_data_lst']

    # channel = '招标信息'
    channel = '中标信息'
    try:
        username, password = account_pool.pop(0)
        auto_login(username, password, proxy=True, headless=True, auto_quit=True)
        spider(username, bf, coll, channel)
    except KeyboardInterrupt:
        pass
    finally:
        with f.open('wb') as fp:
            bf.tofile(fp)  # persist the Bloom filter for the next run
        logger.info('Collection finished')


if __name__ == '__main__':
    main()