Browse code

update: aggregate query of list-page collection results; aggregate query of crawlab platform run results

dongzhaorui 2 years ago
parent
commit
725239b03c
1 changed file with 255 additions and 150 deletions

+ 255 - 150
数据处理/sync_data/monitor_summary.py

@@ -14,6 +14,7 @@ from bson.son import SON
 from pymongo import MongoClient
 
 from log import logger
+from operator import itemgetter
 
 # mongo
 # MONGO_HOST = "172.17.4.87"
@@ -45,31 +46,37 @@ spider_monitor = mongodb1["spider_monitor"]
 spider_lua_config = mongodb2["luaconfig"]
 
 
-def get_hask_key(*args, **kwargs):
+def get_md5(*args, **kwargs):
     """
     @summary: build a unique 32-character md5 digest
     ---------
-    @param args: collection of values used to build the joint dedup key
+    @param args: array of values used to build the joint dedup key
     @param kwargs: dict of values used to build the joint dedup key
     ---------
     @result: 7c8684bcbdfcea6697650aa53d7b1405
     """
     conditions = ["site", "channel", "spidercode"]
-    if args:
-        trait = list(filter(lambda x: x is not None, args))
-    else:
-        trait = set()
-        for k, v in kwargs.items():
-            if k in conditions and kwargs[k] is not None:
-                trait.add(kwargs[k])
-
-    if not trait or len(trait) != 3:
+    data_lst = list(filter(lambda x: x is not None, args))
+    for k, v in kwargs.items():
+        if k in conditions and kwargs[k] and kwargs[k] not in data_lst:
+            data_lst.append(kwargs[k])
+
+    if not data_lst or len(data_lst) != 3:
         # raise AttributeError(f"missing attributes: {conditions}")
-        logger.error(f"[Monitor]missing attributes: {conditions}, got: {trait}")
+        logger.error(f"[Monitor]missing attributes: {conditions}, got: {data_lst}")
         return None
 
-    join_data = "_".join(sorted(trait)).encode()
-    return hashlib.md5(join_data).hexdigest()
+    data_lst = sorted(data_lst)
+    content = "_".join(data_lst)
+    return hashlib.md5(str(content).encode()).hexdigest()
+
+
+def get_runtime(datestr=None):
+    if datestr is None:
+        today = datetime.now().date()
+        yesterday = today + timedelta(days=-1)
+        datestr = yesterday.strftime("%Y-%m-%d")
+    return datestr
 
 
 def save(documents, collection):
@@ -96,14 +103,6 @@ def save(documents, collection):
     logger.info(f"[Monitor]{collection.name} - bulk uploaded {count} records -- saved")
 
 
-def get_runtime(datestr=None):
-    if datestr is None:
-        today = datetime.now().date()
-        yesterday = today + timedelta(days=-1)
-        datestr = yesterday.strftime("%Y-%m-%d")
-    return datestr
-
-
 def get_crawler_basic_information():
     """Basic crawler information"""
     crawler_lst = []
@@ -129,19 +128,36 @@ def get_crawler_basic_information():
         yield from crawler_lst
 
 
-def get_node_and_taskid(runtime, spidercode):
-    """Get the latest crawler work node and task id"""
-    q = {"runtime": runtime, "spidercode": spidercode}
-    projection = {"node_ip": 1, "crawlab_taskid": 1, "_id": 0}
-    sort = [("_id", -1)]
-    result = spider_heartbeat.find_one(q, projection=projection, sort=sort)
-    return result
+def aggregate_query(collection, pipeline, is_print_error=False):
+    """
+    Run an aggregation query.
+
+    @param collection: MongoDB collection
+    @param pipeline: aggregation pipeline stages
+    @param is_print_error: whether to log errors to the console
+    @return: aggregation results
+    """
+    results = []
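+    # allowDiskUse=True lets large pipeline stages spill to disk instead of failing on MongoDB's in-memory stage limit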
+    cursor = collection.aggregate(pipeline, allowDiskUse=True)
+    try:
+        for doc in cursor:
+            results.append(doc)
+    except Exception as e:
+        if is_print_error:
+            logger.exception(e)
+    finally:
+        client.close()
+        yield from results
 
 
-def aggregate_query(runtime, is_print_error=False):
-    """Aggregation query for feapder collection"""
-    aggregate_items = {}
+def aggregate_query_crawl_count(runtime):
+    """
+    Aggregate statistics for feapder list and detail collection results

+    @param runtime: run date
+    @return:
+    """
+    aggregate_items = {}
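+    # aggregate_items: get_md5(site, channel, spidercode) -> merged list/detail counters for this runtime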
     pipeline = [
         {"$match": {"runtime": runtime}},
         {
@@ -161,123 +177,221 @@ def aggregate_query(runtime, is_print_error=False):
         },
         {"$sort": SON([("rel_count", -1)])}
     ]
-    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
-    try:
-        for doc in cursor:
-            spider_item = doc["spider_item"]
-            tmp_websites = []
-            for items in spider_item:
-                site = items["site"]
-                channel = items["channel"]
-                spidercode = items["spidercode"]
-                business_type = items["business_type"]
-
-                if len(spider_item) > 1:
-                    logger.warning(f"{spidercode} -> {site} -- abnormal number of mappings, {len(spider_item) }")
-
-                is_list = str(business_type).endswith("List")
-                hash_key = get_hask_key(**items)  # prevent data overlap when multiple sites share one spidercode
-                if not hash_key:
-                    logger.error(f"[Monitor]{site}-{channel}-{spidercode} -- monitor error")
-                    continue
-
-                if not aggregate_items.get(hash_key):
-                    data = {
-                        "spider_id": doc["_id"],
-                        "site": site,
-                        "channel": items["channel"],
-                        "spidercode": spidercode,
-                        "business_type": business_type,
-                        "runtime": runtime,
-                        "spidercode_at_site_num": 0  # number of sites this spidercode maps to
-                    }
-                    if is_list:
-                        data["list_count"] = doc["count"]
-                        data["list_rel_count"] = doc["rel_count"]
-                        data["list_runtimes"] = 1
-                        data["detail_count"] = 0
-                        data["detail_rel_count"] = 0
-                        data["detail_runtimes"] = 0
-                    else:
-                        data["detail_count"] = doc["count"]
-                        data["detail_rel_count"] = doc["rel_count"]
-                        data["detail_runtimes"] = 1
-                        data["list_count"] = 0
-                        data["list_rel_count"] = 0
-                        data["list_runtimes"] = 0
-
-                    aggregate_items.setdefault(hash_key, data)
+    results = aggregate_query(spider_heartbeat, pipeline)
+    for doc in results:
+        spider_item = doc["spider_item"]
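+        # label_dict tracks, per business_type + spidercode pair, the sites and hash keys already seen,
+        # so a spidercode that runs against more than one site can be flagged below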
+        label_dict = {}
+        for items in spider_item:
+            site = items["site"]
+            channel = items["channel"]
+            spidercode = items["spidercode"]
+            business_type = items["business_type"]
+
+            if len(spider_item) > 1:
+                logger.warning(f"{spidercode} -> {site} -- mapping risk, {len(spider_item)}")
+
+            is_list = str(business_type).endswith("List")
+            hash_key = get_md5(**items)  # prevent data overlap when multiple sites share one spidercode
+            if not hash_key:
+                # logger.error(f"[Monitor]{site}-{channel}-{spidercode} -- monitor error")
+                logger.error(f"[Monitor]{doc['_id']} -- crawler error")
+                continue
+
+            if not aggregate_items.get(hash_key):
+                data = {
+                    "spider_id": doc["_id"],
+                    "site": site,
+                    "channel": items["channel"],
+                    "spidercode": spidercode,
+                    "business_type": business_type,
+                    "runtime": runtime,
+                    "spidercode_at_site_num": 0  # number of sites this spidercode maps to
+                }
+                if is_list:
+                    data["list_count"] = doc["count"]
+                    data["list_rel_count"] = doc["rel_count"]
+                    data["list_runtimes"] = 1
+                    data["detail_count"] = 0
+                    data["detail_rel_count"] = 0
+                    data["detail_runtimes"] = 0
                 else:
-                    data = aggregate_items.get(hash_key)
-                    if is_list:
-                        data["list_count"] += doc["count"]
-                        data["list_rel_count"] += doc["rel_count"]
-                        data["list_runtimes"] += 1
-                    else:
-                        data["detail_count"] += doc["count"]
-                        data["detail_rel_count"] += doc["rel_count"]
-                        data["detail_runtimes"] += 1
-
-                    aggregate_items.update({hash_key: data})
-
-                # Monitor crawler tasks: when spidercode_at_site_num > 1,
-                # it means the created crawler task has a problem and should be reported to the data-sourcing team
-                if site not in tmp_websites:
-                    # TODO check whether this counting logic is correct
-                    aggregate_items[hash_key]["spidercode_at_site_num"] = 1
-                    tmp_websites.append(site)
+                    data["detail_count"] = doc["count"]
+                    data["detail_rel_count"] = doc["rel_count"]
+                    data["detail_runtimes"] = 1
+                    data["list_count"] = 0
+                    data["list_rel_count"] = 0
+                    data["list_runtimes"] = 0
+
+                aggregate_items.setdefault(hash_key, data)
+            else:
+                data = aggregate_items.get(hash_key)
+                if is_list:
+                    data["list_count"] += doc["count"]
+                    data["list_rel_count"] += doc["rel_count"]
+                    data["list_runtimes"] += 1
                 else:
-                    aggregate_items[hash_key]["spidercode_at_site_num"] += 1
+                    data["detail_count"] += doc["count"]
+                    data["detail_rel_count"] += doc["rel_count"]
+                    data["detail_runtimes"] += 1
+
+                aggregate_items.update({hash_key: data})
+
+            # Monitor crawler tasks: when spidercode_at_site_num > 1,
+            # it means the created crawler task has a problem; report it to the data-sourcing staff
+            label = f"{business_type}_{spidercode}"
+            if label not in label_dict:
+                aggregate_items[hash_key]["spidercode_at_site_num"] = 1
+                conditions = {"keys": [hash_key], "websites": [site]}
+                label_dict.setdefault(label, conditions)
+            else:
+                # same spidercode but a different site: increment the counter
+                websites = label_dict[label]["websites"]
+                if site not in websites:
+                    keys = label_dict[label]["keys"]
+                    for key in keys:
+                        aggregate_items[key]["spidercode_at_site_num"] += 1
+                    # record the identity id (hash_key)
+                    keys.append(hash_key)
+                    # record the site
+                    websites.append(site)
+                    aggregate_items[hash_key]["spidercode_at_site_num"] = len(websites)
+
+    return aggregate_items
+
+
+def aggregate_query_crawl_list(runtime):
+    """Aggregated statistics for feapder list-crawler collection results"""
+    aggregate_items = {}
+    pipeline = [
+        {"$match": {"runtime": runtime, "business_type": {"$regex": "List"}}},
+        {
+            "$group": {
+                "_id": "$spider_id",
+                "spider_item": {
+                    "$addToSet": {
+                        "site": "$site",
+                        "channel": "$channel",
+                        "spidercode": "$spidercode",
+                        "count": {"$ifNull": ["$count", 0]},
+                        "rel_count": {"$ifNull": ["$rel_count", 0]},
+                    }
+                }
+            }
+        }
+    ]
+    results = aggregate_query(spider_heartbeat, pipeline)
+    for doc in results:
+        spider_item = doc["spider_item"]
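+        # a run counts as "all in" only when the list page stored everything it extracted (count == rel_count > 0)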
+        for item in spider_item:
+            hash_key = get_md5(**item)
+            if not hash_key:
+                continue
+            is_all_in = all([
+                item["count"] > 0,
+                item["rel_count"] > 0,
+                item["count"] == item["rel_count"]
+            ])
+            if not is_all_in:
+                continue
+            if not aggregate_items.get(hash_key):
+                aggregate_items.setdefault(hash_key, {"list_allintimes": 1})
+            else:
+                aggregate_items[hash_key]["list_allintimes"] += 1
+    return aggregate_items
+
+
+def aggregate_query_crawlab_info(runtime):
+    """Aggregated crawlab platform run info for feapder crawlers"""
+    aggregate_items = {}
+    pipeline = [
+        {
+            "$project": {
+                "_id": 0,
+                "spider_id": 1,
+                "spidercode": 1,
+                "runtime": 1,
+                "node_ip": 1,
+                "crawlab_taskid": 1,
+                "create_at": 1,
+            }
+        },
+        {"$match": {"runtime": runtime}},
+        {
+            "$group": {
+                "_id": "$spider_id",
+                "crawlab_item": {
+                    "$addToSet": {
+                        "spidercode": "$spidercode",
+                        "create_at": "$create_at",
+                        "node_ip": "$node_ip",
+                        "crawlab_taskid": "$crawlab_taskid"
+                    },
+                }
+            }
+        }
+    ]
+    results = aggregate_query(spider_heartbeat, pipeline)
+    for doc in results:
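+        # heartbeat entries sorted by create_at, newest first; keep only the latest run per spidercode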
+        crawlab_item = sorted(doc["crawlab_item"], key=itemgetter("create_at"), reverse=True)
+        items: dict = crawlab_item[0]
+        spidercode = items.pop("spidercode")
+        if not aggregate_items.get(spidercode):
+            aggregate_items.setdefault(spidercode, items)
+    return aggregate_items
 
-    except Exception as e:
-        if is_print_error:
-            logger.exception(e)
 
-    finally:
-        client.close()
-        return aggregate_items
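+# Run the aggregation queries once at module load; the get_* helpers below read from the cached results.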
+_runtime = get_runtime()
+aggregate_results = aggregate_query_crawl_count(_runtime)
+aggregate_list_results = aggregate_query_crawl_list(_runtime)
+aggregate_query_crawlab_results = aggregate_query_crawlab_info(_runtime)
 
 
-_runtime = get_runtime()
-aggregate_results = aggregate_query(_runtime)
+def get_node_and_taskid(spidercode, default=None):
     """Get the latest crawler work node and task id"""
+    if aggregate_query_crawlab_results.get(spidercode):
+        default = aggregate_query_crawlab_results[spidercode]
+    return default
 
 
-def get_list_isgetdata(hash_key):
+def get_list_isgetdata(hash_key, default=0):
     """Whether the list page collected any data"""
-    count = 0
     if aggregate_results.get(hash_key):
-        count += aggregate_results[hash_key]["list_count"]
-    return True if count > 0 else False
+        default = aggregate_results[hash_key]["list_count"]
+    return True if default > 0 else False
 
 
-def get_list_allintimes(hash_key):
-    """Total daily stored count from list collection"""
-    count = 0
-    if aggregate_results.get(hash_key):
-        count += aggregate_results[hash_key]["list_rel_count"]
-    return count
+def get_list_allintimes(hash_key, default=0):
+    """Number of runs where the daily list collection count equals the stored count (after title dedup and incremental/full dedup)"""
+    if aggregate_list_results.get(hash_key):
+        default = aggregate_list_results[hash_key]["list_allintimes"]
+    return default
 
 
-def get_list_runtimes(hash_key):
-    count = 0
+def get_list_runtimes(hash_key, default=0):
+    """List collection run count"""
     if aggregate_results.get(hash_key):
-        count += aggregate_results.get(hash_key)["list_runtimes"]
-    return count
+        default = aggregate_results[hash_key]["list_runtimes"]
+    return default
 
 
-def get_detail_downloadnum(hash_key):
+def get_detail_downloadnum(hash_key, default=0):
     """Detail page download count"""
-    count = 0
     if aggregate_results.get(hash_key):
-        count += aggregate_results.get(hash_key)["detail_count"]
-    return count
+        default = aggregate_results[hash_key]["detail_count"]
+    return default
 
 
-def get_detail_downloadsuccessnum(hash_key):
+def get_detail_downloadsuccessnum(hash_key, default=0):
     """Detail page successful download count"""
-    count = 0
     if aggregate_results.get(hash_key):
-        count += aggregate_results.get(hash_key)["detail_rel_count"]
+        default = aggregate_results[hash_key]["detail_rel_count"]
+    return default
+
+
+def get_detail_downloadfailnum(**kwargs):
+    """Detail page failed download count"""
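+    # -1 indicates the failure count could not be computed from the given counters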
+    count = -1
+    if kwargs["detail_downloadnum"] >= 0 and kwargs["detail_downloadsuccessnum"] >= 0:
+        count = kwargs["detail_downloadnum"] - kwargs["detail_downloadsuccessnum"]
     return count
 
 
@@ -291,7 +405,6 @@ def get_count(document, business_type: str):
 
 
 def get_rel_count(document, business_type: str):
-
     if business_type.title() not in ["List", "Detail"]:
         raise ValueError("business_type")
 
@@ -309,43 +422,35 @@ def main():
         spidercode = crawler["spidercode"]
 
         join_data = {**crawler}  # merge in the crawler's basic data
-        hash_key = get_hask_key(site, channel, spidercode)
+        hash_key = get_md5(site, channel, spidercode)
         if aggregate_results.get(hash_key):
             # crawlab platform
-            crawlab = get_node_and_taskid(_runtime, spidercode)
+            crawlab = get_node_and_taskid(spidercode)
             if crawlab:
                 join_data["py_taskid"] = crawlab["crawlab_taskid"]
                 join_data["py_nodename"] = crawlab["node_ip"]
 
+            # aggregated heartbeat statistics
             result = aggregate_results[hash_key]
-            # query the aggregated heartbeat statistics
-            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
-            join_data["business_type"] = result["business_type"]
             join_data["spider_id"] = result["spider_id"]
+            join_data["business_type"] = result["business_type"]
+            join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
             join_data["list_count"] = result["list_count"]
             join_data["list_rel_count"] = result["list_rel_count"]
             join_data["detail_count"] = result["detail_count"]
             join_data["detail_rel_count"] = result["detail_rel_count"]
-            # list-page statistics
-            join_data["list_isgetdata"] = get_list_isgetdata(hash_key)  # whether the list page collected data
-            # join_data["list_allintimes"] = get_list_allintimes(hash_key)  # total daily stored count from list collection
-            join_data["list_allintimes"] = -1  # not counted for now: the total number of items extracted from list pages is unavailable
-            join_data["list_runtimes"] = get_list_runtimes(hash_key)  # list collection run count
-            # detail-page statistics
-            join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)  # detail page download count
-            join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)  # detail page successful download count
-            join_data["detail_downloadfailnum"] = join_data["detail_downloadnum"] - join_data["detail_downloadsuccessnum"]  # detail page failed download count
-        else:
-            join_data["list_isgetdata"] = False
-            join_data["list_allintimes"] = -1
-            join_data["list_runtimes"] = -1
-            join_data["detail_downloadnum"] = -1
-            join_data["detail_downloadsuccessnum"] = -1
-            join_data["detail_downloadfailnum"] = -1
 
-        logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode} -- statistics done")
+        # list collection summary
+        join_data["list_isgetdata"] = get_list_isgetdata(hash_key)
+        join_data["list_allintimes"] = get_list_allintimes(hash_key)
+        join_data["list_runtimes"] = get_list_runtimes(hash_key)
+        # detail collection summary
+        join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)
+        join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)
+        join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data)
         summary_queue.append(join_data)
-
+        logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode} -- statistics done")
+    # upload to the database
     save(summary_queue, spider_monitor)