# -*- coding: utf-8 -*-
"""
Created on 2024-10-17
---------
@summary: Qianlima (千里马) "甄选公告" list/detail spider; deduplicates URLs with a
    Bloom filter and writes results to MongoDB (manual supplementary collection).
---------
"""
import json
import math
import random
import time
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path

import bson
import requests
from pybloom_live import BloomFilter
from pymongo import MongoClient
from pymongo.operations import UpdateOne

from rgg import net
from rgg.account import account_pool
from rgg.clean_html import cleaner
from rgg.log import logger
from rgg.net import send_wechat_warning

Int64 = bson.Int64


def get_random(min_value=None, max_value=None):
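    """
    Draw a log-uniformly distributed random number (used as a delay in seconds).

    The value is sampled uniformly on a log10 scale between ``min_value`` and
    ``max_value`` (defaults: 60 and 600), so shorter delays are more likely
    than long ones.
    """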
    # target range for the delay, in seconds
    min_value = min_value or 60
    max_value = max_value or 600
    # range of the base-10 logarithms
    log_min = math.log(min_value, 10)
    log_max = math.log(max_value, 10)
    # draw a random value between log_min and log_max
    random_log = random.uniform(log_min, log_max)
    # map the random logarithm back to the original range
    result = 10 ** random_log
    # print(f"生成的随机数 ({min_value}-{max_value}): {result:.2f}")
    return float(f'{result:.1f}')


def delay(a, b, msg=None):
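    """Sleep for a random, log-uniformly distributed number of seconds between ``a`` and ``b``, logging the wait."""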
    secs = get_random(a, b)
    logger.info(f'{msg if msg else "采集"}延时{secs}秒')
    time.sleep(secs)


def bulk_update_task_status(collection, id_lst, update):
    """
    Update task documents in bulk.

    :param pymongo.collection.Collection collection: task collection
    :param list id_lst: ``_id`` values of the tasks to update
    :param dict update: fields to ``$set`` on every matched document
    :return: number of modified documents
    """
    count = 0
    update_lst = []
    for id_ in id_lst:
        update['updatetime'] = Int64(int(time.time()))  # refresh the task timestamp
        update_lst.append(UpdateOne({'_id': id_}, {'$set': update}))
        if len(update_lst) == 50:
            results = collection.bulk_write(update_lst, ordered=False)
            update_lst = []
            count += results.modified_count
    if len(update_lst) > 0:
        results = collection.bulk_write(update_lst, ordered=False)
        count += results.modified_count
    return count


def setup_login_info(username):
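    """Load cookies, headers and proxies for ``username`` from its local JSON file and apply them to the ``net`` module."""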
    logger.debug('加载登录身份信息...')
    file = (Path(__file__).parent / f'rgg/account/{username}.json').absolute()
    with open(file, encoding='utf-8') as rp:
        json_data = json.load(rp)
    net.set_cookies(ck=json_data['cookies'])
    net.set_headers(h=json_data['headers'])
    net.set_proxies(p=json_data['proxies'])


def launch_filter():
    """Create or restore the Bloom filter used for URL deduplication."""
    logger.debug('创建布隆过滤器...')
    backup = (Path(__file__).parent / 'rgg/backup')
    if not backup.exists():
        backup.mkdir(exist_ok=True)
    file = (backup / 'bloomfilter.f')
    if not file.exists():
        file.touch()  # create the backing file on first run
        # new filter: expected 1,000,000 elements, 0.1% error rate
        bf = BloomFilter(capacity=1000000, error_rate=0.001)
    else:
        if file.stat().st_size == 0:
            bf = BloomFilter(capacity=1000000, error_rate=0.001)
        else:
            with file.open('rb') as fp:
                bf = BloomFilter.fromfile(fp)
    return file, bf


def fetch_list(coll, bf, keyword, page, page_size, begin_time=None, end_time=None):
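    """
    Download one page of list results for ``keyword`` and store new items.

    Items whose URL is already in the Bloom filter ``bf`` are skipped; the rest
    are inserted into ``coll`` in batches of 50.

    :return: (ok, raw_list, inserted_count, duplicate_count); ``ok`` is False
        when the list request itself failed.
    """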
    count = 0
    dedup_count = 0
    href_lst = []
    params = dict(begin_time=begin_time, end_time=end_time)
    lst = net.download_list(keyword, page, page_size, **params)
    if lst is None:
        return False, lst, count, dedup_count

    lst_data = []
    for item in lst:
        href = item.get('url')
        if href is None:
            # print(f'问题数据|{item}')
            continue
        if href in bf:
            logger.debug(f'重复数据|{href}')
            dedup_count += 1
            continue

        title = item['popTitle'] if 'popTitle' in item else item['showTitle']
        publishtime = item['updateTime']
        l_np_publishtime = datetime.strptime(publishtime, '%Y-%m-%d').timestamp()
        addr = str(item['areaName']).split('-')
        area = addr[0] if len(addr) > 0 else ''
        city = addr[1] if len(addr) > 1 else ''
        if '国土' in item.get('progName', ''):
            channel = item['progName']
        else:
            channel = (item['noticeSegmentTypeName'] or item['progName'])
        data = {
            'site': '千里马',
            'channel': channel,
            'spidercode': 'sdxzbiddingsjzypc',  # reserved for manual supplementary collection
            'area': area,
            'city': city,
            'district': '',
            'href': href,
            'title': title,
            'publishtime': publishtime,
            'l_np_publishtime': Int64(l_np_publishtime),
            'comeintime': Int64(int(time.time())),
            'isdownload': False,  # has the detail page been downloaded?
            'isfailed': False,  # did the detail download fail?
            'keywords': ",".join(item['hiLightKey']),  # search keywords
        }
        lst_data.append(data)
        if len(lst_data) == 50:
            coll.insert_many(lst_data, ordered=False)
            count += len(lst_data)
            lst_data = []
        href_lst.append(href)

    if len(lst_data) > 0:
        coll.insert_many(lst_data, ordered=False)
        count += len(lst_data)

    # record the new URLs in the Bloom filter
    for href in href_lst:
        bf.add(href)
    return True, lst, count, dedup_count


def auto_page_turning(coll, bf, keywords, page_size=40, **kwargs):
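    """Page through list results for ``keywords`` until a page shorter than ``page_size`` signals the end; returns False if a list request fails."""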
    page = 1
    page_size = 100 if page_size > 100 else page_size
    while True:
        ret, *args = fetch_list(coll, bf, keywords, page, page_size, **kwargs)
        if ret is False:
            return False
        ret_lst, count, dedup_count = args
        logger.info(f'自动翻页|第{page}页|{keywords}|入库{count}条|重复{dedup_count}条')
        if len(ret_lst) < page_size:
            break
        else:
            page += 1
    return True


def get_tasks(collection, max_limit=100):
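    """Return today's list tasks (at most ``max_limit``) that have not been downloaded yet."""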
    now = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_ts = int(now.timestamp())
    second_day = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    end_ts = int(second_day.timestamp())
    q = {'comeintime': {'$gte': start_ts, '$lt': end_ts}}
    with collection.find(q) as cursor:
        rets = [item for item in cursor]
    rets = rets[:max_limit] if len(rets) > max_limit else rets  # cap the number of items collected per day
    rets = list(filter(lambda x: x['isdownload'] is False, rets))  # keep only undownloaded items
    return rets


def fetch_detail(collection, tasks, username):
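    """
    Download the detail page for every task and insert the cleaned documents
    into ``collection`` in batches of 50.

    :return: (ok, results); ``ok`` is False when the account looks expired,
        ``results`` maps 'success'/'failed' to the affected task ``_id`` lists.
    """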
    results = {'success': [], 'failed': []}
    insert_lst = []
    for task in tasks:
        href = task['href']
        ret = net.download_json(href, referer=False)
        if ret is False:
            logger.error(f'账号失效|{username}')
            return False, results

        if not ret:
            results['failed'].append(task['_id'])
        else:
            data = {
                'site': task['site'],
                'channel': task['channel'],
                'spidercode': task['spidercode'],
                'area': task['area'],
                'city': task['city'],
                'district': task['district'],
                'href': '#',
                'competehref': href,
                'title': task['title'],
                's_title': task['title'],
                'contenthtml': ret['content'],
                'detail': cleaner(ret['content']),
                'publishtime': task['publishtime'],
                'l_np_publishtime': task['l_np_publishtime'],
                'comeintime': Int64(int(time.time())),
                'T': 'bidding',
                'infoformat': 1,
                'sendflag': 'false',
                'iscompete': True,
                '_d': 'comeintime',
                'publishdept': '',
                'type': '',
                'is_mixed': True
            }
            insert_lst.append(data)
            results['success'].append(task['_id'])

        if len(insert_lst) == 50:
            collection.insert_many(insert_lst, ordered=False)
            insert_lst = []
        # delay(45, 540)
        time.sleep(.5)

    if len(insert_lst) > 0:
        collection.insert_many(insert_lst, ordered=False)
    return True, results


def download(lst_coll, detail_coll, bf, keyword, username, **kwargs):
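    """Run one collection round: list pages first, then detail pages, then bulk task-status updates."""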
    # bind the task collection with functools.partial
    bulk_update = partial(bulk_update_task_status, lst_coll)

    ret = auto_page_turning(lst_coll, bf, keyword, **kwargs)  # download list pages
    if ret is False:
        return False

    tasks = get_tasks(lst_coll)  # claim collection tasks
    state, detail_ret = fetch_detail(detail_coll, tasks, username)  # download detail pages

    # bulk-update successful tasks
    success_ids = detail_ret['success']
    if bulk_update(success_ids, {'isdownload': True}):
        logger.info(f'批量更新|任务状态|成功|数量{len(success_ids)}条')
    # bulk-update failed tasks
    failed_ids = detail_ret['failed']
    if bulk_update(failed_ids, {'isdownload': True, 'isfailed': True}):
        logger.info(f'批量更新|任务状态|失败|数量{len(failed_ids)}条')
    if state is False:
        return False


def spider(username, keyword, begin_time=None, end_time=None, **kwargs):
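    """
    Run one spider pass: set up MongoDB and the Bloom filter, call ``download``
    and always persist the filter afterwards.

    :return: False when the run failed in a way that should stop further
        collection (e.g. the account looks banned); None otherwise.
    """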
    logger.info('+++ 开始采集 +++')
    if 'begin_time' not in kwargs:
        kwargs['begin_time'] = begin_time
    if 'end_time' not in kwargs:
        kwargs['end_time'] = end_time

    # MongoDB connection
    # to_mgo = MongoClient('192.168.3.182', 27080)
    to_mgo = MongoClient('172.17.4.87', 27080)
    lst_coll = to_mgo['py_spider']['zyjc_qlm_list']
    detail_coll = to_mgo['py_spider']['data_bak']
    # Bloom filter
    f, bf = launch_filter()
    try:
        ret = download(lst_coll, detail_coll, bf, keyword, username, **kwargs)
        if ret is False:
            # TODO maybe stop collecting here to avoid getting the account banned
            return False
    except requests.exceptions.ConnectionError:
        send_wechat_warning(f'浙移集成|访问失败|代理访问超时|{net.get_proxies("https")}')
        return
    except Exception as e:
        logger.exception(f'爬虫异常,原因:{e}')
        return False
    finally:
        logger.debug('保存过滤器')
        with f.open('wb') as fp:
            bf.tofile(fp)  # persist the Bloom filter to disk
    logger.info('+++ 采集结束 +++')


def is_workhour(start, end):
    """
    Is the current hour within working hours?

    :param int start: starting hour (0-23)
    :param int end: ending hour (0-23), inclusive
    :return: True if the current hour is within [start, end]
    """
    if end < start:
        raise ValueError('end must not be less than start')
    return start <= datetime.now().hour <= end


def main():
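    """
    Scheduling loop: one weekend sweep every Sunday at 17:00 (covering Saturday
    and Sunday) plus a same-day run during working hours on weekdays, with a
    random 70-120 minute pause between rounds.
    """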
    username, _ = account_pool.pop(0)
    setup_login_info(username)

    keyword = '甄选公告'
    is_weekday = True  # True once the weekend sweep has run; reset on weekdays
    skip = False
    try:
        while True:
            if skip:
                net.send_wechat_warning('浙移集成|数据采集|异常停止')
            else:
                now = datetime.now()
                try:
                    # run once every Sunday at 17:00
                    if now.weekday() == 6 and is_workhour(17, 17):
                        if not is_weekday:
                            is_weekday = True
                            sat = (now - timedelta(1)).strftime('%Y-%m-%d')
                            sun = now.strftime('%Y-%m-%d')
                            ret = spider(
                                username=username,
                                begin_time=sat,
                                end_time=sun,
                                keyword=keyword,
                                page_size=40,
                            )
                            if ret is False:
                                skip = True
                    # weekdays, 09:00 - 18:00
                    if 0 <= now.weekday() <= 4 and is_workhour(9, 18):
                        is_weekday = False  # re-arm the weekend sweep
                        start = now.strftime('%Y-%m-%d')
                        ret = spider(
                            username=username,
                            begin_time=start,
                            end_time=start,
                            keyword=keyword,
                            page_size=40,
                        )
                        if ret is False:
                            skip = True  # account problem, stop collecting
                except Exception as e:
                    logger.exception(e)
            delay(4200, 7200, "[浙移集成]下轮采集")  # one round every 70 - 120 minutes
    except KeyboardInterrupt:
        pass
    finally:
        logger.info('任务结束')


if __name__ == '__main__':
    main()