# -*- coding: utf-8 -*-
"""
Created on 2023-04-04
---------
@summary: Aggregate heartbeat data and push it to the list-page summary collection;
          currently only list-page crawling is summarised.
---------
@author: dzr
"""
from datetime import datetime, time, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

MONGO_HOST = "127.0.0.1"
MONGO_PORT = 27001
MONGO_DB = "py_spider"

# mongo
# MONGO_HOST = "172.17.4.87"
# MONGO_PORT = 27080
# MONGO_DB = "py_spider"

client = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = client[MONGO_DB]

# crawler data table
data_bak = mongodb["data_bak"]
# heartbeat table
spider_heartbeat = mongodb["spider_heartbeat"]
# py_spiders list
py_spiders_crawl_list = mongodb["crawl_data"]
# competitor-product lists
ybw_list = mongodb["ybw_list"]
zbytb_list = mongodb["zbytb_list"]
# topic crawler
zgzb_list = mongodb["zgzb_list"]
# list-page summary table
summary_table_of_list_pages = mongodb["list"]


def save(documents, collection):
    """Persist documents to the given collection in batches of 100."""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    count = 0
    data_lst = []
    for item in documents:
        item.pop("_id", None)
        data_lst.append(item)
        count += 1
        if len(data_lst) == 100:
            collection.insert_many(data_lst)
            data_lst.clear()
            logger.info(f"[Summary]{collection.name} - batch saved {count} documents")

    # flush the remaining documents
    if len(data_lst) > 0:
        collection.insert_many(data_lst)
        logger.info(f"[Summary]{collection.name} - batch saved {count} documents")

    return count
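
# Illustrative usage of save() (placeholder documents, not real records):
#   save({"site": "demo", "count": 1}, summary_table_of_list_pages)      # single dict
#   save([{"site": "a"}, {"site": "b"}], summary_table_of_list_pages)    # list of dicts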


def summary_data(document, runtime, only_count_list_page=False):
    """Summarise and classify one aggregated document."""
    summary_lst = []

    spider_item = document["spider_item"]
    for item in spider_item:
        spidercode = item["spidercode"]
        site = item["site"]
        data = {
            "business_type": item["business_type"],
            "site": site,
            "channel": item["channel"],
            "spidercode": spidercode,
            "count": document["count"],
            "rel_count": document["rel_count"],
            "runtime": runtime,
            "create_at": Int64(datetime.now().timestamp())
        }
        if len(spider_item) > 1:
            logger.warning(f"[Summary]{spidercode} -> {site} - mapping error")
            data["warning"] = "spidercode business mapping error"

        if only_count_list_page:
            if str(item["business_type"]).endswith("List"):
                summary_lst.append(data)
            continue

        summary_lst.append(data)

    return summary_lst
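
# For reference, summary_data() expects documents shaped like the $group output
# of the pipelines below (values here are illustrative placeholders):
#   {
#       "_id": "<spider_id / spidercode / channel>",
#       "count": 120,
#       "rel_count": 118,
#       "spider_item": [
#           {"site": "...", "channel": "...", "spidercode": "...", "business_type": "...List"}
#       ]
#   }
# It emits one summary row per spider_item entry, tagged with `runtime` and a
# creation timestamp.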


def feapder_crawl_aggregate_of_list_pages(datestr=None):
    """Summary of list pages collected by feapder (previous day's data)"""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")

    pipeline = [
        {"$match": {"runtime": datestr}},
        {
            "$group": {
                "_id": "$spider_id",
                "rel_count": {"$sum": "$rel_count"},
                "count": {"$sum": "$count"},
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "$business_type"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    # The $group stage is limited to 100MB of RAM and errors out by default when
    # it exceeds that limit; set allowDiskUse=True so large data sets can spill
    # to temporary files on disk.
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, datestr, True))
        save(results, summary_table_of_list_pages)
    finally:
        cursor.close()

    logger.info("[Summary]feapder data summary finished")


def py_spiders_crawl_aggregate_of_list_pages(datestr=None):
    """Summary of list pages collected by py_spiders (previous day's data)"""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)
    runtime = yesterday.strftime("%Y-%m-%d")
    start_time = int(datetime.combine(yesterday, time()).timestamp())
    end_time = int(datetime.combine(today, time()).timestamp())

    pipeline = [
        {
            "$addFields": {
                "rel_count": {
                    "$cond": {
                        "if": {"$ne": ["$finished", True]},
                        "then": 1,
                        "else": 0
                    }
                }
            }
        },
        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
        {
            "$group": {
                "_id": "$spidercode",
                "count": {"$sum": 1},  # total collected that day
                "rel_count": {"$sum": 1},  # total collected that day
                # "rel_count": {"$sum": "$rel_count"},  # detail pages collected that day (successful only)
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "List"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
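    # Note: the rel_count computed by the $addFields stage above is only consumed
    # if the commented-out {"$sum": "$rel_count"} accumulator is re-enabled; as
    # written, count and rel_count both equal the number of records matched for
    # the day.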
    cursor = py_spiders_crawl_list.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, runtime))
        save(results, summary_table_of_list_pages)
    finally:
        cursor.close()

    logger.info("[Summary]py_spiders data summary finished")


def competing_products_crawl_aggregate(collection, datestr=None):
    """Aggregation query for competitor-product crawls"""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)
    publish_time = yesterday.strftime("%Y-%m-%d")
    table_name = collection.name

    pipeline = [
        {
            "$addFields": {
                "rel_count": {
                    "$cond": {
                        "if": {"$ne": ["$count", 0]},
                        "then": 1,
                        "else": 0
                    }
                }
            }
        },
        {"$match": {"publishtime": publish_time}},
        {
            "$group": {
                "_id": "$channel",
                "count": {"$sum": 1},  # total collected that day
                "rel_count": {"$sum": "$rel_count"},  # records whose ES match count is non-zero
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "List"
                    }
                }
            }
        },
    ]
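    # Note: unlike the other summaries, competitor-product records are grouped by
    # $channel rather than by spider id / spidercode, so each summary row here
    # represents one channel of the source site.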
    cursor = collection.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, publish_time))
        save(results, summary_table_of_list_pages)
    finally:
        cursor.close()

    logger.info(f"[Summary]{table_name} data summary finished")


def competing_products_crawl_aggregate_of_list_pages(datestr=None):
    """Summary of list pages collected from competitor products"""
    competing_products_crawl_aggregate(ybw_list, datestr)
    competing_products_crawl_aggregate(zbytb_list, datestr)


def zgzb_crawl_aggregate_of_list_pages(datestr=None):
    """Summary of list pages collected by the zgzb topic crawler (previous day's data)"""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)
    runtime = yesterday.strftime("%Y-%m-%d")
    start_time = int(datetime.combine(yesterday, time()).timestamp())
    end_time = int(datetime.combine(today, time()).timestamp())

    pipeline = [
        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
        {
            "$group": {
                "_id": "$spidercode",
                "count": {"$sum": 1},  # total collected that day
                "rel_count": {"$sum": 1},  # total collected that day
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "List"
                    }
                }
            }
        },
    ]
    cursor = zgzb_list.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, runtime))
        save(results, summary_table_of_list_pages)
    finally:
        cursor.close()

    logger.info("[Summary]zgzb_list data summary finished")
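

# To backfill a specific day, pass an ISO date string ("YYYY-MM-DD"). Note the
# asymmetry in how datestr is interpreted: feapder_crawl_aggregate_of_list_pages
# treats it as the runtime day to aggregate, while the other functions treat it
# as "today" and summarise the day before it, e.g.:
#   feapder_crawl_aggregate_of_list_pages("2023-04-03")     # summarises 2023-04-03
#   py_spiders_crawl_aggregate_of_list_pages("2023-04-04")  # summarises 2023-04-03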
if __name__ == '__main__':
    try:
        feapder_crawl_aggregate_of_list_pages()
        py_spiders_crawl_aggregate_of_list_pages()
        competing_products_crawl_aggregate_of_list_pages()
        zgzb_crawl_aggregate_of_list_pages()
    finally:
        # close the shared MongoDB client once all summaries have run
        client.close()