
Fix duplicated detail data in the aggregated list-crawler query; improve the accuracy of the detail-data statistics; remove the py_spider statistics

dongzhaorui 1 year ago
commit e2ed9e900b
1 changed file with 109 additions and 190 deletions

+ 109 - 190
数据处理/sync_data/monitor.py
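
The central change in this diff is that heartbeat statistics are grouped by batch_no rather than spider_id, so each crawl batch is counted once and detail records no longer overlap across runs. A minimal sketch of that grouping, assuming a pymongo client pointed at the pyspider_heartbeat collection named below (host, port and database name here are placeholders, not the production settings):

from pymongo import MongoClient

# Placeholder connection; swap in the real host/port and database.
client = MongoClient("127.0.0.1", 27017)
heartbeat = client["py_spider"]["pyspider_heartbeat"]


def count_by_batch(runtime):
    """Sum download and insert counts per crawl batch for one statistics day."""
    pipeline = [
        {"$match": {"runtime": runtime}},  # e.g. "2023-04-05"
        {"$group": {
            "_id": "$batch_no",                   # one bucket per batch, not per spider_id
            "count": {"$sum": "$count"},          # downloaded records
            "rel_count": {"$sum": "$rel_count"},  # records kept after deduplication
        }},
    ]
    return list(heartbeat.aggregate(pipeline))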

@@ -7,14 +7,14 @@ Created on 2023-04-05
 @author: Dzr
 """
 import hashlib
-from datetime import datetime, time, timedelta
+from datetime import datetime, timedelta
+from operator import itemgetter
 
 from bson.int64 import Int64
 from bson.son import SON
 from pymongo import MongoClient
 
 from log import logger
-from operator import itemgetter
 
 # mongo
 MONGO_HOST = "172.17.4.87"
@@ -27,19 +27,13 @@ MONGO_DB2 = "editor"
 mongodb1 = client[MONGO_DB1]
 mongodb2 = client[MONGO_DB2]
 
-# 爬虫数据表
+# 爬虫数据生产表
 data_bak = mongodb1["data_bak"]
-
-# 心跳表
-spider_heartbeat = mongodb1["spider_heartbeat"]
-
-# py_spiders列表
-py_spiders_crawl_list = mongodb1["crawl_data"]
-
-# 列表页汇总表
+# 爬虫心跳表
+spider_heartbeat = mongodb1["pyspider_heartbeat"]
+# 日采集详情汇总表
 spider_monitor = mongodb1["spider_monitor"]
-
-# luaconfig表
+# 采集任务表
 spider_lua_config = mongodb2["luaconfig"]
 
 # 特殊网站
@@ -63,7 +57,7 @@ def get_md5(*args, **kwargs):
                 data_lst.append(v)
 
         if not data_lst or len(data_lst) != 3:
-            logger.error(f"[Monitor]条件缺失{conditions},当前内容:{data_lst}")
+            logger.error(f"组合条件缺失:{conditions},当前内容:{data_lst}")
             return None
 
         data_lst = sorted(data_lst)
@@ -81,31 +75,30 @@ def get_runtime(datestr=None):
     return datestr
 
 
-def save(documents, collection):
+def save(documents, collection, ordered=False):
     """保存数据"""
     is_list = isinstance(documents, list)
     documents = documents if is_list else [documents]
 
-    count = 0
     data_lst = []
     for items in documents:
         items.pop("_id", None)
         items.pop("business_type", None)
         items["comeintime"] = Int64(datetime.now().timestamp())
         data_lst.append(items)
-        count += 1
-        if len(data_lst) % 100 == 0:
-            collection.insert_many(data_lst)
-            data_lst.clear()
-            logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存")
+        if len(data_lst) == 100:
+            ret = collection.insert_many(data_lst, ordered=ordered)
+            logger.info(f"MongoDB {collection.name} 保存 {len(ret.inserted_ids)} 条数据")
+            data_lst = []
 
     # 提交剩余数据
-    if len(data_lst) > 0:
-        collection.insert_many(data_lst)
-    logger.info(f"[Monitor]{collection.name}-批量上传{count}条数据--已保存")
+    if data_lst:
+        collection.insert_many(data_lst, ordered=ordered)
 
+    logger.info(f"MongoDB {collection.name} 保存 {len(documents)} 条数据")
 
-def get_crawler_basic_information():
+
+def get_spider_lst():
     """爬虫基础信息"""
     crawler_lst = []
     q = {"platform": "python", "state": 11}
@@ -126,7 +119,7 @@ def get_crawler_basic_information():
             })
     finally:
         client.close()
-        logger.info(f"[Monitor]爬虫采集监控--共计{len(crawler_lst)}个爬虫")
+        logger.info(f"爬虫监控 - 已上线 {len(crawler_lst)} 个爬虫")
         yield from crawler_lst
 
 
@@ -152,9 +145,9 @@ def aggregate_query(collection, pipeline, is_print_error=False):
         yield from results
 
 
-def aggregate_query_crawl_count(runtime):
+def aggregate_query_count(runtime):
     """
-    feapder爬虫列表和详情采集聚合查询结果统计
+    feapder爬虫列表和详情采集数据量的聚合查询统计
 
     @param runtime: 运行时间
     @return:
@@ -164,8 +157,8 @@ def aggregate_query_crawl_count(runtime):
         {"$match": {"runtime": runtime}},
         {
             "$group": {
-                "_id": "$spider_id",
-                "rel_count": {"$sum": "$rel_count"},  # 入库量
+                "_id": "$batch_no",
+                "rel_count": {"$sum": "$rel_count"},  # 入库量(去重)
                 "count": {"$sum": "$count"},  # 下载量
                 "spider_item": {
                     "$addToSet": {
@@ -181,8 +174,8 @@ def aggregate_query_crawl_count(runtime):
     ]
     results = aggregate_query(spider_heartbeat, pipeline)
     for doc in results:
+        spider_dict = {}
         spider_item = doc["spider_item"]
-        label_dict = {}
         for items in spider_item:
             site = items["site"]
             channel = items["channel"]
@@ -190,21 +183,19 @@ def aggregate_query_crawl_count(runtime):
             business_type = items["business_type"]
 
             if len(spider_item) > 1 and site not in special_sites:
-                logger.warning(f"[Monitor]{spidercode} -> {site} --存在风险, {len(spider_item)}")
-
-            is_list = str(business_type).endswith("List")
+                logger.warning(f"{spidercode} -> {site} --存在风险, {len(spider_item)}")
 
             hash_key = get_md5(**items)  # 防止多站点对应1个spidercode,数据相互重叠
             if not hash_key:
-                # logger.error(f"[Monitor]{site}-{channel}-{spidercode}--监控异常")
-                logger.error(f"[Monitor]{doc['_id']}--爬虫异常")
+                logger.error(f"异常批次号 {doc['_id']}")
                 continue
 
+            is_list = str(business_type).endswith("List")
             if not aggregate_items.get(hash_key):
                 data = {
-                    "spider_id": doc["_id"],
+                    "batch_no": doc["_id"],
                     "site": site,
-                    "channel": items["channel"],
+                    "channel": channel,
                     "spidercode": spidercode,
                     "business_type": business_type,
                     "runtime": runtime,
@@ -241,96 +232,26 @@ def aggregate_query_crawl_count(runtime):
                 aggregate_items.update({hash_key: data})
 
             # 监控爬虫任务,当 spidercode_at_site_num > 1
-            # 表明创建的爬虫任务存在问题,问题反馈数据寻源人员
+            # 表明创建的爬虫任务存在问题,需反馈数据寻源相关人员
             if site not in special_sites:
-                label = f"{business_type}_{spidercode}"
-                if label not in label_dict:
-                    aggregate_items[hash_key]["spidercode_at_site_num"] = 1
-                    conditions = {"keys": [hash_key], "websites": [site]}
-                    label_dict.setdefault(label, conditions)
+                spider = aggregate_items[hash_key]
+                if spidercode not in spider_dict:
+                    spider["spidercode_at_site_num"] = 1
+                    values = {"keys": [hash_key], "sites": [site]}
+                    spider_dict.setdefault(spidercode, values)
                 else:
-                    # 相同spidercode但site不同的爬虫进行计数+1
-                    websites = label_dict[label]["websites"]
-                    if site not in websites:
-                        keys = label_dict[label]["keys"]
+                    # 相同 spidercode 但 site 不同的爬虫进行计数+1
+                    sites = spider_dict[spidercode]["sites"]
+                    if site not in sites:
+                        keys = spider_dict[spidercode]["keys"]
                         for key in keys:
+                            # 更新相同 spidercode 的 spidercode_at_site_num
                             aggregate_items[key]["spidercode_at_site_num"] += 1
-                        # 记录身份id - hash_key
-                        keys.append(hash_key)
-                        # 记录站点
-                        websites.append(site)
-                        aggregate_items[hash_key]["spidercode_at_site_num"] = len(websites)
-
-    return aggregate_items
-
-
-def aggregate_query_py_spiders(runtime):
-    """py_spiders爬虫列表和详情采集聚合查询结果统计"""
-    aggregate_items = {}
-    if runtime is not None:
-        today = datetime.fromisoformat(runtime).date()
-    else:
-        today = datetime.now().date()
-    yesterday = today + timedelta(days=-1)
 
-    runtime = yesterday.strftime("%Y-%m-%d")
-    start_time = int(datetime.combine(yesterday, time()).timestamp())
-    end_time = int(datetime.combine(today, time()).timestamp())
+                        keys.append(hash_key)  # 记录新爬虫
+                        sites.append(site)  # 添加新站点
+                        spider["spidercode_at_site_num"] = len(sites)
 
-    pipeline = [
-        {
-            "$addFields": {
-                "rel_count": {
-                    "$cond": {
-                        "if": {"$ne": ["$finished", True]},
-                        "then": 1,
-                        "else": 0
-                    }
-                }
-            }
-        },
-        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
-        {
-            "$group": {
-                "_id": "$spidercode",
-                "count": {"$sum": 1},  # 当天采集总数
-                "rel_count": {"$sum": 1},  # 当天采集总数
-                "spider_item": {
-                    "$addToSet": {
-                        "site": "$site",
-                        "channel": "$channel",
-                        "spidercode": "$spidercode",
-                        "business_type": "List"
-                    }
-                }
-            }
-        },
-    ]
-    results = aggregate_query(py_spiders_crawl_list, pipeline)
-    for doc in results:
-        spider_item = doc["spider_item"]
-        for items in spider_item:
-            site = items["site"]
-            channel = items["channel"]
-            spidercode = items["spidercode"]
-            business_type = items["business_type"]
-            hask_key = get_md5(site, channel, spidercode)
-            spider_id = get_md5(spidercode + business_type + runtime)
-            data = {
-                "spider_id": spider_id,
-                "site": site,
-                "channel": items["channel"],
-                "spidercode": spidercode,
-                "business_type": business_type,
-                "runtime": runtime,
-                "list_count": doc["count"],
-                "list_rel_count": doc["rel_count"],
-                "detail_count": 0,
-                "detail_rel_count": 0,
-                "spidercode_at_site_num": 1,  # 爬虫代码与站点对应的关系数量
-                "frame": "py_spiders"
-            }
-            aggregate_items[hask_key] = data
     return aggregate_items
 
 
@@ -338,51 +259,52 @@ def aggregate_query_crawl_list(runtime):
     """feapder列表爬虫采集聚合查询结果统计"""
     aggregate_items = {}
     pipeline = [
-        {"$match": {"runtime": runtime, "business_type": {"$regex": "List"}}},
+        {
+            "$match": {
+                "runtime": runtime,
+                "business_type": {"$regex": "List"},
+                "status_code": {"$ne": -1}
+            }
+        },
         {
             "$group": {
-                "_id": "$spider_id",
-                "spider_item": {
-                    "$addToSet": {
-                        "site": "$site",
-                        "channel": "$channel",
-                        "spidercode": "$spidercode",
-                        "count": {"$ifNull": ["$count", 0]},
-                        "rel_count": {"$ifNull": ["$rel_count", 0]},
-                    }
-                }
+                "_id": "$batch_no",
+                "count": {"$sum": "$count"},
+                "rel_count": {"$sum": "$rel_count"},
+                "site": {"$first": "$site"},
+                "channel": {"$first": "$channel"},
+                "spidercode": {"$first": "$spidercode"},
             }
         }
     ]
     results = aggregate_query(spider_heartbeat, pipeline)
-    for doc in results:
-        spider_item = doc["spider_item"]
-        for item in spider_item:
-            hask_key = get_md5(**item)
-            if not aggregate_items.get(hask_key):
-                values = {"list_allintimes": 0, "list_nodatatimes": 0}
-                aggregate_items.setdefault(hask_key, values)
-
-            if all([
-                item["count"] > 0,
-                item["rel_count"] > 0,
-                item["count"] == item["rel_count"]
-            ]):
-                aggregate_items[hask_key]["list_allintimes"] += 1
-
-            if item["count"] == 0:
-                aggregate_items[hask_key]["list_nodatatimes"] = 1
+    for items in results:
+        hash_key = get_md5(**items)
+        if not aggregate_items.get(hash_key):
+            values = {"list_allintimes": 0, "list_nodatatimes": 0}
+            aggregate_items.setdefault(hash_key, values)
+
+        if all([
+            items["count"] > 0,
+            items["rel_count"] > 0,
+            items["count"] == items["rel_count"]
+        ]):
+            aggregate_items[hash_key]["list_allintimes"] += 1
+
+        if items["count"] == 0:
+            aggregate_items[hash_key]["list_nodatatimes"] += 1
+
     return aggregate_items
 
 
-def aggregate_query_crawlab_info(runtime):
+def aggregate_query_crawlab_information(runtime):
     """feapder爬虫采集聚合查询crawlab平台运行信息统计"""
     aggregate_items = {}
     pipeline = [
         {
             "$project": {
                 "_id": 0,
-                "spider_id": 1,
+                "batch_no": 1,
                 "spidercode": 1,
                 "runtime": 1,
                 "node_ip": 1,
@@ -393,7 +315,7 @@ def aggregate_query_crawlab_info(runtime):
         {"$match": {"runtime": runtime}},
         {
             "$group": {
-                "_id": "$spider_id",
+                "_id": "$batch_no",
                 "crawlab_item": {
                     "$addToSet": {
                         "spidercode": "$spidercode",
@@ -412,27 +334,26 @@ def aggregate_query_crawlab_info(runtime):
         spidercode = items.pop("spidercode")
         if not aggregate_items.get(spidercode):
             aggregate_items.setdefault(spidercode, items)
+
     return aggregate_items
 
 
-_runtime = get_runtime()
-aggregate_results = aggregate_query_crawl_count(_runtime)
-aggregate_query_py_spiders = aggregate_query_py_spiders(_runtime)
-aggregate_list_results = aggregate_query_crawl_list(_runtime)
-aggregate_query_crawlab_results = aggregate_query_crawlab_info(_runtime)
+_runtime = get_runtime()  # 统计时间
+aggregate_count_items = aggregate_query_count(_runtime)
+aggregate_crawl_list_items = aggregate_query_crawl_list(_runtime)
+aggregate_query_crawlab_items = aggregate_query_crawlab_information(_runtime)
 
 
 def get_node_and_taskid(spidercode, default=None):
     """获取最新爬虫工作节点和任务id"""
-    if aggregate_query_crawlab_results.get(spidercode):
-        default = aggregate_query_crawlab_results[spidercode]
+    if aggregate_query_crawlab_items.get(spidercode):
+        default = aggregate_query_crawlab_items[spidercode]
     return default
 
 
 def get_list_isgetdata(hash_key, default=0):
     """列表页是否采集数据"""
-    query_result = (aggregate_results.get(hash_key)
-                    or aggregate_query_py_spiders.get(hash_key))
+    query_result = aggregate_count_items.get(hash_key)
     if query_result:
         default = query_result["list_count"]
     return True if default > 0 else False
@@ -440,36 +361,36 @@ def get_list_isgetdata(hash_key, default=0):
 
 def get_list_allintimes(hash_key, default=0):
     """日采集列表数量与入库数量相等的次数(扣除标题去重数量 + 增量(全量)去重数量)"""
-    if aggregate_list_results.get(hash_key):
-        default = aggregate_list_results[hash_key]["list_allintimes"]
+    if aggregate_crawl_list_items.get(hash_key):
+        default = aggregate_crawl_list_items[hash_key]["list_allintimes"]
     return default
 
 
 def get_list_runtimes(hash_key, default=0):
     """列表采集运行频次"""
-    if aggregate_results.get(hash_key):
-        default = aggregate_results[hash_key]["list_runtimes"]
+    if aggregate_count_items.get(hash_key):
+        default = aggregate_count_items[hash_key]["list_runtimes"]
     return default
 
 
 def get_list_nodatatimes(hash_key, default=-1):
     """列表页采集无数据次数(过滤后)"""
-    if aggregate_list_results.get(hash_key):
-        default = aggregate_list_results[hash_key]["list_nodatatimes"]
+    if aggregate_crawl_list_items.get(hash_key):
+        default = aggregate_crawl_list_items[hash_key]["list_nodatatimes"]
     return default
 
 
 def get_detail_downloadnum(hash_key, default=0):
     """详情页下载量"""
-    if aggregate_results.get(hash_key):
-        default = aggregate_results[hash_key]["detail_count"]
+    if aggregate_count_items.get(hash_key):
+        default = aggregate_count_items[hash_key]["detail_count"]
     return default
 
 
 def get_detail_downloadsuccessnum(hash_key, default=0):
     """详情页下载成功量"""
-    if aggregate_results.get(hash_key):
-        default = aggregate_results[hash_key]["detail_rel_count"]
+    if aggregate_count_items.get(hash_key):
+        default = aggregate_count_items[hash_key]["detail_rel_count"]
     return default
 
 
@@ -481,30 +402,27 @@ def get_detail_downloadfailnum(**kwargs):
     return count
 
 
-def main():
+def start_monitor():
     summary_queue = []
-    crawlers = get_crawler_basic_information()
-    for crawler in crawlers:
-        join_data = {**crawler, "is_valid": False}  # 创建爬虫基础数据
-
-        site = crawler["site"]
-        channel = crawler["channel"]
-        spidercode = crawler["code"]
+    spider_lst = get_spider_lst()
+    for spider in spider_lst:
+        site = spider["site"]
+        channel = spider["channel"]
+        spidercode = spider["code"]
 
+        join_data = {**spider, "is_valid": False}  # 创建爬虫基础数据
         hash_key = get_md5(site, channel, spidercode)
-        query_result = (aggregate_results.get(hash_key)
-                        or aggregate_query_py_spiders.get(hash_key))
+        query_result = aggregate_count_items.get(hash_key)
         if query_result:
-            # crawlab平台
-            crawlab = get_node_and_taskid(spidercode)
+            crawlab = get_node_and_taskid(spidercode)  # crawlab 运行详情
             if crawlab:
                 join_data["py_taskid"] = crawlab["crawlab_taskid"] or ""
                 join_data["py_nodename"] = crawlab["node_ip"] or ""
 
-            # 聚合查询心跳统计结果
+            # 聚合查询 - 统计数据采集详情信息(来自采集心跳)
             result = query_result
             join_data["frame"] = result["frame"]  # 采集框架
-            join_data["spider_id"] = result["spider_id"]
+            join_data["batch_no"] = result["batch_no"]
             join_data["business_type"] = result["business_type"]
             join_data["list_count"] = result["list_count"]
             join_data["list_rel_count"] = result["list_rel_count"]
@@ -512,12 +430,12 @@ def main():
             join_data["detail_rel_count"] = result["detail_rel_count"]
             join_data["spidercode_at_site_num"] = result["spidercode_at_site_num"]
 
-        # 列表采集汇总数据
+        # 采集汇总 - 列表数据
         join_data["list_isgetdata"] = get_list_isgetdata(hash_key)
         join_data["list_allintimes"] = get_list_allintimes(hash_key)
         join_data["list_runtimes"] = get_list_runtimes(hash_key)
         join_data["list_nodatatimes"] = get_list_nodatatimes(hash_key)
-        # 详情采集汇总数据
+        # 采集汇总 - 详情数据
         join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)
         join_data["detail_downloadsuccessnum"] = get_detail_downloadsuccessnum(hash_key)
         join_data["detail_downloadfailnum"] = get_detail_downloadfailnum(**join_data)
@@ -527,10 +445,11 @@ def main():
             join_data["is_valid"] = True
 
         summary_queue.append(join_data)
-        logger.info(f"[Monitor]{crawler['site']}-{crawler['channel']}-{spidercode}--完成统计")
+        logger.info(f"{site} {channel} {spidercode} --统计完成")
     # 上传数据库
     save(summary_queue, spider_monitor)
+    logger.info("爬虫监控 - 统计完成")
 
 
 if __name__ == '__main__':
-    main()
+    start_monitor()
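
For reference, a minimal sketch of the order-independent hash key the monitor derives from site, channel and spidercode so that one spidercode running on several sites keeps separate statistics buckets (make_hash_key is an illustrative stand-in; the real get_md5 above also validates that all three fields are present before hashing):

import hashlib


def make_hash_key(site, channel, spidercode):
    """Order-independent MD5 over the three identifying fields."""
    parts = sorted([site, channel, spidercode])
    return hashlib.md5("".join(parts).encode("utf-8")).hexdigest()


# Two sites sharing one spidercode yield different keys,
# so their counts land in separate aggregation buckets.
assert make_hash_key("siteA", "channel1", "sp001") != make_hash_key("siteB", "channel1", "sp001")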