# -*- coding: utf-8 -*-
"""
Created on 2024-06-01
---------
@summary: Qianlima (千里马) list-page crawler
---------
@author: Dzr
"""
import json
import math
import random
import time
from pathlib import Path

import requests
from loguru import logger
from pybloom_live import BloomFilter
from pymongo import MongoClient

from login import auto_login, account_pool

_cookies = None
_headers = None
_proxies = None


def send_wechat_warning(msg, send=True):
    """Push a markdown alert to the WeChat Work webhook (or only log it when send=False)."""
    markdown = '采集异常中断,请切换d模式处理。'
    markdown += f'\n>异常详情:**{msg}**'
    if not send:
        logger.info(markdown)
        return

    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
    headers_ = {'Content-Type': 'application/json'}
    json_data = {
        'msgtype': 'markdown',
        'markdown': {'content': markdown, 'mentioned_mobile_list': ['17610673271']},
    }
    request_params = dict(headers=headers_, json=json_data, timeout=10)
    response = requests.post(url, **request_params)
    logger.info(response.json())


def setup_cfg(username):
    """Load cookies/headers/proxies for the given account from account/<username>.json."""
    global _cookies, _headers, _proxies

    file = (Path(__file__).parent / f'account/{username}.json').absolute()
    with open(file, encoding='utf-8') as rp:
        json_data = json.load(rp)
        _cookies = json_data['cookies']
        _headers = json_data['headers']
        _proxies = json_data['proxies']


def launch_filter():
    """Create the Bloom filter used for URL dedup, restoring it from disk if a backup exists."""
    logger.debug('创建布隆过滤器...')
    backup = Path(__file__).parent / 'backup'
    if not backup.exists():
        backup.mkdir(exist_ok=True)

    file = backup / 'bloomfilter.f'
    if not file.exists():
        file.touch()  # create the backing file on first run
        # Bloom filter sized for ~1,000,000 expected inserts with a 0.1% error rate
        bf = BloomFilter(capacity=1000000, error_rate=0.001)
    elif file.stat().st_size == 0:
        bf = BloomFilter(capacity=1000000, error_rate=0.001)
    else:
        with file.open('rb') as fp:
            bf = BloomFilter.fromfile(fp)

    return file, bf


def fetch(collection, username, page, page_size, keywords, bf):
    """Fetch one result page from the search API, dedup by URL and insert into MongoDB.

    Returns (ok, total, count). On failure, total carries an error code:
    -1 unexpected response payload, -2 request rejected (account issue), -3 network error.
    """
    # rest/service/website/search/solr -> cookies
    global _cookies, _headers, _proxies

    response = None
    try:
        json_data = {
            'keywords': keywords,
            'timeType': 4,
            'beginTime': '2024-09-01',
            'endTime': '2024-09-30',
            'filtermode': '8',
            'searchMode': 1,
            'currentPage': page,
            'numPerPage': page_size,
            'sortType': '1',
            'allType': -1,
            'beginAmount': '',
            'endAmount': '',
            'purchasingUnitIdList': '',
            'threeClassifyTagStr': '',
            'fourLevelCategoryIdListStr': '',
            'threeLevelCategoryIdListStr': '',
            'levelId': '',
            'tab': 2,
            'types': '-1',
            'searchDataType': 1,
            'showContent': 1,
            'hasLinkName': '',
            'newAreas': '',
            'hasChooseSortType': 1,
            'progIdAndNoticeSegmentTypeMaps': {
                '3': [],
                '4': [11, 12],
                '5': [],
            },
            'summaryType': 1,
        }
        response = requests.post(
            'https://search.vip.qianlima.com/rest/service/website/search/solr',
            cookies=_cookies,
            headers=_headers,
            json=json_data,
            proxies=_proxies,
            timeout=60
        )
        assert response.status_code == 200
        result = response.json()
        try:
            total = result['data']['rowCount']
            if total > 500:
                # too many hits: discard the result set (the caller skips suspected fuzzy matches)
                return True, total, page_size
        except TypeError:
            return False, -1, 0

        data = result['data']['data']
        dedup_count = 0
        count = 0
        insert_lst = []
        for item in data:
            href = item.get('url')
            if href is None or href in bf:
                dedup_count += 1
                # logger.debug(f'重复数据[{href}]')
                continue

            insert_lst.append(item)
            if len(insert_lst) == page_size:
                collection.insert_many(insert_lst, ordered=False)
                count += len(insert_lst)
                insert_lst = []

            bf.add(href)

        if len(insert_lst) > 0:
            collection.insert_many(insert_lst, ordered=False)
            count += len(insert_lst)

        logger.info(f'自动翻页|第{page}页|{keywords}|入库{count}条|重复{dedup_count}条')
        return True, total, len(data)
    except AssertionError:
        logger.error(f'{username}|账号异常|请求失败')
        # send_wechat_warning(msg=response.content.decode())
        return False, -2, 0
    except requests.exceptions.RequestException as e:
        logger.exception(f'网络请求错误, 原因:{e}')
        return False, -3, 0
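

# --- Illustrative sketch (assumption, not part of the original pipeline) ---
# setup_cfg() above expects account/<username>.json to contain top-level
# "cookies", "headers" and "proxies" objects, presumably written by
# auto_login() from the login module after a successful browser login.
# The helper below only documents that layout; field names and values are
# placeholders, not real credentials, and it is never called by this script.
def write_example_account_cfg(username):
    """Write a placeholder account config in the layout setup_cfg() reads."""
    cfg = {
        'cookies': {'qlm_username': username, 'qlm_session': '<cookie-value>'},   # placeholder cookies
        'headers': {'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.qianlima.com/'},
        'proxies': {'http': 'http://127.0.0.1:8888', 'https': 'http://127.0.0.1:8888'},  # placeholder proxy
    }
    file = Path(__file__).parent / f'account/{username}.json'
    file.parent.mkdir(parents=True, exist_ok=True)
    with open(file, 'w', encoding='utf-8') as wp:
        json.dump(cfg, wp, ensure_ascii=False, indent=2)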


def spider(username, tasks, bf, to_data_lst, coll):
    """Consume keyword tasks with one account, paging through results until each task is done."""
    setup_cfg(username)
    while tasks:
        page = 1
        page_size = 100  # results per page
        state = 1
        retries = 0
        isdownload = True
        _id, keywords = tasks.pop()
        while True:
            ok, total, count = fetch(coll, username, page, page_size, keywords, bf)
            if ok is False:
                state = total
                if state == -1:
                    logger.info(f'{username}|请求参数错误|修改参数')
                    return False
                elif state == -2:
                    logger.info(f'{username}|访问频繁|3秒后切换账号')
                    time.sleep(3)
                    return
                else:
                    logger.error(f'{username}|网络异常|准备重试~{retries}')
                    if retries > 3:
                        return
                    retries += 1
                    continue

            # time.sleep(math.log(random.randint(100, 2400), 2))
            time.sleep(.5)
            if ok is True and total >= 500:
                logger.error(f'采集完成|{keywords}|疑似模糊匹配|跳过采集')
                isdownload = False
                break

            if ok is True and count < page_size:
                logger.info(f'采集完成|{keywords}|保存{total}条')
                break
            else:
                page += 1

        # update the task status
        if state >= 0:
            to_data_lst.update_one(
                {'_id': _id},
                {
                    '$set': {
                        'b_isdownload': isdownload,
                        'i_total': total,
                        'i_pages': page,
                        'i_state': state,
                        'i_updatetime': int(time.time())
                    }
                }
            )

    return True


def main():
    f, bf = launch_filter()  # Bloom filter sized for ~1,000,000 expected inserts with a 0.1% error rate
    client = MongoClient('192.168.3.182', 27017)
    to_data_lst = client['31zg_poc']['keyword_company']
    coll = client['31zg_poc']['qlm_data_lst']
    try:
        while True:
            q = {'b_isdownload': None}
            p = {'s_keyword': 1, '_id': 1}
            with to_data_lst.find(q, projection=p, limit=50) as cursor:
                tasks = [(item['_id'], item['s_keyword']) for item in cursor]

            username, password = account_pool.pop(0)
            auto_login(username, password, proxy=True, headless=True, auto_quit=True)
            state = spider(username, tasks, bf, to_data_lst, coll)
            if state is True:
                account_pool.append((username, password))

            if state is False:
                break

            if not to_data_lst.count_documents(q):
                break
    except KeyboardInterrupt:
        pass
    finally:
        with f.open('wb') as fp:
            bf.tofile(fp)  # persist the Bloom filter to disk
        logger.info('采集结束')


if __name__ == '__main__':
    main()
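

# --- Illustrative sketch (assumption, not part of the original job) --------
# main() pulls tasks from keyword_company with the query {'b_isdownload': None},
# so the collection can be seeded with documents that carry the search keyword
# in 's_keyword' and simply omit 'b_isdownload' (a missing field also matches
# the None query in MongoDB). The keywords below are placeholders and this
# helper is never called by the crawler itself.
def seed_example_tasks(keywords=('关键词1', '关键词2')):
    """Insert placeholder keyword tasks so main() has work to pick up."""
    client = MongoClient('192.168.3.182', 27017)
    to_data_lst = client['31zg_poc']['keyword_company']
    docs = [{'s_keyword': kw} for kw in keywords]  # 'b_isdownload' left unset on purpose
    to_data_lst.insert_many(docs, ordered=False)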