Troubleshooting and maintenance of errors in the daily collection statistics:
1. total list collection volume (see the sketch after this list)
2. total number of collection task runs (see the sketch after the diff)
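A minimal sketch of the first statistic, assuming pymongo is installed and using a placeholder connection string plus assumed database/collection names; the field names (runtime, batch_no, count, rel_count, business_type, spidercode) are taken from the diff below, while the project's own aggregate_query and get_md5 helpers are not reproduced. Each batch_no now contributes exactly one grouped document, and its counts are folded under a "list" or "detail" bucket, so the list total is simply the sum of the "list" counts:

    # Sketch only: placeholder URI and assumed db/collection names.
    from collections import defaultdict

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")        # placeholder URI
    heartbeat = client["py_spiders"]["spider_heartbeat"]     # assumed names

    def list_totals(runtime):
        """Sum per-batch list-page download/stored counts for one statistics window."""
        pipeline = [
            {"$match": {"runtime": runtime}},
            {"$group": {
                "_id": "$batch_no",                          # one document per batch
                "count": {"$sum": "$count"},                 # download volume
                "rel_count": {"$sum": "$rel_count"},         # stored volume (deduplicated)
                "business_type": {"$first": "$business_type"},
                "spidercode": {"$first": "$spidercode"},
            }},
        ]
        totals = defaultdict(lambda: {"count": 0, "rel_count": 0})
        for doc in heartbeat.aggregate(pipeline, allowDiskUse=True):
            kind = "list" if str(doc["business_type"]).endswith("List") else "detail"
            totals[(doc["spidercode"], kind)]["count"] += doc["count"]
            totals[(doc["spidercode"], kind)]["rel_count"] += doc["rel_count"]
        return totals

Keying the totals by (spidercode, business type) mirrors the new nested layout in aggregate_query_count, where list and detail figures sit in separate sub-dicts and no longer overwrite each other.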

dongzhaorui 1 year ago
parent
commit
378f0d6712
1 file changed, 123 insertions and 132 deletions

+ 123 - 132
A数据处理/sync_data/monitor.py

@@ -2,13 +2,12 @@
 """
 Created on 2023-04-05
 ---------
-@summary:  爬虫运行监控(feapder + py_spiders)
+@summary:  爬虫运行监控(feapder)和日常采集统计
 ---------
 @author: Dzr
 """
 import hashlib
 from datetime import datetime, timedelta
-from operator import itemgetter
 
 from bson.int64 import Int64
 from bson.son import SON
@@ -152,6 +151,7 @@ def aggregate_query_count(runtime):
     @param runtime: 运行时间
     @return:
     """
+    spider_dict = {}  # spidercode 与 site 排查记录
     aggregate_items = {}
     pipeline = [
         {"$match": {"runtime": runtime}},
@@ -160,97 +160,69 @@ def aggregate_query_count(runtime):
                 "_id": "$batch_no",
                 "rel_count": {"$sum": "$rel_count"},  # 入库量(去重)
                 "count": {"$sum": "$count"},  # 下载量
-                "spider_item": {
-                    "$addToSet": {
-                        "site": "$site",
-                        "channel": "$channel",
-                        "spidercode": "$spidercode",
-                        "business_type": "$business_type"
-                    }
-                }
+                "site": {"$first": "$site"},
+                "channel": {"$first": "$channel"},
+                "spidercode": {"$first": "$spidercode"},
+                "business_type": {"$first": "$business_type"},
             }
         },
         {"$sort": SON([("rel_count", -1)])}
     ]
     results = aggregate_query(spider_heartbeat, pipeline)
-    for doc in results:
-        spider_dict = {}
-        spider_item = doc["spider_item"]
-        for items in spider_item:
-            site = items["site"]
-            channel = items["channel"]
-            spidercode = items["spidercode"]
-            business_type = items["business_type"]
-
-            if len(spider_item) > 1 and site not in special_sites:
-                logger.warning(f"{spidercode} -> {site} --存在风险, {len(spider_item)}")
-
-            hash_key = get_md5(**items)  # 防止多站点对应1个spidercode,数据相互重叠
-            if not hash_key:
-                logger.error(f"异常批次号 {doc['_id']}")
-                continue
-
-            is_list = str(business_type).endswith("List")
-            if not aggregate_items.get(hash_key):
-                data = {
-                    "batch_no": doc["_id"],
-                    "site": site,
-                    "channel": channel,
-                    "spidercode": spidercode,
-                    "business_type": business_type,
-                    "runtime": runtime,
-                    "spidercode_at_site_num": 0,  # 爬虫代码与站点对应的关系数量
-                    "frame": "feapder"  # 采集框架
-                }
-                if is_list:
-                    data["list_count"] = doc["count"]
-                    data["list_rel_count"] = doc["rel_count"]
-                    data["list_runtimes"] = 1
-                    data["detail_count"] = 0
-                    data["detail_rel_count"] = 0
-                    data["detail_runtimes"] = 0
-                else:
-                    data["detail_count"] = doc["count"]
-                    data["detail_rel_count"] = doc["rel_count"]
-                    data["detail_runtimes"] = 1
-                    data["list_count"] = 0
-                    data["list_rel_count"] = 0
-                    data["list_runtimes"] = 0
-
-                aggregate_items.setdefault(hash_key, data)
+    for items in results:
+        site = items["site"]
+        channel = items["channel"]
+        spidercode = items["spidercode"]
+        business_type = items["business_type"]
+        business_type = "list" if business_type.endswith("List") else "detail"
+
+        hash_key = get_md5(**items)  # 爬虫查询标识
+        if not hash_key:
+            logger.error(f"异常批次号 {items['_id']}")
+            continue
+
+        # 通过爬虫业务类型 分拣数据
+        data_dict = {
+            "site": site,
+            "channel": channel,
+            "spidercode": spidercode,
+            "runtime": runtime,
+            "spidercode_at_site_num": 0,  # 爬虫代码与站点对应的关系数量
+            "frame": "feapder",  # 采集框架
+            business_type: {
+                "batch_no": items["_id"],
+                "count": items["count"],
+                "rel_count": items["rel_count"],
+                "runtimes": 1
+            }
+        }
+        if not aggregate_items.get(hash_key):
+            aggregate_items.setdefault(hash_key, data_dict)
+        else:
+            aggregate_items[hash_key][business_type] = data_dict[business_type]
+
+        # 监控爬虫任务,当 spidercode_at_site_num > 1
+        # 表明创建的爬虫任务存在问题,需反馈数据寻源相关人员
+        if site not in special_sites:
+            spider = aggregate_items[hash_key]
+            if spidercode not in spider_dict:
+                spider["spidercode_at_site_num"] = 1
+                values = {"keys": [hash_key], "sites": [site]}
+                spider_dict.setdefault(spidercode, values)
             else:
-                data = aggregate_items.get(hash_key)
-                if is_list:
-                    data["list_count"] += doc["count"]
-                    data["list_rel_count"] += doc["rel_count"]
-                    data["list_runtimes"] += 1
-                else:
-                    data["detail_count"] += doc["count"]
-                    data["detail_rel_count"] += doc["rel_count"]
-                    data["detail_runtimes"] += 1
-
-                aggregate_items.update({hash_key: data})
-
-            # 监控爬虫任务,当 spidercode_at_site_num > 1
-            # 表明创建的爬虫任务存在问题,需反馈数据寻源相关人员
-            if site not in special_sites:
-                spider = aggregate_items[hash_key]
-                if spidercode not in spider_dict:
-                    spider["spidercode_at_site_num"] = 1
-                    values = {"keys": [hash_key], "sites": [site]}
-                    spider_dict.setdefault(spidercode, values)
-                else:
-                    # 相同 spidercode 但 site 不同的爬虫进行计数+1
-                    sites = spider_dict[spidercode]["sites"]
-                    if site not in sites:
-                        keys = spider_dict[spidercode]["keys"]
-                        for key in keys:
-                            # 更新相同 spidercode 的 spidercode_at_site_num
-                            aggregate_items[key]["spidercode_at_site_num"] += 1
-
-                        keys.append(hash_key)  # 记录新爬虫
-                        sites.append(site)  # 添加新站点
-                        spider["spidercode_at_site_num"] = len(site)
+                # spidercode 相同,但 site 不同的爬虫进行计数+1
+                sites = spider_dict[spidercode]["sites"]
+                if site not in sites:
+                    keys = spider_dict[spidercode]["keys"]
+                    for key in keys:
+                        # 更新相同 spidercode 的 spidercode_at_site_num
+                        aggregate_items[key]["spidercode_at_site_num"] += 1
+
+                    keys.append(hash_key)  # 记录新爬虫
+                    sites.append(site)  # 添加新站点
+                    spider["spidercode_at_site_num"] = len(sites)
+        else:
+            aggregate_items[hash_key]["spidercode_at_site_num"] = 1
 
     return aggregate_items
 
@@ -297,15 +269,17 @@ def aggregate_query_crawl_list(runtime):
     return aggregate_items
 
 
-def aggregate_query_crawlab_information(runtime):
-    """feapder爬虫采集聚合查询crawlab平台运行信息统计"""
-    aggregate_items = {}
+def aggregate_count_crawlab_update_runtimes(runtime):
+    """feapder爬虫采集聚合查询crawlab平台运行信息"""
     pipeline = [
         {
             "$project": {
                 "_id": 0,
+                "site": 1,
+                "channel": 1,
                 "batch_no": 1,
                 "spidercode": 1,
+                "business_type": 1,
                 "runtime": 1,
                 "node_ip": 1,
                 "crawlab_taskid": 1,
@@ -316,46 +290,43 @@ def aggregate_query_crawlab_information(runtime):
         {
             "$group": {
                 "_id": "$batch_no",
+                "business_type": {"$first": "$business_type"},
+                "site": {"$first": "$site"},
+                "channel": {"$first": "$channel"},
+                "spidercode": {"$first": "$spidercode"},
                 "crawlab_item": {
                     "$addToSet": {
-                        "spidercode": "$spidercode",
-                        "create_at": "$create_at",
                         "node_ip": "$node_ip",
-                        "crawlab_taskid": "$crawlab_taskid"
+                        "crawlab_taskid": "$crawlab_taskid",
                     },
                 }
             }
         }
     ]
     results = aggregate_query(spider_heartbeat, pipeline)
-    for doc in results:
-        crawlab_item = sorted(doc["crawlab_item"], key=itemgetter("create_at"), reverse=True)
-        items: dict = crawlab_item[0]
-        spidercode = items.pop("spidercode")
-        if not aggregate_items.get(spidercode):
-            aggregate_items.setdefault(spidercode, items)
+    for items in results:
+        runtimes = len(items["crawlab_item"])  # 采集任务运行次数
 
-    return aggregate_items
+        # 通过爬虫业务类型 分拣数据
+        business_type = items["business_type"]
+        business_type = "list" if business_type.endswith("List") else "detail"
+
+        hash_key = get_md5(**items)  # 爬虫查询标识
+        # 更新聚合统计任务运行次数
+        aggregate_count_items[hash_key][business_type]["runtimes"] = runtimes
 
 
 _runtime = get_runtime()  # 统计时间
 aggregate_count_items = aggregate_query_count(_runtime)
 aggregate_crawl_list_items = aggregate_query_crawl_list(_runtime)
-aggregate_query_crawlab_items = aggregate_query_crawlab_information(_runtime)
-
-
-def get_node_and_taskid(spidercode, default=None):
-    """获取最新爬虫工作节点和任务id"""
-    if aggregate_query_crawlab_items.get(spidercode):
-        default = aggregate_query_crawlab_items[spidercode]
-    return default
+aggregate_count_crawlab_update_runtimes(_runtime)
 
 
 def get_list_isgetdata(hash_key, default=0):
     """列表页是否采集数据"""
     query_result = aggregate_count_items.get(hash_key)
     if query_result:
-        default = query_result["list_count"]
+        default = query_result["list"]["count"]
     return True if default > 0 else False
 
 
@@ -368,8 +339,25 @@ def get_list_allintimes(hash_key, default=0):
 
 def get_list_runtimes(hash_key, default=0):
     """列表采集运行频次"""
-    if aggregate_count_items.get(hash_key):
-        default = aggregate_count_items[hash_key]["list_runtimes"]
+    query_result = aggregate_count_items.get(hash_key)
+    if query_result and "list" in query_result:
+        default = aggregate_count_items[hash_key]["list"]["runtimes"]
+    return default
+
+
+def get_list_count(hash_key, default=0):
+    """列表采集总数"""
+    query_result = aggregate_count_items.get(hash_key)
+    if query_result and "list" in query_result:
+        default = aggregate_count_items[hash_key]["list"]["count"]
+    return default
+
+
+def get_list_rel_count(hash_key, default=0):
+    """列表实际入库总数"""
+    query_result = aggregate_count_items.get(hash_key)
+    if query_result and "list" in query_result:
+        default = aggregate_count_items[hash_key]["list"]["rel_count"]
     return default
 
 
@@ -382,18 +370,26 @@ def get_list_nodatatimes(hash_key, default=-1):
 
 def get_detail_downloadnum(hash_key, default=0):
     """详情页下载量"""
-    if aggregate_count_items.get(hash_key):
-        default = aggregate_count_items[hash_key]["detail_count"]
+    query_result = aggregate_count_items.get(hash_key)
+    if query_result and "detail" in query_result:
+        default = aggregate_count_items[hash_key]["detail"]["count"]
     return default
 
 
+get_detail_count = get_detail_downloadnum
+
+
 def get_detail_downloadsuccessnum(hash_key, default=0):
     """详情页下载成功量"""
-    if aggregate_count_items.get(hash_key):
-        default = aggregate_count_items[hash_key]["detail_rel_count"]
+    query_result = aggregate_count_items.get(hash_key)
+    if query_result and "detail" in query_result:
+        default = aggregate_count_items[hash_key]["detail"]["rel_count"]
     return default
 
 
+get_detail_rel_count = get_detail_downloadsuccessnum
+
+
 def get_detail_downloadfailnum(**kwargs):
     """详情页下载失败量"""
     count = -1
@@ -414,32 +410,27 @@ def start_monitor():
         hash_key = get_md5(site, channel, spidercode)
         query_result = aggregate_count_items.get(hash_key)
         if query_result:
-            crawlab = get_node_and_taskid(spidercode)  # crawlab 运行详情
-            if crawlab:
-                join_data["py_taskid"] = crawlab["crawlab_taskid"] or ""
-                join_data["py_nodename"] = crawlab["node_ip"] or ""
-
-            # 聚合查询 - 统计数据采集详情信息(来自采集心跳)
+            # 聚合查询 - 统计数据采集信息
             result = query_result
-            join_data["frame"] = result["frame"]  # 采集框架
-            join_data["batch_no"] = result["batch_no"]
-            join_data["business_type"] = result["business_type"]
-            join_data["list_count"] = result["list_count"]
-            join_data["list_rel_count"] = result["list_rel_count"]
-            join_data["detail_count"] = result["detail_count"]
-            join_data["detail_rel_count"] = result["detail_rel_count"]
+            join_data["frame"] = result["frame"]  # 采集框架名
             join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
 
-        # 采集汇总 - 列表数据
+        # 聚合统计 - 列表采集数据
         join_data["list_isgetdata"] = get_list_isgetdata(hash_key)
         join_data["list_allintimes"] = get_list_allintimes(hash_key)
         join_data["list_runtimes"] = get_list_runtimes(hash_key)
         join_data["list_nodatatimes"] = get_list_nodatatimes(hash_key)
-        # 采集汇总 - 详情数据
+        join_data["list_count"] = get_list_count(hash_key)
+        join_data["list_rel_count"] = get_list_rel_count(hash_key)
+
+        # 聚合统计 - 详情采集数据
+        join_data["detail_count"] = get_detail_count(hash_key)
+        join_data["detail_rel_count"] = get_detail_rel_count(hash_key)
         join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)
         join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)
         join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data)
-        # 监控是否有效
+
+        # 日统计采集数据是否有效
         frame = join_data.get("frame")
         if frame and frame == "feapder":
             join_data["is_valid"] = True
@@ -448,7 +439,7 @@ def start_monitor():
         logger.info(f"{site} {channel} {spidercode} --统计完成")
     # 上传数据库
     save(summary_queue, spider_monitor)
-    logger.info(f"爬虫监控 - 统计完成")
+    logger.info(f"爬虫监控 - 统计完成")
 
 
 if __name__ == '__main__':
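
For the second item in the commit message (total number of collection task runs), the reworked aggregate_count_crawlab_update_runtimes derives runtimes from the number of distinct crawlab task records per batch instead of incrementing a counter for every heartbeat document. A minimal standalone sketch of that counting, assuming a pymongo Collection of heartbeat documents and the field names used in the diff above (batch_no, runtime, node_ip, crawlab_taskid):

    # Sketch only: the heartbeat collection and runtime value are supplied by the caller.
    from pymongo.collection import Collection

    def task_runtimes(heartbeat: Collection, runtime):
        """Return {batch_no: run count} for one statistics window."""
        pipeline = [
            {"$match": {"runtime": runtime}},
            {"$group": {
                "_id": "$batch_no",
                "crawlab_item": {"$addToSet": {              # distinct task records
                    "node_ip": "$node_ip",
                    "crawlab_taskid": "$crawlab_taskid",
                }},
            }},
        ]
        return {
            doc["_id"]: len(doc["crawlab_item"])             # one distinct record = one run
            for doc in heartbeat.aggregate(pipeline, allowDiskUse=True)
        }

This matches the len(items["crawlab_item"]) counting in the diff: $addToSet deduplicates identical (node_ip, crawlab_taskid) pairs, so repeated heartbeats from the same crawlab task are counted as a single run.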