Browse Source

update: add the "event" field from luaconfig to the basic information; temporarily disable the "list_allintimes" statistic

dongzhaorui 2 years ago
parent
commit
7000cfa0e4
1 changed file with 17 additions and 11 deletions
  1. A数据处理/sync_data/monitor_summary.py  +17 −11

A数据处理/sync_data/monitor_summary.py  +17 −11

@@ -108,7 +108,10 @@ def get_crawler_basic_information():
     """Crawler basic information"""
     crawler_lst = []
     q = {"platform": "python", "state": 11}
-    projection = {"_id": 0, "site": 1, "channel": 1, "modifyuser": 1, "modifyuserid": 1, "code":1}
+    projection = {
+        "_id": 0, "site": 1, "channel": 1,
+        "modifyuser": 1, "modifyuserid": 1, "code": 1, "event": 1
+    }
     cursor = spider_lua_config.find(q, projection=projection)
     try:
         for doc in cursor:
@@ -118,6 +121,7 @@ def get_crawler_basic_information():
                 "spidercode": doc["code"],
                 "modifyid": doc["modifyuserid"],
                 "modifyuser": doc["modifyuser"],
+                "event": doc["event"],
             })
     finally:
         client.close()
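
Note: the projection now requests "event", but luaconfig documents created before that field existed will not contain it, so doc["event"] raises KeyError for them. A defensive variant, as a sketch only (the None fallback is an assumption, not part of this commit):

    for doc in cursor:
        crawler_lst.append({
            "site": doc["site"],
            "channel": doc["channel"],
            "spidercode": doc["code"],
            "modifyid": doc["modifyuserid"],
            "modifyuser": doc["modifyuser"],
            # .get() tolerates documents that predate the event field;
            # defaulting to None is an assumption, not part of this commit
            "event": doc.get("event"),
        })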
@@ -128,15 +132,14 @@ def get_crawler_basic_information():
 def get_node_and_taskid(runtime, spidercode):
     """Fetch the latest crawler worker node and task id"""
     q = {"runtime": runtime, "spidercode": spidercode}
-    projection = {"node_ip": 1, "crawlab_taskid":1, "_id": 0}
+    projection = {"node_ip": 1, "crawlab_taskid": 1, "_id": 0}
     sort = [("_id", -1)]
     result = spider_heartbeat.find_one(q, projection=projection, sort=sort)
     return result
 
 
-def aggregate_query(runtime):
+def aggregate_query(runtime, is_print_error=False):
     """feapder crawl aggregate query"""
-    websites = []
     aggregate_items = {}
 
     pipeline = [
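
Note on get_node_and_taskid in the hunk above: sort=[("_id", -1)] relies on the creation timestamp embedded in MongoDB ObjectIds, so find_one returns the most recently written heartbeat. A usage sketch (the argument values are hypothetical):

    latest = get_node_and_taskid("2023-05-30", "a_example_spider")  # hypothetical values
    if latest:  # find_one returns None when no heartbeat matches
        node_ip, task_id = latest["node_ip"], latest["crawlab_taskid"]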
@@ -162,7 +165,7 @@ def aggregate_query(runtime):
     try:
         for doc in cursor:
             spider_item = doc["spider_item"]
-
+            tmp_websites = []
             for items in spider_item:
                 site = items["site"]
                 channel = items["channel"]
@@ -170,7 +173,7 @@ def aggregate_query(runtime):
                 business_type = items["business_type"]
 
                 if len(spider_item) > 1:
-                    logger.warning(f"{spidercode} -> {site} -- abnormal number of site mappings")
+                    logger.warning(f"{spidercode} -> {site} -- abnormal number of site mappings, {len(spider_item)}")
 
                 is_list = str(business_type).endswith("List")
                 hash_key = get_hask_key(**items)  # keep sites that share one spidercode from overwriting each other's data
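
Note: get_hask_key is only called here; its body is not part of this diff. A plausible sketch, assuming it hashes the fields that identify one site/channel/spidercode mapping (the field choice and md5 are assumptions):

    import hashlib

    def get_hask_key(**items):
        # one aggregation bucket per site/channel/spidercode combination, so
        # multiple sites sharing a spidercode do not overwrite each other
        raw = "-".join(str(items.get(k, "")) for k in ("site", "channel", "spidercode"))
        return hashlib.md5(raw.encode("utf-8")).hexdigest()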
@@ -219,14 +222,16 @@ def aggregate_query(runtime):
 
                 # Monitor crawler tasks: spidercode_at_site_num > 1 means the created
                 # crawler task is problematic and should be reported to the data sourcing team
-                if site not in websites:
+                if site not in tmp_websites:
+                    # TODO verify whether this statistic's logic is correct
                     aggregate_items[hash_key]["spidercode_at_site_num"] = 1
-                    websites.append(site)
+                    tmp_websites.append(site)
                 else:
                     aggregate_items[hash_key]["spidercode_at_site_num"] += 1
 
-    # except Exception as e:
-    #     logger.exception(e)
+    except Exception as e:
+        if is_print_error:
+            logger.exception(e)
 
     finally:
         client.close()
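
Note: moving from the function-level websites list to a per-document tmp_websites changes the scope of spidercode_at_site_num: the counter now resets for each aggregation document instead of accumulating across the whole result set (the TODO above flags this for review). The counting pattern in isolation, as a sketch with made-up data:

    def count_sites(spider_item):
        counts, seen = {}, []
        for items in spider_item:
            site = items["site"]
            if site not in seen:
                counts[site] = 1   # first occurrence within this document
                seen.append(site)
            else:
                counts[site] += 1  # duplicate mapping -> worth reporting
        return counts

    # two entries for one site trigger the spidercode_at_site_num > 1 condition
    print(count_sites([{"site": "example-a"}, {"site": "example-a"}]))  # {'example-a': 2}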
@@ -323,7 +328,8 @@ def main():
             join_data["detail_rel_count"] = result["detail_rel_count"]
             # Compute list-page metrics
             join_data["list_isgetdata"] = get_list_isgetdata(hash_key)  # whether the list page collected any data
-            join_data["list_allintimes"] = get_list_allintimes(hash_key)  # total daily list-page inserts
+            # join_data["list_allintimes"] = get_list_allintimes(hash_key)  # total daily list-page inserts
+            join_data["list_allintimes"] = -1  # statistic disabled for now: the total number of items extracted from list pages cannot be obtained
             join_data["list_runtimes"] = get_list_runtimes(hash_key)  # list-page crawl run count
             # Compute detail-page metrics
             join_data["detail_downloadnum"] = get_detail_downloadnum(hash_key)  # detail-page download count
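
Note: consumers of the summary now have to distinguish the -1 sentinel from a genuine zero. A consumer-side sketch (the handling is an assumption, not shown in this commit):

    allintimes = join_data["list_allintimes"]
    # -1 means the statistic is disabled in this commit, not an empty crawl
    display = "n/a" if allintimes == -1 else str(allintimes)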