dongzhaorui 2 years ago
parent
commit
8cf4b7d277
1 changed file with 81 additions and 66 deletions
      A数据处理/sync_data/monitor_summary.py

@@ -1,3 +1,11 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-05
+---------
+@summary: crawler monitoring data summary
+---------
+@author: dzr
+"""
 import hashlib
 from datetime import datetime, time, timedelta
 
@@ -37,15 +45,30 @@ spider_monitor = mongodb1["spider_monitor"]
 spider_lua_config = mongodb2["luaconfig"]
 
 
-def get_hask_key(*args):
+def get_hask_key(*args, **kwargs):
     """
     @summary: get a unique 32-character md5 digest
     ---------
-    @param *args: values used to build the combined dedup key
+    @param args: collection of values used to build the combined dedup key
+    @param kwargs: dict of values used to build the combined dedup key
     ---------
     @result: 7c8684bcbdfcea6697650aa53d7b1405
     """
-    join_data = "_".join(*args).encode()
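+    # the key is built from the site, channel and spidercode values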
+    conditions = ["site", "channel", "spidercode"]
+    if args:
+        trait = list(filter(lambda x: x is not None, args))
+    else:
+        trait = set()
+        for k, v in kwargs.items():
+            if k in conditions and kwargs[k] is not None:
+                trait.add(kwargs[k])
+
+    if not trait or len(trait) != 3:
+        # raise AttributeError(f"missing attributes: {conditions}")
+        logger.error(f"[Monitor] missing attributes {conditions}, got: {trait}")
+        return None
+
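+    # sort before joining so the key does not depend on argument order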
+    join_data = "_".join(sorted(trait)).encode()
     return hashlib.md5(join_data).hexdigest()
 
 
@@ -57,20 +80,20 @@ def save(documents, collection):
 
     count = 0
     data_lst = []
-    for item in documents:
-        item.pop("_id", None)
-        item.pop("business_type", None)
-        item["comeintime"] = Int64(datetime.now().timestamp())
-        data_lst.append(item)
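+    # drop _id/business_type, stamp comeintime, and buffer documents for bulk insert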
+    for items in documents:
+        items.pop("_id", None)
+        items.pop("business_type", None)
+        items["comeintime"] = Int64(datetime.now().timestamp())
+        data_lst.append(items)
         count += 1
         if len(data_lst) % 100 == 0:
             collection.insert_many(data_lst)
             data_lst.clear()
-            logger.info(f"{collection.name} - batch saved {count} records - done")
+            logger.info(f"[Monitor]{collection.name} - batch uploaded {count} records - saved")
 
     # flush the remaining documents
     collection.insert_many(data_lst)
-    logger.info(f"{collection.name} - batch saved {count} records - done")
+    logger.info(f"[Monitor]{collection.name} - batch uploaded {count} records - saved")
 
 
 def get_runtime(datestr=None):
@@ -98,7 +121,7 @@ def get_crawler_basic_information():
             })
     finally:
         client.close()
-        logger.info(f"crawler daily report - {len(crawler_lst)} crawlers in total")
+        logger.info(f"[Monitor] crawler monitoring - {len(crawler_lst)} crawlers in total")
         yield from crawler_lst
 
 
@@ -111,16 +134,17 @@ def get_node_and_taskid(runtime, spidercode):
     return result
 
 
-
-
 def aggregate_query(runtime):
     """feapder采集聚合查询"""
+    websites = []
+    aggregate_items = {}
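+    # hash_key -> merged heartbeat statistics for one site/channel/spidercode combination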
+
     pipeline = [
         {"$match": {"runtime": runtime}},
         {
             "$group": {
                 "_id": "$spider_id",
-                "rel_count": {"$sum": "$rel_count"},  # 实际下载
+                "rel_count": {"$sum": "$rel_count"},  # 入库
                 "count": {"$sum": "$count"},  # 下载量
                 "spider_item": {
                     "$addToSet": {
@@ -134,69 +158,54 @@ def aggregate_query(runtime):
         },
         {"$sort": SON([("rel_count", -1)])}
     ]
-
-    aggregate_items = {}
-    website_lst = []
-
     cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
     try:
         for doc in cursor:
             spider_item = doc["spider_item"]
 
-            spidercode_at_site_num = 0
-
-            for item in spider_item:
-                site = item["site"]
-                channel = item["channel"]
-                spidercode = item["spidercode"]
-
-                hash_key = get_hask_key([site, channel, spidercode])  # prevent data overlap when multiple sites share one spidercode
+            for items in spider_item:
+                site = items["site"]
+                channel = items["channel"]
+                spidercode = items["spidercode"]
+                business_type = items["business_type"]
 
-                same_site = True
-                if site not in website_lst:
-                    same_site = False
-                    website_lst.append(site)
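+                # more than one site/channel/spidercode combination under a single spider_id is an abnormal mapping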
+                if len(spider_item) > 1:
+                    logger.warning(f"{spidercode} -> {site} - abnormal number of site mappings")
 
-                if not same_site and aggregate_items.get(hash_key):
-                    aggregate_items.get(hash_key)["spidercode_at_site_num"] += 1
-                else:
-                    spidercode_at_site_num += 1
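+                # business_type values ending in "List" are list-page runs; the rest count as detail pages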
+                is_list = str(business_type).endswith("List")
+                hash_key = get_hask_key(**items)  # prevent data overlap when multiple sites share one spidercode
+                if not hash_key:
+                    logger.error(f"[Monitor]{site}-{channel}-{spidercode} - monitoring anomaly")
+                    continue
 
                 if not aggregate_items.get(hash_key):
                     data = {
-                        "business_type": item["business_type"],
                         "spider_id": doc["_id"],
                         "site": site,
-                        "channel": item["channel"],
+                        "channel": items["channel"],
                         "spidercode": spidercode,
+                        "business_type": business_type,
                         "runtime": runtime,
-                        "spidercode_at_site_num": spidercode_at_site_num  # 爬虫代码对应的站点数量
+                        "spidercode_at_site_num": 0  # 爬虫代码与站点对应的关系数量
                     }
-
-                    is_list = str(item["business_type"]).endswith("List")
                     if is_list:
                         data["list_count"] = doc["count"]
                         data["list_rel_count"] = doc["rel_count"]
+                        data["list_runtimes"] = 1
                         data["detail_count"] = 0
                         data["detail_rel_count"] = 0
-                        data["list_runtimes"] = 1
                         data["detail_runtimes"] = 0
                     else:
-                        data["list_count"] = 0
-                        data["list_rel_count"] = 0
                         data["detail_count"] = doc["count"]
                         data["detail_rel_count"] = doc["rel_count"]
                         data["detail_runtimes"] = 1
+                        data["list_count"] = 0
+                        data["list_rel_count"] = 0
                         data["list_runtimes"] = 0
 
-                    if len(spider_item) > 1:
-                        logger.warning(f"{spidercode} -> {site} - wrong mapping")
-
                     aggregate_items.setdefault(hash_key, data)
-
                 else:
                     data = aggregate_items.get(hash_key)
-                    is_list = str(item["business_type"]).endswith("List")
                     if is_list:
                         data["list_count"] += doc["count"]
                         data["list_rel_count"] += doc["rel_count"]
@@ -208,13 +217,24 @@ def aggregate_query(runtime):
 
                     aggregate_items.update({hash_key: data})
 
+                # monitor crawler tasks: when spidercode_at_site_num > 1,
+                # the crawler task was created incorrectly and must be reported to the data sourcing team
+                if site not in websites:
+                    aggregate_items[hash_key]["spidercode_at_site_num"] = 1
+                    websites.append(site)
+                else:
+                    aggregate_items[hash_key]["spidercode_at_site_num"] += 1
+
+    # except Exception as e:
+    #     logger.exception(e)
+
     finally:
         client.close()
         return aggregate_items
 
 
-runtime = get_runtime()
-aggregate_results = aggregate_query(runtime)
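+# run the heartbeat aggregation once at module load; main() joins the result with each crawler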
+_runtime = get_runtime()
+aggregate_results = aggregate_query(_runtime)
 
 
 def get_list_isgetdata(hash_key):
@@ -282,39 +302,34 @@ def main():
         site = crawler["site"]
         channel = crawler["channel"]
         spidercode = crawler["spidercode"]
-        hash_key = get_hask_key([site, channel, spidercode])
 
+        join_data = {**crawler}  # start from the crawler's base data
+        hash_key = get_hask_key(site, channel, spidercode)
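+        # merge the aggregated heartbeat statistics when this crawler reported heartbeats for the current runtime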
         if aggregate_results.get(hash_key):
-            # 合并数据
-            join_data = {**crawler}
-            result = aggregate_results.get(hash_key)
+            # crawlab platform
+            crawlab = get_node_and_taskid(_runtime, spidercode)
+            if crawlab:
+                join_data["py_taskid"] = crawlab["crawlab_taskid"]
+                join_data["py_nodename"] = crawlab["node_ip"]
 
+            result = aggregate_results[hash_key]
+            # aggregated heartbeat statistics
             join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
-
             join_data["business_type"] = result["business_type"]
             join_data["spider_id"] = result["spider_id"]
-
             join_data["list_count"] = result["list_count"]
             join_data["list_rel_count"] = result["list_rel_count"]
             join_data["detail_count"] = result["detail_count"]
             join_data["detail_rel_count"] = result["detail_rel_count"]
-
-            # crawlab平台
-            crawlab = get_node_and_taskid(runtime, spidercode)
-            if crawlab:
-                join_data["py_taskid"] = crawlab["crawlab_taskid"]
-                join_data["py_nodename"] = crawlab["node_ip"]
-
+            # compute list-page metrics
             join_data["list_isgetdata"] = get_list_isgetdata(hash_key)  # whether the list page collected any data
             join_data["list_allintimes"] = get_list_allintimes(hash_key)  # total list records stored today
             join_data["list_runtimes"] = get_list_runtimes(hash_key)  # list-page run count
-
+            # compute detail-page metrics
             join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)  # detail pages downloaded
             join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)  # detail pages downloaded successfully
             join_data["detail_downloadfailnum"] = join_data["detail_downloadnum"] - join_data["detail_downloadsuccessnum"]  # failed detail downloads
-
         else:
-            join_data = {**crawler}
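+            # no heartbeat statistics were aggregated for this crawler; -1 marks the metrics as missing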
             join_data["list_isgetdata"] = False
             join_data["list_allintimes"] = -1
             join_data["list_runtimes"] = -1
@@ -322,7 +337,7 @@ def main():
             join_data["detail_downloadsuccessnum"] = -1
             join_data["detail_downloadfailnum"] = -1
 
-        logger.info(f"{crawler['site']}-{crawler['channel']}-{spidercode} - statistics completed")
+        logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode} - statistics completed")
         summary_queue.append(join_data)
 
     save(summary_queue, spider_monitor)