# -*- coding: utf-8 -*-
"""
Created on 2024-10-17
---------
@summary: 千里马 (Qianlima) bidding announcement spider: downloads list and detail
    pages, deduplicates URLs with a Bloom filter, and stores results in MongoDB.
---------
"""
import json
import math
import random
import time
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path

import bson
import requests
from pybloom_live import BloomFilter
from pymongo import MongoClient
from pymongo.operations import UpdateOne

from rgg import net
from rgg.account import account_pool
from rgg.clean_html import cleaner
from rgg.log import logger
from rgg.net import send_wechat_warning

Int64 = bson.Int64


def get_random(min_value=None, max_value=None):
    # Target range of the delay, in seconds
    min_value = min_value or 60
    max_value = max_value or 600
    # Range of the base-10 logarithms of the bounds
    log_min = math.log(min_value, 10)
    log_max = math.log(max_value, 10)
    # Draw a random exponent between log_min and log_max
    random_log = random.uniform(log_min, log_max)
    # Map the exponent back into the original range (log-uniform sample)
    result = 10 ** random_log
    # print(f"生成的随机数 ({min_value}-{max_value}): {result:.2f}")
    return float(f'{result:.1f}')


def delay(a, b, msg=None):
    secs = get_random(a, b)
    logger.info(f'{msg if msg else "采集"}延时{secs}秒')
    time.sleep(secs)
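

# Illustrative sketch, not called anywhere in this module: it samples get_random()
# to show that the log-uniform draw above biases delays toward the lower bound
# (roughly three quarters of samples fall below the 330 s midpoint of 60-600 s)
# while still producing occasional long pauses. The helper name and sample size
# are assumptions made only for this demonstration.
def _demo_delay_distribution(samples=1000):
    values = [get_random(60, 600) for _ in range(samples)]
    midpoint = (60 + 600) / 2  # 330 seconds
    share_short = sum(1 for v in values if v < midpoint) / samples
    return {'min': min(values), 'max': max(values), 'share_below_midpoint': share_short}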


def bulk_update_task_status(collection, id_lst, update):
    """
    Bulk-update task status.

    :param pymongo.collection.Collection collection:
    :param id_lst: list of document _id values to update
    :param dict update: fields to set on each matched document
    :return: number of modified documents
    """
    count = 0
    update_lst = []
    for id_ in id_lst:
        update['updatetime'] = Int64(int(time.time()))  # refresh the task update time
        update_lst.append(UpdateOne({'_id': id_}, {'$set': update}))
        if len(update_lst) == 50:
            results = collection.bulk_write(update_lst, ordered=False)
            update_lst = []
            count += results.modified_count

    if len(update_lst) > 0:
        results = collection.bulk_write(update_lst, ordered=False)
        count += results.modified_count
    return count


def setup_login_info(username):
    logger.debug('加载登录身份信息...')
    file = (Path(__file__).parent / f'rgg/account/{username}.json').absolute()
    with open(file, encoding='utf-8') as rp:
        json_data = json.load(rp)

    net.set_cookies(ck=json_data['cookies'])
    net.set_headers(h=json_data['headers'])
    net.set_proxies(p=json_data['proxies'])


def launch_filter():
    """Create or restore the Bloom filter used for URL deduplication."""
    logger.debug('创建布隆过滤器...')
    backup = (Path(__file__).parent / 'rgg/backup')
    if not backup.exists():
        backup.mkdir(exist_ok=True)

    file = (backup / 'bloomfilter.f')
    if not file.exists():
        file.touch()  # create the backing file on first run
        # Expect up to 1,000,000 elements with a 0.1% false-positive rate
        bf = BloomFilter(capacity=1000000, error_rate=0.001)
    else:
        if file.stat().st_size == 0:
            bf = BloomFilter(capacity=1000000, error_rate=0.001)
        else:
            bf = BloomFilter.fromfile(file.open('rb'))
    return file, bf
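

# Illustrative sketch, not part of the original module: it shows the persistence
# contract that launch_filter() relies on. The filter is rebuilt with
# BloomFilter.fromfile() when the backing file exists and is non-empty, and
# spider() writes it back with bf.tofile() in its finally block. The helper name
# and the temporary path below are assumptions made only for this demonstration.
def _demo_bloomfilter_roundtrip(path='/tmp/bloomfilter_demo.f'):
    bf = BloomFilter(capacity=1000, error_rate=0.001)
    bf.add('https://example.com/notice/1')
    with open(path, 'wb') as wp:
        bf.tofile(wp)  # persist, as spider() does on exit
    with open(path, 'rb') as rp:
        restored = BloomFilter.fromfile(rp)  # restore, as launch_filter() does on start
    return 'https://example.com/notice/1' in restored  # expected: True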


def fetch_list(coll, bf, keyword, page, page_size, begin_time=None, end_time=None):
    count = 0
    dedup_count = 0
    href_lst = []
    params = dict(begin_time=begin_time, end_time=end_time)
    lst = net.download_list(keyword, page, page_size, **params)
    if lst is None:
        return False, lst, count, dedup_count

    lst_data = []
    for item in lst:
        href = item.get('url')
        if href is None:
            # print(f'问题数据|{item}')
            continue

        if href in bf:
            logger.debug(f'重复数据|{href}')
            dedup_count += 1
            continue

        title = item['popTitle'] if 'popTitle' in item else item['showTitle']
        publishtime = item['updateTime']
        l_np_publishtime = datetime.strptime(publishtime, '%Y-%m-%d').timestamp()
        addr = str(item['areaName']).split('-')
        area = addr[0] if len(addr) > 0 else ''
        city = addr[1] if len(addr) > 1 else ''
        if '国土' in item.get('progName', ''):
            channel = item['progName']
        else:
            channel = (item['noticeSegmentTypeName'] or item['progName'])

        data = {
            'site': '千里马',
            'channel': channel,
            'spidercode': 'sdxzbiddingsjzypc',  # reserved for manual supplementary collection
            'area': area,
            'city': city,
            'district': '',
            'href': href,
            'title': title,
            'publishtime': publishtime,
            'l_np_publishtime': Int64(l_np_publishtime),
            'comeintime': Int64(int(time.time())),
            'isdownload': False,  # whether the detail page has been downloaded
            'isfailed': False,  # whether the download failed
            'keywords': ",".join(item['hiLightKey']),  # search keywords
        }
        lst_data.append(data)
        if len(lst_data) == 50:
            coll.insert_many(lst_data, ordered=False)
            count += len(lst_data)
            lst_data = []

        href_lst.append(href)

    if len(lst_data) > 0:
        coll.insert_many(lst_data, ordered=False)
        count += len(lst_data)

    # Record the newly collected URLs in the Bloom filter
    for href in href_lst:
        bf.add(href)
    return True, lst, count, dedup_count


def auto_page_turning(coll, bf, keywords, page_size=40, **kwargs):
    page = 1
    page_size = 100 if page_size > 100 else page_size  # cap the page size at 100
    while True:
        ret, *args = fetch_list(coll, bf, keywords, page, page_size, **kwargs)
        if ret is False:
            return False

        ret_lst, count, dedup_count = args
        logger.info(f'自动翻页|第{page}页|{keywords}|入库{count}条|重复{dedup_count}条')
        if len(ret_lst) < page_size:
            break
        else:
            page += 1
    return True


def get_tasks(collection, max_limit=100):
    now = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_ts = int(now.timestamp())
    second_day = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    end_ts = int(second_day.timestamp())
    q = {'comeintime': {'$gte': start_ts, '$lt': end_ts}}
    with collection.find(q) as cursor:
        rets = [item for item in cursor]
    rets = rets[:max_limit] if len(rets) > max_limit else rets  # cap the number of items collected per day
    rets = list(filter(lambda x: x['isdownload'] is False, rets))  # keep only items not yet downloaded
    return rets


def fetch_detail(collection, tasks, username):
    results = {'success': [], 'failed': []}
    insert_lst = []
    for task in tasks:
        href = task['href']
        ret = net.download_json(href, referer=False)
        if ret is False:
            logger.error(f'账号失效|{username}')
            return False, results

        if not ret:
            results['failed'].append(task['_id'])
        else:
            data = {
                'site': task['site'],
                'channel': task['channel'],
                'spidercode': task['spidercode'],
                'area': task['area'],
                'city': task['city'],
                'district': task['district'],
                'href': '#',
                'competehref': href,
                'title': task['title'],
                's_title': task['title'],
                'contenthtml': ret['content'],
                'detail': cleaner(ret['content']),
                'publishtime': task['publishtime'],
                'l_np_publishtime': task['l_np_publishtime'],
                'comeintime': Int64(int(time.time())),
                'T': 'bidding',
                'infoformat': 1,
                'sendflag': 'false',
                'iscompete': True,
                '_d': 'comeintime',
                'publishdept': '',
                'type': '',
                'is_mixed': True
            }
            insert_lst.append(data)
            results['success'].append(task['_id'])
            if len(insert_lst) == 50:
                collection.insert_many(insert_lst, ordered=False)
                insert_lst = []

        # delay(45, 540)
        time.sleep(.5)

    if len(insert_lst) > 0:
        collection.insert_many(insert_lst, ordered=False)
    return True, results


def download(lst_coll, detail_coll, bf, keyword, username, **kwargs):
    # Bind the list collection to the bulk-update helper with functools.partial
    bulk_update = partial(bulk_update_task_status, lst_coll)

    # Data collection
    ret = auto_page_turning(lst_coll, bf, keyword, **kwargs)  # download list pages
    if ret is False:
        return False

    tasks = get_tasks(lst_coll)  # claim collection tasks
    state, detail_ret = fetch_detail(detail_coll, tasks, username)  # download detail pages

    # Bulk-update the status of successful tasks
    success_ids = detail_ret['success']
    if bulk_update(success_ids, {'isdownload': True}):
        logger.info(f'批量更新|任务状态|成功|数量{len(success_ids)}条')

    # Bulk-update the status of failed tasks
    failed_ids = detail_ret['failed']
    if bulk_update(failed_ids, {'isdownload': True, 'isfailed': True}):
        logger.info(f'批量更新|任务状态|失败|数量{len(failed_ids)}条')

    if state is False:
        return False
    return True


def spider(username, keyword, begin_time=None, end_time=None, **kwargs):
    logger.info('+++ 开始采集 +++')
    if 'begin_time' not in kwargs:
        kwargs['begin_time'] = begin_time
    if 'end_time' not in kwargs:
        kwargs['end_time'] = end_time

    # Create the MongoDB connection
    # to_mgo = MongoClient('192.168.3.182', 27080)
    to_mgo = MongoClient('172.17.4.87', 27080)
    lst_coll = to_mgo['py_spider']['zyjc_qlm_list']
    detail_coll = to_mgo['py_spider']['data_bak']

    # Create or restore the Bloom filter
    f, bf = launch_filter()
    try:
        ret = download(lst_coll, detail_coll, bf, keyword, username, **kwargs)
        if ret is False:
            # TODO perhaps collection should stop here to keep the account from being banned
            return False
    except requests.exceptions.ConnectionError:
        send_wechat_warning(f'浙移集成|访问失败|代理访问超时|{net.get_proxies("https")}')
        return
    except (Exception, BaseException) as e:
        logger.exception(f'爬虫异常,原因:{e}')
        return False
    finally:
        logger.debug('保存过滤器')
        bf.tofile(f.open('wb'))  # save the Bloom filter to the local backing file
        logger.info('+++ 采集结束 +++')


def is_workhour(start, end):
    """
    Is the current hour within working hours?

    :param int start: start hour (inclusive)
    :param int end: end hour (inclusive)
    :return:
    """
    if end < start:
        raise ValueError('end must be greater than or equal to start')

    if start <= datetime.now().hour <= end:
        return True
    return False


def main():
    username, _ = account_pool.pop(0)
    setup_login_info(username)

    keyword = '甄选公告'
    is_weekday = True
    skip = False
    try:
        while True:
            if skip:
                net.send_wechat_warning('浙移集成|数据采集|异常停止')
            else:
                now = datetime.now()
                try:
                    # Run once every Sunday at 17:00, covering Saturday and Sunday
                    if now.weekday() == 6 and is_workhour(17, 17):
                        if not is_weekday:
                            is_weekday = True
                            sat = (now - timedelta(1)).strftime('%Y-%m-%d')
                            sun = now.strftime('%Y-%m-%d')
                            ret = spider(
                                username=username,
                                begin_time=sat,
                                end_time=sun,
                                keyword=keyword,
                                page_size=40,
                            )
                            if ret is False:
                                skip = True

                    # Weekdays, 09:00 - 18:00
                    if 0 <= now.weekday() <= 4 and is_workhour(9, 18):
                        is_weekday = False  # reset the weekend collection flag
                        start = now.strftime('%Y-%m-%d')
                        ret = spider(
                            username=username,
                            begin_time=start,
                            end_time=start,
                            keyword=keyword,
                            page_size=40,
                        )
                        if ret is False:
                            skip = True  # account problem

                except Exception as e:
                    logger.exception(e)

            delay(4200, 7200, "[浙移集成]下轮采集")  # run once every 70 - 120 minutes

    except KeyboardInterrupt:
        pass
    finally:
        logger.info('任务结束')


if __name__ == '__main__':
    main()