# -*- coding: utf-8 -*-
"""
Created on 2023-04-04
---------
@summary: Aggregate heartbeat data and push it into the `list` collection.
          Currently only list-page crawling is aggregated.
---------
@author: dzr
"""
from datetime import datetime, time, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# mongo
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"

client = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = client[MONGO_DB]

# production data collection (not referenced below)
data_bak = mongodb["data_bak"]
# heartbeat collection
spider_heartbeat = mongodb["pyspider_heartbeat"]
# crawl task collections (competing products)
ybw_list = mongodb["ybw_list"]
zbytb_list = mongodb["zbytb_list"]
# topic-spider crawl task collection
zgzb_list = mongodb["zgzb_list"]
# list-page summary collection
summary_table = mongodb["list"]


def save(documents, collection, ordered=False):
    """Save documents to `collection` in batches of 100."""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    data_lst = []
    for item in documents:
        item.pop("_id", None)  # drop the source _id so MongoDB assigns a new one
        data_lst.append(item)
        if len(data_lst) == 100:
            ret = collection.insert_many(data_lst, ordered=ordered)
            logger.info(f"MongoDB {collection.name} saved {len(ret.inserted_ids)} documents")
            data_lst = []

    # flush the remaining partial batch
    if data_lst:
        ret = collection.insert_many(data_lst, ordered=ordered)
        logger.info(f"MongoDB {collection.name} saved {len(ret.inserted_ids)} documents")

    return len(documents)


def pick_data(items, runtime, only_count_list_page=False):
    """Shape an aggregation row into a summary record."""
    results = []
    spidercode = items["spidercode"]
    site = items["site"]

    data = {
        "business_type": items["business_type"],
        "site": site,
        "channel": items["channel"],
        "spidercode": spidercode,
        "count": items["count"],
        "rel_count": items["rel_count"],
        "runtime": runtime,
        "create_at": Int64(datetime.now().timestamp())
    }

    if only_count_list_page:
        # keep only list-page records (business_type ends with "List")
        if str(items["business_type"]).endswith("List"):
            results.append(data)
    else:
        results.append(data)

    return results


def feapder_crawl_aggregate_of_list_pages(datestr=None):
    """Aggregate feapder list-page crawl data (previous day by default)."""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")

    count = 0
    pipeline = [
        {"$match": {"runtime": datestr}},
        {
            "$group": {
                "_id": "$batch_no",
                "rel_count": {"$sum": "$rel_count"},
                "count": {"$sum": "$count"},
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {"$first": "$business_type"},
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    # The $group stage has a 100 MB memory limit; by default it raises an error
    # when the stage exceeds that limit. To handle large data sets, set the
    # allowDiskUse option to true so $group can spill to temporary files.
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    results = []
    for doc in cursor:
        results.extend(pick_data(doc, datestr, True))
    count = save(results, summary_table)

    logger.info(f"feapder - aggregated {count} records")


def competing_products_crawl_aggregate(collection, datestr=None):
    """Aggregation query for competing-products crawls."""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)

    count = 0
    publish_time = yesterday.strftime("%Y-%m-%d")
    pipeline = [
        {
            "$addFields": {
                "rel_count": {
                    "$cond": {
                        "if": {"$ne": ["$count", 0]},
                        "then": 1,
                        "else": 0
                    }
                }
            }
        },
        {"$match": {"publishtime": publish_time}},
        {
            "$group": {
                "_id": "$channel",
                "count": {"$sum": 1},  # total collected for the day
                "rel_count": {"$sum": "$rel_count"},  # items whose es lookup count is non-zero
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {
                    "$first": {
                        "$cond": {
                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
                            "then": "List",
                            "else": "$business_type"
                        }
                    }
                },
            }
        },
    ]
    cursor = collection.aggregate(pipeline, allowDiskUse=True)
    results = []
    for doc in cursor:
        results.extend(pick_data(doc, publish_time))
    count = save(results, summary_table)

    return count


def competing_products_crawl_aggregate_of_list_pages(datestr=None):
    """Aggregate competing-products list-page crawl data."""
    count = 0
    count += competing_products_crawl_aggregate(ybw_list, datestr)
    count += competing_products_crawl_aggregate(zbytb_list, datestr)
    logger.info(f"competing products - aggregated {count} records")


def zgzb_crawl_aggregate_of_list_pages(datestr=None):
    """Aggregate zgzb_list (中国招标投标公共服务平台) crawl data (previous day by default)."""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)
    runtime = yesterday.strftime("%Y-%m-%d")

    start_time = int(datetime.combine(yesterday, time()).timestamp())
    end_time = int(datetime.combine(today, time()).timestamp())

    count = 0
    pipeline = [
        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
        {
            "$group": {
                "_id": "$spidercode",
                "count": {"$sum": 1},  # total collected for the day
                "rel_count": {"$sum": 1},  # same as count; no separate de-dup figure for this source
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {
                    "$first": {
                        "$cond": {
                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
                            "then": "List",
                            "else": "$business_type"
                        }
                    }
                },
            }
        },
    ]
    cursor = zgzb_list.aggregate(pipeline, allowDiskUse=True)
    results = []
    for doc in cursor:
        results.extend(pick_data(doc, runtime))
    count = save(results, summary_table)

    logger.info(f"中国招标投标公共服务平台 - aggregated {count} records")


def start_summary():
    try:
        feapder_crawl_aggregate_of_list_pages()
        competing_products_crawl_aggregate_of_list_pages()
        zgzb_crawl_aggregate_of_list_pages()
        logger.info("Data aggregation finished")
    finally:
        # close the shared Mongo client once, after all aggregations have run
        client.close()


if __name__ == '__main__':
    start_summary()
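
# Manual back-fill sketch (assumption: not part of the original entry point,
# only an illustration of the datestr parameters defined above). Note that
# feapder_crawl_aggregate_of_list_pages() matches the runtime day it is given,
# while the other two helpers aggregate the day *before* the ISO date passed in:
#
#   feapder_crawl_aggregate_of_list_pages("2023-04-03")
#   competing_products_crawl_aggregate_of_list_pages("2023-04-04")
#   zgzb_crawl_aggregate_of_list_pages("2023-04-04")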