
'zyjc'

Zhejiang Mobile Integration data collection
dongzhaorui 8 months ago
parent
commit
6e1a9ba3e9
4 changed files with 425 additions and 0 deletions
  1. zyjc/README.md (+1, -0)
  2. zyjc/log.py (+14, -0)
  3. zyjc/main.py (+404, -0)
  4. zyjc/requirements.txt (+6, -0)

+ 1 - 0
zyjc/README.md

@@ -0,0 +1 @@
+# 浙江移动信息系统集成有限公司

+ 14 - 0
zyjc/log.py

@@ -0,0 +1,14 @@
+from pathlib import Path
+
+from loguru import logger
+
+_absolute = Path(__file__).absolute().parent.parent
+_log_path = (_absolute / 'logs/log_{time:YYYYMMDD}.log').resolve()
+logger.add(
+    _log_path,
+    format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {thread.name} - {name}:{function}:{line} - {message}',
+    level='INFO',
+    rotation='00:00',
+    retention='1 week',
+    encoding='utf-8',
+)
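
For reference, a minimal usage sketch of the logger configured above; the import path `log` is an assumption that depends on how the package is laid out (main.py in this commit imports its logger from rgg.log instead):

    from log import logger  # assumed import path

    logger.info('collector started')  # goes to stderr and to the rotating logs/log_YYYYMMDD.log sink
    try:
        raise RuntimeError('demo failure')
    except RuntimeError:
        logger.exception('request failed')  # records the traceback at ERROR level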

+ 404 - 0
zyjc/main.py

@@ -0,0 +1,404 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-10-17 
+---------
+@summary: Zhejiang Mobile Integration data collection (Qianlima keyword spider)
+---------
+
+"""
+
+import json
+import math
+import random
+import time
+from datetime import datetime, timedelta
+from functools import partial
+from pathlib import Path
+
+import bson
+import requests
+from pybloom_live import BloomFilter
+from pymongo import MongoClient
+from pymongo.operations import UpdateOne
+
+from rgg import net
+from rgg.account import account_pool
+from rgg.clean_html import cleaner
+from rgg.log import logger
+from rgg.net import send_wechat_warning
+
+Int64 = bson.Int64
+
+
+def get_random(min_value=None, max_value=None):
+    # define the target range (defaults in seconds)
+    min_value = min_value or 60
+    max_value = max_value or 600
+
+    # compute the bounds in log space
+    log_min = math.log(min_value, 10)
+    log_max = math.log(max_value, 10)
+
+    # draw a uniform random value between log_min and log_max
+    random_log = random.uniform(log_min, log_max)
+
+    # map the random log value back into the original range
+    result = 10 ** random_log
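+    # e.g. with min_value=60 and max_value=600: log_min ≈ 1.78, log_max ≈ 2.78,
+    # so a uniform draw of 2.0 maps back to 10 ** 2.0 = 100.0 s; sampling uniformly
+    # in log space makes shorter delays more likely than longer ones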
+
+    # print(f"random value in ({min_value}-{max_value}): {result:.2f}")
+    return float(f'{result:.1f}')
+
+
+def delay(a, b, msg=None):
+    secs = get_random(a, b)
+    logger.info(f'{msg if msg else "采集"}延时{secs}秒')
+    time.sleep(secs)
+
+
+def bulk_update_task_status(collection, id_lst, update):
+    """
+    Bulk-update task status.
+
+    :param pymongo.collection.Collection collection:
+    :param id_lst: ids of the tasks to update
+    :param dict update: fields to $set on each task
+    :return: number of documents modified
+    """
+    count = 0
+    update_lst = []
+    for id_ in id_lst:
+        update['updatetime'] = Int64(int(time.time()))  # refresh the task update time
+        update_lst.append(UpdateOne({'_id': id_}, {'$set': update}))
+        if len(update_lst) == 50:
+            results = collection.bulk_write(update_lst, ordered=False)
+            update_lst = []
+            count += results.modified_count
+
+    if len(update_lst) > 0:
+        results = collection.bulk_write(update_lst, ordered=False)
+        count += results.modified_count
+
+    return count
+
+
+def setup_login_info(username):
+    logger.debug('加载登录身份信息...')
+    file = (Path(__file__).parent / f'rgg/account/{username}.json').absolute()
+    with open(file, encoding='utf-8') as rp:
+        json_data = json.load(rp)
+        net.set_cookies(ck=json_data['cookies'])
+        net.set_headers(h=json_data['headers'])
+        net.set_proxies(p=json_data['proxies'])
+
+
+def launch_filter():
+    """创建布隆过滤器"""
+    logger.debug('创建布隆过滤器...')
+    backup = (Path(__file__).parent / 'rgg/backup')
+    if not backup.exists():
+        backup.mkdir(exist_ok=True)
+
+    file = (backup / 'bloomfilter.f')
+    if not file.exists():
+        file.touch()  # create the backing file on first run
+        bf = BloomFilter(capacity=1000000, error_rate=0.001)  # sized for ~1,000,000 entries at a 0.1% error rate
+    else:
+        if file.stat().st_size == 0:
+            bf = BloomFilter(capacity=1000000, error_rate=0.001)
+        else:
+            with file.open('rb') as fp:
+                bf = BloomFilter.fromfile(fp)
+
+    return file, bf
+
+
+def fetch_list(coll, bf, keyword, page, page_size, begin_time=None, end_time=None):
+    count = 0
+    dedup_count = 0
+
+    href_lst = []
+    params = dict(begin_time=begin_time, end_time=end_time)
+    lst = net.download_list(keyword, page, page_size, **params)
+    if lst is None:
+        return False, lst, count, dedup_count
+
+    lst_data = []
+    for item in lst:
+        href = item.get('url')
+        if href is None:
+            # print(f'malformed item|{item}')
+            continue
+
+        if href in bf:
+            logger.debug(f'重复数据|{href}')
+            dedup_count += 1
+            continue
+
+        title = item['popTitle'] if 'popTitle' in item else item['showTitle']
+        publishtime = item['updateTime']
+        l_np_publishtime = datetime.strptime(publishtime, '%Y-%m-%d').timestamp()
+
+        addr = str(item['areaName']).split('-')
+        area = addr[0] if len(addr) > 0 else ''
+        city = addr[1] if len(addr) > 1 else ''
+        if '国土' in item.get('progName', ''):
+            channel = item['progName']
+        else:
+            channel = (item['noticeSegmentTypeName'] or item['progName'])
+
+        data = {
+            'site': '千里马',
+            'channel': channel,
+            'spidercode': 'sdxzbiddingsjzypc',  # reserved for manual supplementary entry
+            'area': area,
+            'city': city,
+            'district': '',
+            'href': href,
+            'title': title,
+            'publishtime': publishtime,
+            'l_np_publishtime': Int64(l_np_publishtime),
+            'comeintime': Int64(int(time.time())),
+            'isdownload': False,  # downloaded yet?
+            'isfailed': False,  # download failed?
+            'keywords': ",".join(item['hiLightKey']),  # search keywords
+        }
+        lst_data.append(data)
+        if len(lst_data) == 50:
+            coll.insert_many(lst_data, ordered=False)
+            count += len(lst_data)
+            lst_data = []
+
+        href_lst.append(href)
+
+    if len(lst_data) > 0:
+        coll.insert_many(lst_data, ordered=False)
+        count += len(lst_data)
+
+    # record the inserted hrefs in the Bloom filter
+    for href in href_lst:
+        bf.add(href)
+
+    return True, lst, count, dedup_count
+
+
+def auto_page_turning(coll, bf, keywords, page_size=40, **kwargs):
+    page = 1
+    page_size = 100 if page_size > 100 else page_size
+    while True:
+        ret, *args = fetch_list(coll, bf, keywords, page, page_size, **kwargs)
+        if ret is False:
+            return False
+
+        ret_lst, count, dedup_count = args
+        logger.info(f'自动翻页|第{page}页|{keywords}|入库{count}条|重复{dedup_count}条')
+        if len(ret_lst) < page_size:
+            break
+        else:
+            page += 1
+
+    return True
+
+
+def get_tasks(collection, max_limit=100):
+    now = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
+    start_ts = int(now.timestamp())
+    second_day = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
+    end_ts = int(second_day.timestamp())
+
+    q = {'comeintime': {'$gte': start_ts, '$lt': end_ts}}
+    with collection.find(q) as cursor:
+        rets = [item for item in cursor]
+        rets = rets[:max_limit] if len(rets) > max_limit else rets  # cap the number of items collected per day
+        rets = list(filter(lambda x: x['isdownload'] is False, rets))  # keep only items not yet downloaded
+    return rets
+
+
+def fetch_detail(collection, tasks, username):
+    results = {'success': [], 'failed': []}
+
+    insert_lst = []
+    for task in tasks:
+        href = task['href']
+        ret = net.download_json(href, referer=False)
+        if ret is False:
+            logger.error(f'账号失效|{username}')
+            if insert_lst:
+                collection.insert_many(insert_lst, ordered=False)  # flush pending details before aborting
+            return False, results
+
+        if not ret:
+            results['failed'].append(task['_id'])
+        else:
+            data = {
+                'site': task['site'],
+                'channel': task['channel'],
+                'spidercode': task['spidercode'],
+                'area': task['area'],
+                'city': task['city'],
+                'district': task['district'],
+                'href': '#',
+                'competehref': href,
+                'title': task['title'],
+                's_title': task['title'],
+                'contenthtml': ret['content'],
+                'detail': cleaner(ret['content']),
+                'publishtime': task['publishtime'],
+                'l_np_publishtime': task['l_np_publishtime'],
+                'comeintime': Int64(int(time.time())),
+                'T': 'bidding',
+                'infoformat': 1,
+                'sendflag': 'false',
+                'iscompete': True,
+                '_d': 'comeintime',
+                'publishdept': '',
+                'type': '',
+                'is_mixed': True
+            }
+            insert_lst.append(data)
+            results['success'].append(task['_id'])
+            if len(insert_lst) == 50:
+                collection.insert_many(insert_lst, ordered=False)
+                insert_lst = []
+
+        # delay(45, 540)
+        time.sleep(.5)
+
+    if len(insert_lst) > 0:
+        collection.insert_many(insert_lst, ordered=False)
+
+    return True, results
+
+
+def download(lst_coll, detail_coll, bf, keyword, username, **kwargs):
+    # bind the list collection via partial
+    bulk_update = partial(bulk_update_task_status, lst_coll)
+
+    # data collection
+    ret = auto_page_turning(lst_coll, bf, keyword, **kwargs)  # download list pages
+    if ret is False:
+        return False
+
+    tasks = get_tasks(lst_coll)  # claim collection tasks
+    state, detail_ret = fetch_detail(detail_coll, tasks, username)  # download detail pages
+
+    # bulk-update status of successful tasks
+    success_ids = detail_ret['success']
+    if bulk_update(success_ids, {'isdownload': True}):
+        logger.info(f'批量更新|任务状态|成功|数量{len(success_ids)}条')
+
+    # bulk-update status of failed tasks
+    failed_ids = detail_ret['failed']
+    if bulk_update(failed_ids, {'isdownload': True, 'isfailed': True}):
+        logger.info(f'批量更新|任务状态|失败|数量{len(failed_ids)}条')
+
+    if state is False:
+        return False
+
+
+def spider(username, keyword, begin_time=None, end_time=None, **kwargs):
+    logger.info('+++ 开始采集 +++')
+    if 'begin_time' not in kwargs:
+        kwargs['begin_time'] = begin_time
+    if 'end_time' not in kwargs:
+        kwargs['end_time'] = end_time
+
+    # create the MongoDB connection
+    # to_mgo = MongoClient('192.168.3.182', 27080)
+    to_mgo = MongoClient('172.17.4.87', 27080)
+    lst_coll = to_mgo['py_spider']['zyjc_qlm_list']
+    detail_coll = to_mgo['py_spider']['data_bak']
+
+    # create/load the Bloom filter
+    f, bf = launch_filter()
+
+    try:
+        ret = download(lst_coll, detail_coll, bf, keyword, username, **kwargs)
+        if ret is False:
+            # TODO: maybe exit the collection here to keep the account from being banned
+            return False
+
+    except requests.exceptions.ConnectionError:
+        send_wechat_warning(f'浙移集成|访问失败|代理访问超时|{net.get_proxies("https")}')
+        return
+
+    except Exception as e:
+        logger.exception(f'爬虫异常,原因:{e}')
+        return False
+
+    finally:
+        logger.debug('保存过滤器')
+        with f.open('wb') as fp:
+            bf.tofile(fp)  # persist the Bloom filter to disk
+        logger.info('+++ 采集结束 +++')
+
+
+def is_workhour(start, end):
+    """
+    Is the current hour within working hours?
+
+    :param int start: start hour (0-23)
+    :param int end: end hour (0-23, inclusive)
+    :return: True if the current hour falls within [start, end]
+    """
+
+    if end < start:
+        raise ValueError('end must be greater than start')
+
+    return start <= datetime.now().hour <= end
+
+
+def main():
+    username, _ = account_pool.pop(0)
+    setup_login_info(username)
+
+    keyword = '甄选公告'
+    is_weekday = True
+    skip = False
+    try:
+        while True:
+            if skip:
+                net.send_wechat_warning('浙移集成|数据采集|异常停止')
+            else:
+                now = datetime.now()
+                try:
+                    # run once every Sunday at 17:00
+                    if now.weekday() == 6 and is_workhour(17, 17):
+                        if not is_weekday:
+                            is_weekday = True
+                            sat = (now - timedelta(1)).strftime('%Y-%m-%d')
+                            sun = now.strftime('%Y-%m-%d')
+                            ret = spider(
+                                username=username,
+                                begin_time=sat,
+                                end_time=sun,
+                                keyword=keyword,
+                                page_size=40,
+                            )
+                            if ret is False:
+                                skip = True
+
+                    # weekdays, 09:00 - 18:00
+                    if 0 <= now.weekday() <= 4 and is_workhour(9, 18):
+                        is_weekday = False  # reset the weekend collection flag
+                        start = now.strftime('%Y-%m-%d')
+                        ret = spider(
+                            username=username,
+                            begin_time=start,
+                            end_time=start,
+                            keyword=keyword,
+                            page_size=40,
+                        )
+                        if ret is False:
+                            skip = True  # account problem
+
+                except Exception as e:
+                    logger.exception(e)
+
+            delay(4200, 7200, "[浙移集成]下轮采集")  # run once every 70 - 120 minutes
+
+    except KeyboardInterrupt:
+        pass
+
+    finally:
+        logger.info('任务结束')
+
+
+if __name__ == '__main__':
+    main()
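
For reference, a minimal sketch of the dedup persistence round-trip that launch_filter() and spider() rely on, using the same pybloom_live calls as above; the file name and URLs here are illustrative only:

    from pathlib import Path
    from pybloom_live import BloomFilter

    def load_filter(path: Path) -> BloomFilter:
        # reuse an existing dump when present, otherwise start a fresh filter
        if path.exists() and path.stat().st_size > 0:
            with path.open('rb') as fp:
                return BloomFilter.fromfile(fp)
        return BloomFilter(capacity=1_000_000, error_rate=0.001)

    path = Path('bloomfilter.f')  # illustrative location
    bf = load_filter(path)
    for href in ('https://example.com/a', 'https://example.com/b'):
        if href not in bf:
            bf.add(href)  # mark as seen so later runs skip it
    with path.open('wb') as fp:
        bf.tofile(fp)  # persist across runs, as in spider()'s finally block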

+ 6 - 0
zyjc/requirements.txt

@@ -0,0 +1,6 @@
+DrissionPage==4.0.5.6
+loguru==0.5.3
+pymongo==3.12.0
+pybloom_live==4.0.0
+PySocks==1.7.1
+PyExecJS==1.5.1
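
These pins install in the usual way, for example:

    pip install -r zyjc/requirements.txt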