# -*- coding: utf-8 -*-
"""
Created on 2023-04-04
---------
@summary: Aggregate heartbeat data and push it into the `list` collection;
          currently only list-page crawling is summarized.
---------
@author: dzr
"""

from datetime import datetime, time, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# MongoDB connection
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"

client = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = client[MONGO_DB]

# production data table
data_bak = mongodb["data_bak"]
# heartbeat table
spider_heartbeat = mongodb["pyspider_heartbeat"]
# crawl task tables (competing products)
ybw_list = mongodb["ybw_list"]
zbytb_list = mongodb["zbytb_list"]
# topic-spider crawl task table
zgzb_list = mongodb["zgzb_list"]
# list-page summary table
summary_table = mongodb["list"]
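
# Each aggregation below summarizes the previous day's list-page crawling from one
# source collection into `summary_table`. Assumption: `runtime` (pyspider_heartbeat),
# `publishtime` (ybw_list / zbytb_list) and `comeintime` (zgzb_list) are indexed so
# the daily jobs avoid full collection scans.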


def save(documents, collection, ordered=False):
    """Insert documents into MongoDB in batches of 100."""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    data_lst = []
    for item in documents:
        item.pop("_id", None)  # let MongoDB assign a fresh _id
        data_lst.append(item)
        if len(data_lst) == 100:
            ret = collection.insert_many(data_lst, ordered=ordered)
            logger.info(f"MongoDB {collection.name} saved {len(ret.inserted_ids)} documents")
            data_lst = []

    if data_lst:
        collection.insert_many(data_lst, ordered=ordered)

    logger.info(f"MongoDB {collection.name} saved {len(documents)} documents in total")
    return len(documents)
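
# Illustrative usage of save() (values are hypothetical):
#   save({"site": "example-site", "count": 10, "rel_count": 8}, summary_table)
# accepts a single dict or a list of dicts, strips any stale `_id`, inserts in
# batches of 100, and returns the number of documents handed in.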


def pick_data(items, runtime, only_count_list_page=False):
    """Build summary record(s) from one aggregation result, optionally list pages only."""
    results = []
    spidercode = items["spidercode"]
    site = items["site"]

    data = {
        "business_type": items["business_type"],
        "site": site,
        "channel": items["channel"],
        "spidercode": spidercode,
        "count": items["count"],
        "rel_count": items["rel_count"],
        "runtime": runtime,
        "create_at": Int64(datetime.now().timestamp())
    }

    if only_count_list_page:
        # keep only list-page records (business_type values ending in "List")
        if str(items["business_type"]).endswith("List"):
            results.append(data)
    else:
        results.append(data)

    return results
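
# A summary document produced by pick_data() looks roughly like the following
# (field values are illustrative, not taken from real data):
#   {
#       "business_type": "ExampleList",
#       "site": "example-site",
#       "channel": "example-channel",
#       "spidercode": "a_example_list",
#       "count": 120,        # documents aggregated for the day
#       "rel_count": 118,    # see the per-pipeline comments below
#       "runtime": "2023-04-03",
#       "create_at": Int64(1680566400),
#   }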


def feapder_crawl_aggregate_of_list_pages(datestr=None):
    """Aggregate feapder list-page heartbeat data for `datestr` (previous day by default)."""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")

    count = 0
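
    # The heartbeat documents are assumed to carry per-batch counters written by the
    # feapder spiders, roughly of the form (illustrative):
    #   {"batch_no": "...", "runtime": "2023-04-03", "site": "...", "channel": "...",
    #    "spidercode": "...", "business_type": "XxxList", "count": 50, "rel_count": 48}
    # so grouping by batch_no sums the counters of one spider run.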
    pipeline = [
        {"$match": {"runtime": datestr}},
        {
            "$group": {
                "_id": "$batch_no",
                "rel_count": {"$sum": "$rel_count"},
                "count": {"$sum": "$count"},
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {"$first": "$business_type"},
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    # The $group stage has a 100 MB memory limit and errors out by default when the
    # limit is exceeded. Set allowDiskUse=True so large data sets can spill the
    # $group operation to temporary files on disk.
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(pick_data(doc, datestr, only_count_list_page=True))
        count = save(results, summary_table)
    finally:
        cursor.close()  # keep the shared client open for the other aggregations

    logger.info(f"feapder - aggregated {count} documents")


def competing_products_crawl_aggregate(collection, datestr=None):
    """Aggregate one competing-product task table for the day before `datestr` (yesterday by default)."""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()
    yesterday = today + timedelta(days=-1)

    count = 0
    publish_time = yesterday.strftime("%Y-%m-%d")
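
    # $addFields turns each document's `count` field (assumed to be the number of
    # matches found during the ES duplicate check) into a 0/1 flag, so the later
    # $group can count how many documents had a non-zero lookup result.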
    pipeline = [
        {
            "$addFields": {
                "rel_count": {
                    "$cond": {
                        "if": {"$ne": ["$count", 0]},
                        "then": 1,
                        "else": 0
                    }
                }
            }
        },
        {"$match": {"publishtime": publish_time}},
        {
            "$group": {
                "_id": "$channel",
                "count": {"$sum": 1},  # total documents collected for the day
                "rel_count": {"$sum": "$rel_count"},  # documents with a non-zero `count` (see $addFields above)
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {
                    "$first": {
                        "$cond": {
                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
                            "then": "List",
                            "else": "$business_type"
                        }
                    }
                },
            }
        },
    ]
    cursor = collection.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(pick_data(doc, publish_time))
        count = save(results, summary_table)
    finally:
        cursor.close()  # keep the shared client open for the other aggregations

    return count


def competing_products_crawl_aggregate_of_list_pages(datestr=None):
    """Aggregate competing-product list-page crawl data (ybw + zbytb)."""
    count = 0
    count += competing_products_crawl_aggregate(ybw_list, datestr)
    count += competing_products_crawl_aggregate(zbytb_list, datestr)
    logger.info(f"Competing products - aggregated {count} documents")


def zgzb_crawl_aggregate_of_list_pages(datestr=None):
    """Aggregate zgzb (topic-spider) list-page crawl data for the day before `datestr` (yesterday by default)."""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()
    yesterday = today + timedelta(days=-1)

    runtime = yesterday.strftime("%Y-%m-%d")
    start_time = int(datetime.combine(yesterday, time()).timestamp())
    end_time = int(datetime.combine(today, time()).timestamp())

    count = 0
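
    # `comeintime` is assumed to be a Unix timestamp in seconds, so the match below
    # covers [00:00 of the previous day, 00:00 of `today`).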
    pipeline = [
        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
        {
            "$group": {
                "_id": "$spidercode",
                "count": {"$sum": 1},      # total documents collected for the day
                "rel_count": {"$sum": 1},  # total documents collected for the day
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {
                    "$first": {
                        "$cond": {
                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
                            "then": "List",
                            "else": "$business_type"
                        }
                    }
                },
            }
        },
    ]
    cursor = zgzb_list.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(pick_data(doc, runtime))
        count = save(results, summary_table)
    finally:
        cursor.close()  # keep the shared client open for the other aggregations

    logger.info(f"中国招标投标公共服务平台 - aggregated {count} documents")


def start_summary():
    try:
        feapder_crawl_aggregate_of_list_pages()
        competing_products_crawl_aggregate_of_list_pages()
        zgzb_crawl_aggregate_of_list_pages()
        logger.info("Data aggregation finished")
    finally:
        # close the shared MongoDB client once, after all aggregations are done
        client.close()


if __name__ == '__main__':
    start_summary()