
update: add "is_valid" field to distinguish monitoring records

dongzhaorui 2 years ago
parent
commit
0b05f903ac
1 changed file with 134 additions and 37 deletions

A数据处理/sync_data/monitor_summary.py → A数据处理/sync_data/monitor.py

@@ -2,9 +2,9 @@
 """
 Created on 2023-04-05
 ---------
-@summary:  Crawler monitoring data summary
+@summary:  Crawler runtime monitoring (feapder + py_spiders)
 ---------
-@author: dzr
+@author: Dzr
 """
 import hashlib
 from datetime import datetime, time, timedelta
@@ -58,19 +58,21 @@ def get_md5(*args, **kwargs):
     ---------
     @result: 7c8684bcbdfcea6697650aa53d7b1405
     """
-    conditions = ["site", "channel", "spidercode"]
-    data_lst = list(filter(lambda x: x is not None, args))
-    for k, v in kwargs.items():
-        if k in conditions and (v and v not in data_lst):
-            data_lst.append(v)
-
-    if not data_lst or len(data_lst) != 3:
-        # raise AttributeError(f"missing attributes: {conditions}")
-        logger.error(f"[Monitor]missing attributes {conditions}, content: {data_lst}")
-        return None
-
-    data_lst = sorted(data_lst)
-    content = "_".join(data_lst)
+    if len(args) != 1:
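+        # multiple args/kwargs: build the content from the site/channel/spidercode triple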
+        conditions = ["site", "channel", "spidercode"]
+        data_lst = list(filter(lambda x: x is not None, args))
+        for k, v in kwargs.items():
+            if k in conditions and (v and v not in data_lst):
+                data_lst.append(v)
+
+        if not data_lst or len(data_lst) != 3:
+            logger.error(f"[Monitor]missing conditions {conditions}, current content: {data_lst}")
+            return None
+
+        data_lst = sorted(data_lst)
+        content = "_".join(data_lst)
+    else:
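+        # a single positional argument is used directly as the hash content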
+        content = args[0]
     return hashlib.md5(str(content).encode()).hexdigest()
 
 
@@ -84,7 +86,6 @@ def get_runtime(datestr=None):
 
 def save(documents, collection):
     """保存数据"""
-
     is_list = isinstance(documents, list)
     documents = documents if is_list else [documents]
 
@@ -102,7 +103,8 @@ def save(documents, collection):
             logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存")
 
     # flush the remaining documents
-    collection.insert_many(data_lst)
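+    # guard: pymongo rejects insert_many() on an empty document list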
+    if len(data_lst) > 0:
+        collection.insert_many(data_lst)
     logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存")
 
 
@@ -120,7 +122,7 @@ def get_crawler_basic_information():
             crawler_lst.append({
                 "site": doc["site"],
                 "channel": doc["channel"],
-                "spidercode": doc["code"],
+                "code": doc["code"],
                 "modifyid": doc["modifyuserid"],
                 "modifyuser": doc["modifyuser"],
                 "event": doc["event"]
@@ -191,7 +193,7 @@ def aggregate_query_crawl_count(runtime):
             business_type = items["business_type"]
 
             if len(spider_item) > 1 and site not in special_sites:
-                logger.warning(f"[Monitor]{spidercode} -> {site}--risk detected, {len(spider_item)}")
+                logger.warning(f"[Monitor]{spidercode} -> {site} --risk detected, {len(spider_item)}")
 
             is_list = str(business_type).endswith("List")
 
@@ -209,7 +211,8 @@ def aggregate_query_crawl_count(runtime):
                     "spidercode": spidercode,
                     "business_type": business_type,
                     "runtime": runtime,
-                    "spidercode_at_site_num": 0  # 爬虫代码与站点对应的关系数量
+                    "spidercode_at_site_num": 0,  # 爬虫代码与站点对应的关系数量
+                    "frame": "feapder"  # 采集框架
                 }
                 if is_list:
                     data["list_count"] = doc["count"]
@@ -264,6 +267,76 @@ def aggregate_query_crawl_count(runtime):
     return aggregate_items
 
 
+def aggregate_query_py_spiders(runtime):
+    """py_spiders爬虫列表和详情采集聚合查询结果统计"""
+    aggregate_items = {}
+    if runtime is not None:
+        today = datetime.fromisoformat(runtime).date()
+    else:
+        today = datetime.now().date()
+    yesterday = today + timedelta(days=-1)
+
+    runtime = yesterday.strftime("%Y-%m-%d")
+    start_time = int(datetime.combine(yesterday, time()).timestamp())
+    end_time = int(datetime.combine(today, time()).timestamp())
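+    # [start_time, end_time) bounds yesterday's collection window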
+
+    pipeline = [
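+        # stages: flag unfinished docs, keep yesterday's window, then group
+        # by spidercode, collecting each site/channel combination seen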
+        {
+            "$addFields": {
+                "rel_count": {
+                    "$cond": {
+                        "if": {"$ne": ["$finished", True]},
+                        "then": 1,
+                        "else": 0
+                    }
+                }
+            }
+        },
+        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
+        {
+            "$group": {
+                "_id": "$spidercode",
+                "count": {"$sum": 1},  # 当天采集总数
+                "rel_count": {"$sum": 1},  # 当天采集总数
+                "spider_item": {
+                    "$addToSet": {
+                        "site": "$site",
+                        "channel": "$channel",
+                        "spidercode": "$spidercode",
+                        "business_type": "List"
+                    }
+                }
+            }
+        },
+    ]
+    results = aggregate_query(py_spiders_crawl_list, pipeline)
+    for doc in results:
+        spider_item = doc["spider_item"]
+        for items in spider_item:
+            site = items["site"]
+            channel = items["channel"]
+            spidercode = items["spidercode"]
+            business_type = items["business_type"]
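+            # hash_key keys the site/channel/spidercode triple; spider_id hashes a pre-joined string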
+            hash_key = get_md5(site, channel, spidercode)
+            spider_id = get_md5(spidercode + business_type + runtime)
+            data = {
+                "spider_id": spider_id,
+                "site": site,
+                "channel": items["channel"],
+                "spidercode": spidercode,
+                "business_type": business_type,
+                "runtime": runtime,
+                "list_count": doc["count"],
+                "list_rel_count": doc["rel_count"],
+                "detail_count": 0,
+                "detail_rel_count": 0,
+                "spidercode_at_site_num": 1,  # 爬虫代码与站点对应的关系数量
+                "frame": "py_spiders"
+            }
+            aggregate_items[hash_key] = data
+    return aggregate_items
+
+
 def aggregate_query_crawl_list(runtime):
     """feapder列表爬虫采集聚合查询结果统计"""
     aggregate_items = {}
@@ -290,14 +363,18 @@ def aggregate_query_crawl_list(runtime):
         for item in spider_item:
             hask_key = get_md5(**item)
             if not aggregate_items.get(hask_key):
-                if all([
-                    item["count"] > 0,
-                    item["rel_count"] > 0,
-                    item["count"] == item["rel_count"]
-                ]):
-                    aggregate_items.setdefault(hask_key, {"list_allintimes": 1})
-            else:
-                aggregate_items.get(hask_key)["list_allintimes"] += 1
+                values = {"list_allintimes": 0, "list_nodatatimes": 0}
+                aggregate_items.setdefault(hask_key, values)
+
+            if all([
+                item["count"] > 0,
+                item["rel_count"] > 0,
+                item["count"] == item["rel_count"]
+            ]):
+                aggregate_items[hask_key]["list_allintimes"] += 1
+
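+            # a run that collected zero items counts as a no-data run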
+            if item["count"] == 0:
+                aggregate_items[hask_key]["list_nodatatimes"] = 1
     return aggregate_items
 
 
@@ -343,6 +420,7 @@ def aggregate_query_crawlab_info(runtime):
 
 _runtime = get_runtime()
 aggregate_results = aggregate_query_crawl_count(_runtime)
+aggregate_py_spiders_results = aggregate_query_py_spiders(_runtime)
 aggregate_list_results = aggregate_query_crawl_list(_runtime)
 aggregate_query_crawlab_results = aggregate_query_crawlab_info(_runtime)
 
@@ -356,8 +434,10 @@ def get_node_and_taskid(spidercode, default=None):
 
 def get_list_isgetdata(hash_key, default=0):
     """列表页是否采集数据"""
-    if aggregate_results.get(hash_key):
-        default = aggregate_results[hash_key]["list_count"]
+    query_result = (aggregate_results.get(hash_key)
+                    or aggregate_py_spiders_results.get(hash_key))
+    if query_result:
+        default = query_result["list_count"]
     return True if default > 0 else False
 
 
@@ -375,6 +455,13 @@ def get_list_runtimes(hash_key, default=0):
     return default
 
 
+def get_list_nodatatimes(hash_key, default=-1):
+    """列表页采集无数据次数(过滤后)"""
+    if aggregate_list_results.get(hash_key):
+        default = aggregate_list_results[hash_key]["list_nodatatimes"]
+    return default
+
+
 def get_detail_downloadnum(hash_key, default=0):
     """详情页下载量"""
     if aggregate_results.get(hash_key):
@@ -401,37 +488,47 @@ def main():
     summary_queue = []
     crawlers = get_crawler_basic_information()
     for crawler in crawlers:
+        join_data = {**crawler, "is_valid": False}  # 创建爬虫基础数据
+
         site = crawler["site"]
         channel = crawler["channel"]
-        spidercode = crawler["spidercode"]
+        spidercode = crawler["code"]
 
-        join_data = {**crawler}  # add the crawler's base data
         hash_key = get_md5(site, channel, spidercode)
-        if aggregate_results.get(hash_key):
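+        # prefer feapder aggregate stats, falling back to py_spiders results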
+        query_result = (aggregate_results.get(hash_key)
+                        or aggregate_py_spiders_results.get(hash_key))
+        if query_result:
             # crawlab platform
             crawlab = get_node_and_taskid(spidercode)
             if crawlab:
-                join_data["py_taskid"] = crawlab["crawlab_taskid"]
-                join_data["py_nodename"] = crawlab["node_ip"]
+                join_data["py_taskid"] = crawlab["crawlab_taskid"] or ""
+                join_data["py_nodename"] = crawlab["node_ip"] or ""
 
             # heartbeat stats from the aggregate query
-            result = aggregate_results[hash_key]
+            result = query_result
+            join_data["frame"] = result["frame"]  # crawl framework
             join_data["spider_id"] = result["spider_id"]
             join_data["business_type"] = result["business_type"]
-            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
             join_data["list_count"] = result["list_count"]
             join_data["list_rel_count"] = result["list_rel_count"]
             join_data["detail_count"] = result["detail_count"]
             join_data["detail_rel_count"] = result["detail_rel_count"]
+            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
 
         # list-collection summary data
         join_data["list_isgetdata"] = get_list_isgetdata(hash_key)
         join_data["list_allintimes"] = get_list_allintimes(hash_key)
         join_data["list_runtimes"] = get_list_runtimes(hash_key)
+        join_data["list_nodatatimes"] = get_list_nodatatimes(hash_key)
         # detail-collection summary data
         join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)
         join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)
         join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data)
+        # monitoring validity flag
+        if join_data.get("frame") == "feapder":
+            join_data["is_valid"] = True
+
         summary_queue.append(join_data)
         logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode}--完成统计")
     # upload to the database