@@ -0,0 +1,268 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-06-01
+---------
+@summary: Qianlima (千里马) list-page scraper
+---------
+@author: Dzr
+"""
+import json
+import math
+import random
+import time
+from pathlib import Path
+
+import requests
+from loguru import logger
+from pybloom_live import BloomFilter
+from pymongo import MongoClient
+
+from login import auto_login, account_pool
+
+
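+# Per-account request configuration; setup_cfg() fills these in from account/<username>.json
+# before each crawl session.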
+_cookies = None
+_headers = None
+_proxies = None
+
+
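+# Push a markdown alert to the WeCom (企业微信) group-robot webhook; with send=False the
+# alert is only written to the local log.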
+def send_wechat_warning(msg, send=True):
+    markdown = '采集异常中断,请切换d模式处理。'
+    markdown += f'\n>异常详情:<font color=\"warning\">**{msg}**</font>'
+
+    if not send:
+        logger.info(markdown)
+        return
+
+    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=079193d8-1856-443e-9f6d-ecc5c883bf11'
+    headers_ = {'Content-Type': 'application/json'}
+    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown, 'mentioned_mobile_list': ['17610673271']}}
+    request_params = dict(headers=headers_, json=json_data, timeout=10)
+    response = requests.post(url, **request_params)
+    logger.info(response.json())
+
+
+def setup_cfg(username):
+    global _cookies, _headers, _proxies
+    file = (Path(__file__).parent / f'account/{username}.json').absolute()
+    with open(file, encoding='utf-8') as rp:
+        json_data = json.load(rp)
+        _cookies = json_data['cookies']
+        _headers = json_data['headers']
+        _proxies = json_data['proxies']
+
+
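+# URL de-duplication state. The Bloom filter is persisted to backup/bloomfilter.f
+# (see main()), so records that were already collected stay filtered after a restart.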
+def launch_filter():
+    """Create (or restore) the Bloom filter."""
+    logger.debug('创建布隆过滤器...')
+    backup = (Path(__file__).parent / 'backup')
+    if not backup.exists():
+        backup.mkdir(exist_ok=True)
+
+    file = (backup / 'bloomfilter.f')
+    if not file.exists():
+        file.touch()  # create the backing file on first run
+        bf = BloomFilter(capacity=1000000, error_rate=0.001)  # sized for 1,000,000 entries at a 0.1% error rate
+    else:
+        if file.stat().st_size == 0:
+            bf = BloomFilter(capacity=1000000, error_rate=0.001)
+        else:
+            bf = BloomFilter.fromfile(file.open('rb'))
+
+    return file, bf
+
+
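+# Request one page of the Qianlima solr search API, drop records whose url is already in
+# the Bloom filter and bulk-insert the rest into MongoDB. Returns (ok, total, count); on
+# failure the second value encodes the cause: -1 unexpected payload, -2 request rejected
+# (account problem), -3 network error.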
+def fetch(collection, username, page, page_size, keywords, bf):
+    # rest/service/website/search/solr -> cookies
+    global _cookies, _headers, _proxies
+    response = None
+    try:
+        json_data = {
+            'keywords': keywords,
+            'timeType': 4,
+            'beginTime': '2024-09-01',
+            'endTime': '2024-09-30',
+            'filtermode': '8',
+            'searchMode': 1,
+            'currentPage': page,
+            'numPerPage': page_size,
+            'sortType': '1',
+            'allType': -1,
+            'beginAmount': '',
+            'endAmount': '',
+            'purchasingUnitIdList': '',
+            'threeClassifyTagStr': '',
+            'fourLevelCategoryIdListStr': '',
+            'threeLevelCategoryIdListStr': '',
+            'levelId': '',
+            'tab': 2,
+            'types': '-1',
+            'searchDataType': 1,
+            'showContent': 1,
+            'hasLinkName': '',
+            'newAreas': '',
+            'hasChooseSortType': 1,
+            'progIdAndNoticeSegmentTypeMaps': {
+                '3': [],
+                '4': [
+                    11,
+                    12,
+                ],
+                '5': [],
+            },
+            'summaryType': 1,
+        }
+
+        response = requests.post(
+            'https://search.vip.qianlima.com/rest/service/website/search/solr',
+            cookies=_cookies,
+            headers=_headers,
+            json=json_data,
+            proxies=_proxies,
+            timeout=60
+        )
+        assert response.status_code == 200
+        result = response.json()
+        try:
+            total = result['data']['rowCount']
+            if total > 500:
+                # discard, not wanted
+                return True, total, page_size
+
+        except TypeError:
+            return False, -1, 0
+
+        data = result['data']['data']
+
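+        # Buffer new records and flush them one page at a time; ordered=False lets the
+        # bulk insert continue past an individual failed document.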
+        dedup_count = 0
+        count = 0
+        insert_lst = []
+        for item in data:
+            href = item.get('url')
+            if href is None or href in bf:
+                dedup_count += 1
+                # logger.debug(f'重复数据[{href}]')
+                continue
+
+            insert_lst.append(item)
+            if len(insert_lst) == page_size:
+                collection.insert_many(insert_lst, ordered=False)
+                count += len(insert_lst)
+                insert_lst = []
+
+            bf.add(href)
+
+        if len(insert_lst) > 0:
+            collection.insert_many(insert_lst, ordered=False)
+            count += len(insert_lst)
+
+        logger.info(f'自动翻页|第{page}页|{keywords}|入库{count}条|重复{dedup_count}条')
+        return True, total, len(data)
+
+    except AssertionError:
+        logger.error(f'{username}|账号异常|请求失败')
+        # send_wechat_warning(msg=response.content.decode())
+        return False, -2, 0
+
+    except requests.exceptions.RequestException as e:
+        logger.exception(f'网络请求错误, 原因:{e}')
+        return False, -3, 0
+
+
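+# Work through the keyword tasks with one logged-in account: page through each keyword's
+# results, skip keywords that look like fuzzy matches (500+ hits), and write the progress
+# back to the task collection. Returning False aborts the run, None asks main() to switch
+# to another account, True means this batch finished normally.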
+def spider(username, tasks, bf, to_data_lst, coll):
+    setup_cfg(username)
+
+    while tasks:
+        page = 1
+        page_size = 100
+
+        # pagination state
+        state = 1
+        retries = 0
+        isdownload = True
+        _id, keywords = tasks.pop()
+        while True:
+            ok, total, count = fetch(coll, username, page, page_size, keywords, bf)
+            if ok is False:
+                state = total
+                if state == -1:
+                    logger.info(f'{username}|请求参数错误|修改参数')
+                    return False
+                elif state == -2:
+                    logger.info(f'{username}|访问频繁|3秒后切换账号')
+                    time.sleep(3)
+                    return
+                else:
+                    logger.error(f'{username}|网络异常|准备重试~{retries}')
+                    if retries > 3:
+                        return
+                    retries += 1
+                    continue
+
+            # time.sleep(math.log(random.randint(100, 2400), 2))
+            time.sleep(.5)
+
+            if ok is True and total >= 500:
+                logger.error(f'采集完成|{keywords}|疑似模糊匹配|跳过采集')
+                isdownload = False
+                break
+
+            if ok is True and count < page_size:
+                logger.info(f'采集完成|{keywords}|保存{total}条')
+                break
+            else:
+                page += 1
+
+        # update the task status
+        if state >= 0:
+            to_data_lst.update_one(
+                {'_id': _id},
+                {
+                    '$set': {
+                        'b_isdownload': isdownload,
+                        'i_total': total,
+                        'i_pages': page,
+                        'i_state': state,
+                        'i_updatetime': int(time.time())
+                    }
+                }
+            )
+
+    return True
+
+
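+# Entry point: pull pending keywords from MongoDB in batches of 50, rotate through
+# account_pool (accounts that finish a batch normally are re-queued), and always flush
+# the Bloom filter back to disk on exit.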
+def main():
+    f, bf = launch_filter()  # Bloom filter sized for 1,000,000 entries at a 0.1% error rate
+
+    client = MongoClient('192.168.3.182', 27017)
+    to_data_lst = client['31zg_poc']['keyword_company']
+    coll = client['31zg_poc']['qlm_data_lst']
+
+    try:
+        while True:
+            q = {'b_isdownload': None}
+            p = {'s_keyword': 1, '_id': 1}
+            with to_data_lst.find(q, projection=p, limit=50) as cursor:
+                tasks = [(item['_id'], item['s_keyword']) for item in cursor]
+
+            username, password = account_pool.pop(0)
+            auto_login(username, password, proxy=True, headless=True, auto_quit=True)
+            state = spider(username, tasks, bf, to_data_lst, coll)
+            if state is True:
+                account_pool.append((username, password))
+
+            if state is False:
+                break
+
+            if not to_data_lst.count_documents(q):
+                break
+
+    except KeyboardInterrupt:
+        pass
+
+    finally:
+        bf.tofile(f.open('wb'))  # persist the Bloom filter to disk
+        logger.info('采集结束')
+
+
+if __name__ == '__main__':
+    main()