
Delete data processing (redis version)

dongzhaorui 1 year ago
parent
commit
344550a4a8

+ 0 - 8
A数据处理/sync_data/README.md

@@ -1,8 +0,0 @@
-#### Data service
-    Scripts are run on a schedule by the crontab service
-
-#### Script purposes
-    1. backfill_task.py -> re-queues failed tasks into the collection queue
-    2. monitor.py -> spider runtime monitoring
-    3. send_data.py -> syncs feapder spider data into the spider database (spiders push data into RedisDB; the sync service pushes it from RedisDB to the spiders' MongoDB production database every 5 minutes)
-    4. summary.py -> aggregated statistics of spider heartbeat data
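
A minimal crontab sketch of how these scripts might be scheduled: the 5-minute interval for send_data.py follows from the description above, while the install path (/opt/sync_data), the python3 interpreter and the other run times are illustrative assumptions.

    # monitor.py and summary.py aggregate the previous day's data, so they run shortly after midnight
    */5 * * * *  cd /opt/sync_data && python3 send_data.py      # redis -> mongo sync, every 5 minutes
    0 1 * * *    cd /opt/sync_data && python3 backfill_task.py  # re-queue failed tasks
    30 1 * * *   cd /opt/sync_data && python3 monitor.py        # daily spider monitoring
    0 2 * * *    cd /opt/sync_data && python3 summary.py        # daily heartbeat summary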

+ 0 - 70
A数据处理/sync_data/backfill_task.py

@@ -1,70 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2023-04-07
----------
-@summary:  Backfill of collection tasks
----------
-@author: dzr
-"""
-import time
-
-from bson.int64 import Int64
-from pymongo import MongoClient
-from pymongo.errors import DuplicateKeyError
-
-from log import logger
-
-MONGO_HOST = "172.17.4.87"
-MONGO_PORT = 27080
-client = MongoClient(MONGO_HOST, MONGO_PORT)
-
-
-MONGO_DB = "py_spider"
-mongodb = client[MONGO_DB]
-
-# spider data table (error records)
-listdata_err = mongodb["listdata_err"]
-
-
-def backfill_data():
-    """回填重新采集的任务,并给出错误重试次数"""
-    count = 0  # 计数器
-    # 查询5天内范围内的错误重采次数小于20的错误任务
-    start_at = int(time.time()) - 432000
-    end_at = int(time.time())
-    q = {
-        "failed_times": {"$lt": 20},
-        "create_at": {"$gte": start_at, "$lt": end_at}
-    }
-    cursor = listdata_err.find(q, no_cursor_timeout=True)
-    try:
-        for doc in cursor:
-            count += 1
-            if count % 100 == 0:
-                logger.info(f"[BackFill]放入采集队列{count}条任务")
-
-            coll_name = doc["origin"]  # name of the source collection
-            document = doc["origin_data"]  # original document
-            document["failed_times"] = doc["failed_times"]
-            # remove error-tracking fields
-            for field in ["save", "pri", "failed", "error", "code"]:
-                document.pop(field, "")
-
-            if "comeintime" in document:
-                document["comeintime"] = Int64(time.time())
-
-            try:
-                mongodb[coll_name].insert_one(document)
-                listdata_err.delete_one({"_id": doc["_id"]})
-            except DuplicateKeyError as e:
-                logger.warning(f"[BackFill]重复键错误:{e.details.get('errmsg')}")
-                listdata_err.delete_one({"_id": doc["_id"]})
-
-        logger.info(f"[BackFill]共计放入采集队列{count}条任务")
-    finally:
-        client.close()
-        logger.info("[BackFill]采集任务回填结束")
-
-
-if __name__ == '__main__':
-    backfill_data()

+ 0 - 14
A数据处理/sync_data/log.py

@@ -1,14 +0,0 @@
-from pathlib import Path
-
-from loguru import logger
-
-_absolute = Path(__file__).absolute().parent
-_log_path = (_absolute / 'logs/log_{time:YYYY-MM-DD}.log').resolve()
-logger.add(
-    _log_path,
-    format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}',
-    level='INFO',
-    rotation='00:00',
-    retention='1 week',
-    encoding='utf-8',
-)

+ 0 - 446
A数据处理/sync_data/monitor.py

@@ -1,446 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2023-04-05
----------
-@summary:  Spider runtime monitoring (feapder) and daily collection statistics
----------
-@author: Dzr
-"""
-import hashlib
-from datetime import datetime, timedelta
-
-from bson.int64 import Int64
-from bson.son import SON
-from pymongo import MongoClient
-
-from log import logger
-
-# mongo
-MONGO_HOST = "172.17.4.87"
-MONGO_PORT = 27080
-client = MongoClient(MONGO_HOST, MONGO_PORT)
-
-MONGO_DB1 = "py_spider"
-MONGO_DB2 = "editor"
-
-mongodb1 = client[MONGO_DB1]
-mongodb2 = client[MONGO_DB2]
-
-# spider production data table
-data_bak = mongodb1["data_bak"]
-# spider heartbeat table
-spider_heartbeat = mongodb1["pyspider_heartbeat"]
-# daily collection-detail summary table
-spider_monitor = mongodb1["spider_monitor"]
-# collection task table
-spider_lua_config = mongodb2["luaconfig"]
-
-# special sites
-special_sites = ["云南省政府采购网", "湖南省政府采购电子卖场"]
-
-
-def get_md5(*args, **kwargs):
-    """
-    @summary: get a unique 32-character md5
-    ---------
-    @param args: list of values combined for deduplication
-    @param kwargs: dict of values combined for deduplication
-    ---------
-    @result: 7c8684bcbdfcea6697650aa53d7b1405
-    """
-    if len(args) != 1:
-        conditions = ["site", "channel", "spidercode"]
-        data_lst = list(filter(lambda x: x is not None, args))
-        for k, v in kwargs.items():
-            if k in conditions and (v and v not in data_lst):
-                data_lst.append(v)
-
-        if not data_lst or len(data_lst) != 3:
-            logger.error(f"组合条件缺失:{conditions},当前内容:{data_lst}")
-            return None
-
-        data_lst = sorted(data_lst)
-        content = "_".join(data_lst)
-    else:
-        content = args[0]
-    return hashlib.md5(str(content).encode()).hexdigest()
-
-
-def get_runtime(datestr=None):
-    if datestr is None:
-        today = datetime.now().date()
-        yesterday = today + timedelta(days=-1)
-        datestr = yesterday.strftime("%Y-%m-%d")
-    return datestr
-
-
-def save(documents, collection, ordered=False):
-    """保存数据"""
-    is_list = isinstance(documents, list)
-    documents = documents if is_list else [documents]
-
-    data_lst = []
-    for items in documents:
-        items.pop("_id", None)
-        items.pop("business_type", None)
-        items["comeintime"] = Int64(datetime.now().timestamp())
-        data_lst.append(items)
-        if len(data_lst) == 100:
-            ret = collection.insert_many(data_lst, ordered)
-            logger.info(f"MongoDB {collection.name} 保存 {len(ret.inserted_ids)} 条数据")
-            data_lst = []
-
-    # flush the remaining documents
-    if data_lst:
-        collection.insert_many(data_lst, ordered)
-
-    logger.info(f"MongoDB {collection.name} 保存 {len(documents)} 条数据")
-
-
-def get_spider_lst():
-    """爬虫基础信息"""
-    crawler_lst = []
-    q = {"platform": "python", "state": 11}
-    projection = {
-        "_id": 0, "site": 1, "channel": 1,
-        "modifyuser": 1, "modifyuserid": 1, "code": 1, "event": 1
-    }
-    cursor = spider_lua_config.find(q, projection=projection)
-    try:
-        for doc in cursor:
-            crawler_lst.append({
-                "site": doc["site"],
-                "channel": doc["channel"],
-                "code": doc["code"],
-                "modifyid": doc["modifyuserid"],
-                "modifyuser": doc["modifyuser"],
-                "event": doc["event"]
-            })
-    finally:
-        client.close()
-        logger.info(f"爬虫监控 - 已上线 {len(crawler_lst)} 个爬虫")
-        yield from crawler_lst
-
-
-def aggregate_query(collection, pipeline, is_print_error=False):
-    """
-    Aggregation query
-
-    @param collection: MongoDB collection
-    @param pipeline: aggregation pipeline
-    @param is_print_error: whether to print error logs to the console
-    @return: aggregation results
-    """
-    results = []
-    cursor = collection.aggregate(pipeline, allowDiskUse=True)
-    try:
-        for doc in cursor:
-            results.append(doc)
-    except Exception as e:
-        if is_print_error:
-            logger.exception(e)
-    finally:
-        client.close()
-        yield from results
-
-
-def aggregate_query_count(runtime):
-    """
-    Aggregated statistics of list-page and detail-page collection volumes for feapder spiders
-
-    @param runtime: run date
-    @return:
-    """
-    spider_dict = {}  # record of spidercode-to-site checks
-    aggregate_items = {}
-    pipeline = [
-        {"$match": {"runtime": runtime}},
-        {
-            "$group": {
-                "_id": "$batch_no",
-                "rel_count": {"$sum": "$rel_count"},  # 入库量(去重)
-                "count": {"$sum": "$count"},  # 下载量
-                "site": {"$first": "$site"},
-                "channel": {"$first": "$channel"},
-                "spidercode": {"$first": "$spidercode"},
-                "business_type": {"$first": "$business_type"},
-            }
-        },
-        {"$sort": SON([("rel_count", -1)])}
-    ]
-    results = aggregate_query(spider_heartbeat, pipeline)
-    for items in results:
-        site = items["site"]
-        channel = items["channel"]
-        spidercode = items["spidercode"]
-        business_type = items["business_type"]
-        business_type = "list" if business_type.endswith("List") else "detail"
-
-        hash_key = get_md5(**items)  # spider lookup key
-        if not hash_key:
-            logger.error(f"异常批次号 {items['_id']}")
-            continue
-
-        # sort the data by spider business type
-        data_dict = {
-            "site": site,
-            "channel": channel,
-            "spidercode": spidercode,
-            "runtime": runtime,
-            "spidercode_at_site_num": 0,  # 爬虫代码与站点对应的关系数量
-            "frame": "feapder",  # 采集框架
-            business_type: {
-                "batch_no": items["_id"],
-                "count": items["count"],
-                "rel_count": items["rel_count"],
-                "runtimes": 1
-            }
-        }
-        if not aggregate_items.get(hash_key):
-            aggregate_items.setdefault(hash_key, data_dict)
-        else:
-            aggregate_items[hash_key][business_type] = data_dict[business_type]
-
-        # Monitor spider tasks: spidercode_at_site_num > 1 means the spider
-        # task setup is problematic and must be reported to the data-sourcing staff
-        if site not in special_sites:
-            spider = aggregate_items[hash_key]
-            if spidercode not in spider_dict:
-                spider["spidercode_at_site_num"] = 1
-                values = {"keys": [hash_key], "sites": [site]}
-                spider_dict.setdefault(spidercode, values)
-            else:
-                # same spidercode but a different site: increment the counter by 1
-                sites = spider_dict[spidercode]["sites"]
-                if site not in sites:
-                    keys = spider_dict[spidercode]["keys"]
-                    for key in keys:
-                        # update spidercode_at_site_num for entries sharing this spidercode
-                        aggregate_items[key]["spidercode_at_site_num"] += 1
-
-                    keys.append(hash_key)  # record the new spider
-                    sites.append(site)  # add the new site
-                    spider["spidercode_at_site_num"] = len(sites)
-        else:
-            aggregate_items[hash_key]["spidercode_at_site_num"] = 1
-
-    return aggregate_items
-
-
-def aggregate_query_crawl_list(runtime):
-    """feapder列表爬虫采集聚合查询结果统计"""
-    aggregate_items = {}
-    pipeline = [
-        {
-            "$match": {
-                "runtime": runtime,
-                "business_type": {"$regex": "List"},
-                "status_code": {"$ne": -1}
-            }
-        },
-        {
-            "$group": {
-                "_id": "$batch_no",
-                "count": {"$sum": "$count"},
-                "rel_count": {"$sum": "$rel_count"},
-                "site": {"$first": "$site"},
-                "channel": {"$first": "$channel"},
-                "spidercode": {"$first": "$spidercode"},
-            }
-        }
-    ]
-    results = aggregate_query(spider_heartbeat, pipeline)
-    for items in results:
-        hash_key = get_md5(**items)
-        if not aggregate_items.get(hash_key):
-            values = {"list_allintimes": 0, "list_nodatatimes": 0}
-            aggregate_items.setdefault(hash_key, values)
-
-        if all([
-            items["count"] > 0,
-            items["rel_count"] > 0,
-            items["count"] == items["rel_count"]
-        ]):
-            aggregate_items[hash_key]["list_allintimes"] += 1
-
-        if items["count"] == 0:
-            aggregate_items[hash_key]["list_nodatatimes"] += 1
-
-    return aggregate_items
-
-
-def aggregate_count_crawlab_update_runtimes(runtime):
-    """feapder爬虫采集聚合查询crawlab平台运行信息"""
-    pipeline = [
-        {
-            "$project": {
-                "_id": 0,
-                "site": 1,
-                "channel": 1,
-                "batch_no": 1,
-                "spidercode": 1,
-                "business_type": 1,
-                "runtime": 1,
-                "node_ip": 1,
-                "crawlab_taskid": 1,
-                "create_at": 1,
-            }
-        },
-        {"$match": {"runtime": runtime}},
-        {
-            "$group": {
-                "_id": "$batch_no",
-                "business_type": {"$first": "$business_type"},
-                "site": {"$first": "$site"},
-                "channel": {"$first": "$channel"},
-                "spidercode": {"$first": "$spidercode"},
-                "crawlab_item": {
-                    "$addToSet": {
-                        "node_ip": "$node_ip",
-                        "crawlab_taskid": "$crawlab_taskid",
-                    },
-                }
-            }
-        }
-    ]
-    results = aggregate_query(spider_heartbeat, pipeline)
-    for items in results:
-        runtimes = len(items["crawlab_item"])  # number of runs of the collection task
-
-        # sort the data by spider business type
-        business_type = items["business_type"]
-        business_type = "list" if business_type.endswith("List") else "detail"
-
-        hash_key = get_md5(**items)  # spider lookup key
-        # update the run count in the aggregated statistics
-        if hash_key and hash_key in aggregate_count_items:
-            aggregate_count_items[hash_key][business_type]["runtimes"] = runtimes
-
-
-_runtime = get_runtime()  # statistics date
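-# the aggregation queries run once at module load; the get_* helpers below read from these cached results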
-aggregate_count_items = aggregate_query_count(_runtime)
-aggregate_crawl_list_items = aggregate_query_crawl_list(_runtime)
-aggregate_count_crawlab_update_runtimes(_runtime)
-
-
-def get_list_isgetdata(hash_key, default=0):
-    """列表页是否采集数据"""
-    query_result = aggregate_count_items.get(hash_key)
-    if query_result:
-        default = query_result["list"]["count"]
-    return True if default > 0 else False
-
-
-def get_list_allintimes(hash_key, default=0):
-    """日采集列表数量与入库数量相等的次数(扣除标题去重数量 + 增量(全量)去重数量)"""
-    if aggregate_crawl_list_items.get(hash_key):
-        default = aggregate_crawl_list_items[hash_key]["list_allintimes"]
-    return default
-
-
-def get_list_runtimes(hash_key, default=0):
-    """列表采集运行频次"""
-    query_result = aggregate_count_items.get(hash_key)
-    if query_result and "list" in query_result:
-        default = aggregate_count_items[hash_key]["list"]["runtimes"]
-    return default
-
-
-def get_list_count(hash_key, default=0):
-    """列表采集总数"""
-    query_result = aggregate_count_items.get(hash_key)
-    if query_result and "list" in query_result:
-        default = aggregate_count_items[hash_key]["list"]["count"]
-    return default
-
-
-def get_list_rel_count(hash_key, default=0):
-    """列表实际入库总数"""
-    query_result = aggregate_count_items.get(hash_key)
-    if query_result and "list" in query_result:
-        default = aggregate_count_items[hash_key]["list"]["rel_count"]
-    return default
-
-
-def get_list_nodatatimes(hash_key, default=-1):
-    """列表页采集无数据次数(过滤后)"""
-    if aggregate_crawl_list_items.get(hash_key):
-        default = aggregate_crawl_list_items[hash_key]["list_nodatatimes"]
-    return default
-
-
-def get_detail_downloadnum(hash_key, default=0):
-    """详情页下载量"""
-    query_result = aggregate_count_items.get(hash_key)
-    if query_result and "detail" in query_result:
-        default = aggregate_count_items[hash_key]["detail"]["count"]
-    return default
-
-
-get_detail_count = get_detail_downloadnum
-
-
-def get_detail_downloadsuccessnum(hash_key, default=0):
-    """详情页下载成功量"""
-    query_result = aggregate_count_items.get(hash_key)
-    if query_result and "detail" in query_result:
-        default = aggregate_count_items[hash_key]["detail"]["rel_count"]
-    return default
-
-
-get_detail_rel_count = get_detail_downloadsuccessnum
-
-
-def get_detail_downloadfailnum(**kwargs):
-    """详情页下载失败量"""
-    count = -1
-    if kwargs["detail_downloadnum"] >= 0 and kwargs["detail_downloadnum"] >= 0:
-        count = kwargs["detail_downloadnum"] - kwargs["detail_downloadsuccessnum"]
-    return count
-
-
-def start_monitor():
-    summary_queue = []
-    spider_lst = get_spider_lst()
-    for spider in spider_lst:
-        site = spider["site"]
-        channel = spider["channel"]
-        spidercode = spider["code"]
-
-        join_data = {**spider, "is_valid": False}  # build the base spider record
-        hash_key = get_md5(site, channel, spidercode)
-        query_result = aggregate_count_items.get(hash_key)
-        if query_result:
-            # aggregation query - collection statistics
-            result = query_result
-            join_data["frame"] = result["frame"]  # collection framework name
-            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
-
-        # aggregated statistics - list collection data
-        join_data["list_isgetdata"] = get_list_isgetdata(hash_key)
-        join_data["list_allintimes"] = get_list_allintimes(hash_key)
-        join_data["list_runtimes"] = get_list_runtimes(hash_key)
-        join_data["list_nodatatimes"] = get_list_nodatatimes(hash_key)
-        join_data["list_count"] = get_list_count(hash_key)
-        join_data["list_rel_count"] = get_list_rel_count(hash_key)
-
-        # aggregated statistics - detail collection data
-        join_data["detail_count"] = get_detail_count(hash_key)
-        join_data["detail_rel_count"] = get_detail_rel_count(hash_key)
-        join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)
-        join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)
-        join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data)
-
-        # whether the daily collection statistics for this spider are valid
-        frame = join_data.get("frame")
-        if frame == "feapder":
-            join_data["is_valid"] = True
-
-        summary_queue.append(join_data)
-        logger.info(f"{site} {channel} {spidercode} --统计完成")
-    # upload to the database
-    save(summary_queue, spider_monitor)
-    logger.info("爬虫监控 - 日统计完成")
-
-
-if __name__ == '__main__':
-    start_monitor()

+ 0 - 322
A数据处理/sync_data/send_data.py

@@ -1,322 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2023-02-21
----------
-@summary:  Data upload (redis to mongo); sync service for collected data
----------
-@author: dzr
-"""
-
-import ast
-import re
-import time
-from concurrent.futures import ThreadPoolExecutor, wait
-from typing import Dict
-
-import redis
-from bson import int64
-from elasticsearch import Elasticsearch
-from func_timeout import func_set_timeout
-from func_timeout.exceptions import FunctionTimedOut
-from pymongo import MongoClient
-from redis._compat import unicode, long, basestring
-from redis.connection import Encoder as RedisEncoder
-from redis.exceptions import DataError
-
-from log import logger
-
-
-class Encoder(RedisEncoder):
-
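-    # Unlike the stock redis-py Encoder, this subclass stringifies list/dict/tuple values
-    # so whole item dicts can be rpushed and later parsed back with literal_eval().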
-    def encode(self, value):
-        "Return a bytestring or bytes-like representation of the value"
-        if isinstance(value, (bytes, memoryview)):
-            return value
-        # elif isinstance(value, bool):
-        #     # special case bool since it is a subclass of int
-        #     raise DataError(
-        #         "Invalid input of type: 'bool'. Convert to a "
-        #         "bytes, string, int or float first."
-        #     )
-        elif isinstance(value, float):
-            value = repr(value).encode()
-        elif isinstance(value, (int, long)):
-            # python 2 repr() on longs is '123L', so use str() instead
-            value = str(value).encode()
-        elif isinstance(value, (list, dict, tuple)):
-            value = unicode(value)
-        elif not isinstance(value, basestring):
-            # a value we don't know how to deal with. throw an error
-            typename = type(value).__name__
-            raise DataError(
-                "Invalid input of type: '%s'. Convert to a "
-                "bytes, string, int or float first." % typename
-            )
-        if isinstance(value, unicode):
-            value = value.encode(self.encoding, self.encoding_errors)
-        return value
-
-
-# redis
-redis.connection.Encoder = Encoder
-REDIS_HOST = "172.17.4.232"
-REDIS_PORT = 7361
-REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
-REDIS_DB = 10
-pool = redis.ConnectionPool(
-    host=REDIS_HOST,
-    port=REDIS_PORT,
-    password=REDISDB_USER_PASS,
-    db=REDIS_DB
-)
-rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
-redis_prefix = "savemongo"
-
-# mongo
-MONGO_HOST = "172.17.4.87"
-MONGO_PORT = 27080
-MONGO_DB = "py_spider"
-mcli = MongoClient(MONGO_HOST, MONGO_PORT)
-mongodb = mcli[MONGO_DB]
-
-# es
-ES_HOST = "172.17.145.178"
-ES_PORT = 9200
-ES_INDEX = "biddingall"
-try:
-    ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
-except ConnectionRefusedError as e:
-    logger.error(f"es服务拒绝访问,原因:{e}")
-    ecli = None
-
-
-def err_msg(worker):
-    err = worker.exception()
-    if err:
-        logger.exception("[Send]worker err: {}".format(err))
-    return worker
-
-
-def literal_eval(node_or_string):
-    """反序列化数据"""
-    try:
-        return ast.literal_eval(node_or_string)
-    except ValueError as e:
-        if "malformed node or string" in e.args[0]:
-            from bson import Code, ObjectId  # bring names into eval scope (ObjectId arguments)
-            import datetime
-            return eval(node_or_string)
-        else:
-            raise e
-
-
-def date2ts(date_str):
-    """日期转时间戳"""
-    if ":" in date_str:
-        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d %H:%M:%S")))
-    else:
-        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d")))
-    return ts
-
-
-def es_query(title, publish_time):
-    """
-    Query es
-
-    :param title: title
-    :param publish_time: publish time
-    :return:
-    """
-    if not ecli:
-        return 0  # if the es service is unavailable, still let the data be pushed
-    publish_time = date2ts(publish_time)
-    stime = publish_time - 432000  # 5 days earlier
-    etime = publish_time + 432000
-    # query by title and publish-time range
-    query = {
-        "query": {
-            "bool": {
-                "must": [
-                    {
-                        "multi_match": {
-                            "query": title,
-                            "type": "phrase",
-                            "fields": ["title"]
-                        }
-                    },
-                    {"range": {'publishtime': {"from": stime, "to": etime}}}
-                ]
-            }
-        }
-    }
-    result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
-    total = int(result["hits"]["total"]['value'])
-    return total
-
-
-def rpush(name, values, is_redis_cluster=False):
-    """“将“values”推到列表“name”的尾部”"""
-    if isinstance(values, list):
-        pipe = rcli.pipeline()
-        if not is_redis_cluster:
-            pipe.multi()
-
-        for value in values:
-            pipe.rpush(name, value)
-        pipe.execute()
-    else:
-        return rcli.rpush(name, values)
-
-
-def handle_big_document(item):
-    if "contenthtml" in item:
-        item["contenthtml"] = re.sub("<img[^>]*>", "<br>", item["contenthtml"])
-
-
-def insert_one(table, item: Dict):
-    """MongoDB 单条入库"""
-    table = "".join(table.split(f"{redis_prefix}:"))
-    if item is not None:
-        item.pop("_id", "")
-        if item.get("comeintime"):
-            item["comeintime"] = int64.Int64(item["comeintime"])
-        try:
-            title = item.get("title")
-            result = mongodb[table].insert_one(item)
-            logger.info(f"[Send]{table}-{str(result.inserted_id)}-{title}--上传成功")
-        except Exception as e:
-            if "BSON document too large" in ''.join(e.args):
-                handle_big_document(item)  # MongoDB requires BSON documents to be at most 16 MB
-
-            # rpush(get_redis_key(table), item)
-            rpush(table, item)
-            logger.error(f"[Send]{table}--推送失败,原因:{''.join(e.args)}")
-
-
-def delay_push_to_db(table_name, data, delay_time=43200):
-    """
-    Third-party data that must be stored with a delay before being pushed to the spider production database
-
-    @param table_name: table name
-    @param data: data to delay
-    @param delay_time: delay duration, in seconds
-    @return:
-    """
-    site = data.get("item").get("site")
-    title = data.get("item").get("title")
-    time_diff = int(time.time()) - data.get("comeintime")
-    if time_diff <= delay_time:
-        rpush(table_name, data)
-        logger.info(f"[Send]{site}-{title}-等待{time_diff}秒--延时推送")
-    else:
-        logger.info(f"[Send]{site}-{title}-等待{time_diff}秒--延时入库")
-        insert_one(table_name, data)
-    return True
-
-
-def es_retrieval_push_to_db(table_name, data):
-    """
-    Deduplicate against es (incremental data from the last 3 months), then push to the spider production database
-
-    @param table_name: table name
-    @param data: data to check for duplicates
-    @return:
-    """
-    site = data.get("item").get("site")
-    title = data.get("item").get("title")
-    pt = data.get("item").get("publishtime")
-    if not title or not pt:  # es retrieval requires both title and publish time; otherwise the record is discarded as junk data
-        return False
-
-    count = es_query(title.strip(), pt)
-    if count == 0:
-        insert_one(table_name, data)
-    logger.info(f"[Send]{site}-{title}-检索到{count}条--ES检索")
-    return True
-
-
-def mixture_process_push_to_db(table_name, data, delay_time=43200):
-    """
-    Combined delay + es-retrieval deduplication, then push to the spider production database
-
-    @param table_name: table name
-    @param data: data to check for duplicates
-    @param delay_time: delay duration, in seconds
-    @return:
-    """
-    site = data.get("item").get("site")
-    title = data.get("item").get("title")
-    pt = data.get("item").get("publishtime")
-    if not title or not pt:  # es retrieval requires both title and publish time; otherwise the record is discarded as junk data
-        return False
-
-    is_continue = False
-    time_diff = int(time.time()) - data.get("comeintime")
-    count = es_query(title.strip(), pt)
-    if count == 0:
-        if time_diff <= delay_time:
-            rpush(table_name, data)
-        else:
-            insert_one(table_name, data)
-        is_continue = True
-
-    msg = "保持轮询检索" if is_continue else "删除重复数据"
-    logger.info(f"[Send]{site}-{title}-{msg}--混合检索")
-    return True
-
-
-def sync_data(table: str):
-    """
-    Sync the data stored under a redis key into MongoDB
-
-    @param table:
-    @return:
-    """
-    redis_key = table
-    total = rcli.llen(redis_key)
-    logger.info(f"[Send]同步数据表名:{table},推送总数:{total}")
-    for _ in range(total):
-        obj = rcli.lpop(redis_key)
-        if obj is None:
-            logger.warning(f"[Send]{table} 错误数据:{obj}")
-            continue
-
-        try:
-            item = literal_eval(obj)
-            if all([not table.endswith(char) for char in ["mgp_list", "bidding"]]):
-                insert_one(table, item)
-            else:
-                is_delay = item.get("is_delay")  # delayed push
-                is_es_retrieval = item.get("if_es")  # es retrieval
-                if is_delay and is_es_retrieval:
-                    mixture_process_push_to_db(table, item)
-                elif is_delay and not is_es_retrieval:
-                    delay_push_to_db(table, item)
-                elif not is_delay and is_es_retrieval:
-                    es_retrieval_push_to_db(table, item)
-                else:
-                    insert_one(table, item)
-        except Exception as e:
-            rpush(table, obj)
-            logger.error(f"[Send]{table}--推送失败,原因:{''.join(e.args)}")
-
-
-@func_set_timeout(60 * 20)
-def main():
-    logger.info("[Send]同步数据开始")
-    with ThreadPoolExecutor() as threadPool:
-        futures = []
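-        # one sync worker per redis key; each key maps to one target MongoDB collection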
-        for key in rcli.keys(f"{redis_prefix}:*"):
-            table = key.decode()
-            f = threadPool.submit(sync_data, table)
-            f.add_done_callback(err_msg)
-            futures.append(f)
-        wait(futures)
-    logger.info("[Send]同步数据结束")
-
-
-if __name__ == '__main__':
-    try:
-        main()
-    except FunctionTimedOut:
-        logger.warning("[Send]同步数据超时")

+ 0 - 240
A数据处理/sync_data/summary.py

@@ -1,240 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2023-04-04
----------
-@summary:  Heartbeat data aggregated and pushed to the "list" collection; currently only list-page collection is summarized
----------
-@author: dzr
-"""
-from datetime import datetime, time, timedelta
-
-from bson.int64 import Int64
-from bson.son import SON
-from pymongo import MongoClient
-
-from log import logger
-
-# mongo
-MONGO_HOST = "172.17.4.87"
-MONGO_PORT = 27080
-MONGO_DB = "py_spider"
-client = MongoClient(MONGO_HOST, MONGO_PORT)
-mongodb = client[MONGO_DB]
-
-# production data table
-data_bak = mongodb["data_bak"]
-
-# heartbeat table
-spider_heartbeat = mongodb["pyspider_heartbeat"]
-
-# collection task lists
-ybw_list = mongodb["ybw_list"]
-zbytb_list = mongodb["zbytb_list"]
-
-# topic-spider collection task table
-zgzb_list = mongodb["zgzb_list"]
-
-# list-page summary table
-summary_table = mongodb["list"]
-
-
-def save(documents, collection, ordered=False):
-    """保存数据"""
-    is_list = isinstance(documents, list)
-    documents = documents if is_list else [documents]
-
-    data_lst = []
-    for item in documents:
-        item.pop("_id", None)
-        data_lst.append(item)
-        if len(data_lst) == 100:
-            ret = collection.insert_many(data_lst, ordered)
-            logger.info(f"MongoDB {collection.name} 保存 {len(ret.inserted_ids)} 条数据")
-            data_lst = []
-
-    if data_lst:
-        collection.insert_many(data_lst, ordered)
-
-    logger.info(f"MongoDB {collection.name} 保存 {len(documents)} 条数据")
-    return len(documents)
-
-
-def pick_data(items, runtime, only_count_list_page=False):
-    """聚合的数据进行分类"""
-    results = []
-
-    spidercode = items["spidercode"]
-    site = items["site"]
-    data = {
-        "business_type": items["business_type"],
-        "site": site,
-        "channel": items["channel"],
-        "spidercode": spidercode,
-        "count": items["count"],
-        "rel_count": items["rel_count"],
-        "runtime": runtime,
-        "create_at": Int64(datetime.now().timestamp())
-    }
-
-    if only_count_list_page:
-        if str(items["business_type"]).endswith("List"):
-            results.append(data)
-    else:
-        results.append(data)
-
-    return results
-
-
-def feapder_crawl_aggregate_of_list_pages(datestr=None):
-    """feapder采集列表页数据汇总(前一天的数据)"""
-    if datestr is None:
-        today = datetime.now().date()
-        yesterday = today + timedelta(days=-1)
-        datestr = yesterday.strftime("%Y-%m-%d")
-
-    count = 0
-    pipeline = [
-        {"$match": {"runtime": datestr}},
-        {
-            "$group": {
-                "_id": "$batch_no",
-                "rel_count": {"$sum": "$rel_count"},
-                "count": {"$sum": "$count"},
-                "site": {"$first": "$site"},
-                "channel": {"$first": "$channel"},
-                "spidercode": {"$first": "$spidercode"},
-                "business_type": {"$first": "$business_type"},
-            }
-        },
-        {"$sort": SON([("rel_count", -1)])}
-    ]
-    #  The $group stage has a 100 MB memory limit; by default, if the stage exceeds it, $group raises an error.
-    #  To process large data sets, set the allowDiskUse option to true so the $group
-    #  operation can write to temporary files.
-    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
-    try:
-        results = []
-        for doc in cursor:
-            results.extend(pick_data(doc, datestr, True))
-        count = save(results, summary_table)
-    finally:
-        client.close()
-        logger.info(f"feapder - 数据汇总 {count} 条")
-
-
-def competing_products_crawl_aggregate(collection, datestr=None):
-    """竞品采集聚合查询"""
-    if datestr is not None:
-        today = datetime.fromisoformat(datestr).date()
-    else:
-        today = datetime.now().date()
-    yesterday = today + timedelta(days=-1)
-
-    count = 0
-    publish_time = yesterday.strftime("%Y-%m-%d")
-    pipeline = [
-        {
-            "$addFields": {
-                "rel_count": {
-                    "$cond": {
-                        "if": {"$ne": ["$count", 0]},
-                        "then": 1,
-                        "else": 0
-                    }
-                }
-            }
-        },
-        {"$match": {"publishtime": publish_time}},
-        {
-            "$group": {
-                "_id": "$channel",
-                "count": {"$sum": 1},  # 当天采集总数
-                "rel_count": {"$sum": "$rel_count"},  # es检索结果为0的总数
-                "site": {"$first": "$site"},
-                "channel": {"$first": "$channel"},
-                "spidercode": {"$first": "$spidercode"},
-                "business_type": {
-                    "$first": {
-                        "$cond": {
-                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
-                            "then": "List",
-                            "else": "$business_type"
-                        }
-                    }
-                },
-            }
-        },
-    ]
-    cursor = collection.aggregate(pipeline, allowDiskUse=True)
-    try:
-        results = []
-        for doc in cursor:
-            results.extend(pick_data(doc, publish_time))
-        count = save(results, summary_table)
-    finally:
-        client.close()
-    return count
-
-
-def competing_products_crawl_aggregate_of_list_pages(datestr=None):
-    """竞品采集列表页数据汇总"""
-    count = 0
-    count += competing_products_crawl_aggregate(ybw_list, datestr)
-    count += competing_products_crawl_aggregate(zbytb_list, datestr)
-    logger.info(f"竞品采集 - 数据汇总 {count} 条")
-
-
-def zgzb_crawl_aggregate_of_list_pages(datestr=None):
-    if datestr is not None:
-        today = datetime.fromisoformat(datestr).date()
-    else:
-        today = datetime.now().date()
-    yesterday = today + timedelta(days=-1)
-
-    runtime = yesterday.strftime("%Y-%m-%d")
-    start_time = int(datetime.combine(yesterday, time()).timestamp())
-    end_time = int(datetime.combine(today, time()).timestamp())
-
-    count = 0
-    pipeline = [
-        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
-        {
-            "$group": {
-                "_id": "$spidercode",
-                "count": {"$sum": 1},  # 当天采集总数
-                "rel_count": {"$sum": 1},  # 当天采集总数
-                "site": {"$first": "$site"},
-                "channel": {"$first": "$channel"},
-                "spidercode": {"$first": "$spidercode"},
-                "business_type": {
-                    "$first": {
-                        "$cond": {
-                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
-                            "then": "List",
-                            "else": "$business_type"
-                        }
-                    }
-                },
-            }
-        },
-    ]
-    cursor = zgzb_list.aggregate(pipeline, allowDiskUse=True)
-    try:
-        results = []
-        for doc in cursor:
-            results.extend(pick_data(doc, runtime))
-        count = save(results, summary_table)
-    finally:
-        client.close()
-        logger.info(f"中国招标投标公共服务平台 - 数据汇总 {count} 条")
-
-
-def start_summary():
-    feapder_crawl_aggregate_of_list_pages()
-    competing_products_crawl_aggregate_of_list_pages()
-    zgzb_crawl_aggregate_of_list_pages()
-    logger.info("数据汇总完成")
-
-
-if __name__ == '__main__':
-    start_summary()