
Optimize the aggregation statistics query SQL
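The change drops the per-group spider_item array (built with $addToSet and unpacked in Python by summary_data) in favour of scalar fields taken with $first, so every aggregation group maps to exactly one summary row and save() can report how many rows were written. A rough sketch of the reworked $group stage for the heartbeat summary, using the field names from the diff below (the runtime value is only a placeholder):

# Sketch only -- the reworked $group stage; "2023-06-01" is an example date.
pipeline = [
    {"$match": {"runtime": "2023-06-01"}},
    {"$group": {
        "_id": "$batch_no",
        "count": {"$sum": "$count"},
        "rel_count": {"$sum": "$rel_count"},
        # formerly a spider_item array per group ($addToSet), unpacked item by item;
        # now each group carries plain scalar fields
        "site": {"$first": "$site"},
        "channel": {"$first": "$channel"},
        "spidercode": {"$first": "$spidercode"},
        "business_type": {"$first": "$business_type"},
    }},
    {"$sort": {"rel_count": -1}},
]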

dongzhaorui 1 year ago
parent
commit
1b50827551

+ 75 - 69
A数据处理/sync_data/summary.py

@@ -15,27 +15,27 @@ from pymongo import MongoClient
 from log import logger
 
 # mongo
-MONGO_HOST = "172.17.4.87"
-MONGO_PORT = 27080
+MONGO_HOST = "192.168.3.182"
+MONGO_PORT = 27017
 MONGO_DB = "py_spider"
 client = MongoClient(MONGO_HOST, MONGO_PORT)
 mongodb = client[MONGO_DB]
 
-# Spider data table
+# Data production
 data_bak = mongodb["data_bak"]
 
 # Heartbeat table
 spider_heartbeat = mongodb["pyspider_heartbeat"]
 
-# Competing-products lists
+# Crawl task lists
 ybw_list = mongodb["ybw_list"]
 zbytb_list = mongodb["zbytb_list"]
 
-# Topic spider
+# Topic spider crawl task table
 zgzb_list = mongodb["zgzb_list"]
 
 # List-page summary table
-summary_table_of_list_pages = mongodb["list"]
+summary_table = mongodb["list"]
 
 
 def save(documents, collection, ordered=False):
@@ -59,34 +59,30 @@ def save(documents, collection, ordered=False):
     return len(documents)
 
 
-def summary_data(document, runtime, only_count_list_page=False):
-    """Summarize and classify the aggregated data"""
-    summary_lst = []
-    spider_item = document["spider_item"]
-    for item in spider_item:
-        spidercode = item["spidercode"]
-        site = item["site"]
-        data = {
-            "business_type": item["business_type"],
-            "site": site,
-            "channel": item["channel"],
-            "spidercode": spidercode,
-            "count": document["count"],
-            "rel_count": document["rel_count"],
-            "runtime": runtime,
-            "create_at": Int64(datetime.now().timestamp())
-        }
-        if len(spider_item) > 1:
-            logger.warning(f"[数据汇总]{spidercode} -> {site} --映射关系错误")
-            data["warning"] = "spidercode业务对应关系错误"
-
-        if only_count_list_page:
-            if str(item["business_type"]).endswith("List"):
-                summary_lst.append(data)
-            continue
-        summary_lst.append(data)
-
-    return summary_lst
+def pick_data(items, runtime, only_count_list_page=False):
+    """Classify the aggregated data"""
+    results = []
+
+    spidercode = items["spidercode"]
+    site = items["site"]
+    data = {
+        "business_type": items["business_type"],
+        "site": site,
+        "channel": items["channel"],
+        "spidercode": spidercode,
+        "count": items["count"],
+        "rel_count": items["rel_count"],
+        "runtime": runtime,
+        "create_at": Int64(datetime.now().timestamp())
+    }
+
+    if only_count_list_page:
+        if str(items["business_type"]).endswith("List"):
+            results.append(data)
+    else:
+        results.append(data)
+
+    return results
 
 
 def feapder_crawl_aggregate_of_list_pages(datestr=None):
@@ -96,6 +92,7 @@ def feapder_crawl_aggregate_of_list_pages(datestr=None):
         yesterday = today + timedelta(days=-1)
         datestr = yesterday.strftime("%Y-%m-%d")
 
+    count = 0
     pipeline = [
         {"$match": {"runtime": datestr}},
         {
@@ -103,29 +100,26 @@ def feapder_crawl_aggregate_of_list_pages(datestr=None):
                 "_id": "$batch_no",
                 "rel_count": {"$sum": "$rel_count"},
                 "count": {"$sum": "$count"},
-                "spider_item": {
-                    "$addToSet": {
-                        "site": "$site",
-                        "channel": "$channel",
-                        "spidercode": "$spidercode",
-                        "business_type": "$business_type"
-                    }
-                }
+                "site": {"$first": "$site"},
+                "channel": {"$first": "$channel"},
+                "spidercode": {"$first": "$spidercode"},
+                "business_type": {"$first": "$business_type"},
             }
         },
         {"$sort": SON([("rel_count", -1)])}
     ]
-    #  The $group stage has a 100 MB memory limit; by default, if a stage exceeds this limit,
-    #  $group raises an error. However, to allow large data sets to be processed, set the allowDiskUse option to true so the $group operation can write to temporary files.
+    #  The $group stage has a 100 MB memory limit; by default, if a stage exceeds this limit, $group raises an error.
+    #  To allow large data sets to be processed, set the allowDiskUse option to true
+    #  so the $group operation can write to temporary files.
     cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
     try:
         results = []
         for doc in cursor:
-            results.extend(summary_data(doc, datestr, True))
-        save(results, summary_table_of_list_pages)
+            results.extend(pick_data(doc, datestr, True))
+        count = save(results, summary_table)
     finally:
         client.close()
-        logger.info("feapder - 数据汇总完成")
+        logger.info(f"feapder - 数据汇总 {count} 条")
 
 
 def competing_products_crawl_aggregate(collection, datestr=None):
@@ -136,6 +130,7 @@ def competing_products_crawl_aggregate(collection, datestr=None):
         today = datetime.now().date()
     yesterday = today + timedelta(days=-1)
 
+    count = 0
     publish_time = yesterday.strftime("%Y-%m-%d")
     pipeline = [
         {
@@ -155,14 +150,18 @@ def competing_products_crawl_aggregate(collection, datestr=None):
                 "_id": "$channel",
                 "count": {"$sum": 1},  # 当天采集总数
                 "rel_count": {"$sum": "$rel_count"},  # es检索结果为0的总数
-                "spider_item": {
-                    "$addToSet": {
-                        "site": "$site",
-                        "channel": "$channel",
-                        "spidercode": "$spidercode",
-                        "business_type": "List"
+                "site": {"$first": "$site"},
+                "channel": {"$first": "$channel"},
+                "spidercode": {"$first": "$spidercode"},
+                "business_type": {
+                    "$first": {
+                        "$cond": {
+                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
+                            "then": "List",
+                            "else": "$business_type"
+                        }
                     }
-                }
+                },
             }
         },
     ]
@@ -170,17 +169,19 @@ def competing_products_crawl_aggregate(collection, datestr=None):
     try:
         results = []
         for doc in cursor:
-            results.extend(summary_data(doc, publish_time))
-        save(results, summary_table_of_list_pages)
+            results.extend(pick_data(doc, publish_time))
+        count = save(results, summary_table)
     finally:
         client.close()
+        return count
 
 
 def competing_products_crawl_aggregate_of_list_pages(datestr=None):
     """竞品采集列表页数据汇总"""
-    competing_products_crawl_aggregate(ybw_list, datestr)
-    competing_products_crawl_aggregate(zbytb_list, datestr)
-    logger.info("竞品采集 - 数据汇总完成")
+    count = 0
+    count += competing_products_crawl_aggregate(ybw_list, datestr)
+    count += competing_products_crawl_aggregate(zbytb_list, datestr)
+    logger.info(f"竞品采集 - 数据汇总 {count} 条")
 
 
 def zgzb_crawl_aggregate_of_list_pages(datestr=None):
@@ -194,6 +195,7 @@ def zgzb_crawl_aggregate_of_list_pages(datestr=None):
     start_time = int(datetime.combine(yesterday, time()).timestamp())
     end_time = int(datetime.combine(today, time()).timestamp())
 
+    count = 0
     pipeline = [
         {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
         {
@@ -201,14 +203,18 @@ def zgzb_crawl_aggregate_of_list_pages(datestr=None):
                 "_id": "$spidercode",
                 "count": {"$sum": 1},  # 当天采集总数
                 "rel_count": {"$sum": 1},  # 当天采集总数
-                "spider_item": {
-                    "$addToSet": {
-                        "site": "$site",
-                        "channel": "$channel",
-                        "spidercode": "$spidercode",
-                        "business_type": "List"
+                "site": {"$first": "$site"},
+                "channel": {"$first": "$channel"},
+                "spidercode": {"$first": "$spidercode"},
+                "business_type": {
+                    "$first": {
+                        "$cond": {
+                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
+                            "then": "List",
+                            "else": "$business_type"
+                        }
                     }
-                }
+                },
             }
         },
     ]
@@ -216,11 +222,11 @@ def zgzb_crawl_aggregate_of_list_pages(datestr=None):
     try:
         results = []
         for doc in cursor:
-            results.extend(summary_data(doc, runtime))
-        save(results, summary_table_of_list_pages)
+            results.extend(pick_data(doc, runtime))
+        count = save(results, summary_table)
     finally:
         client.close()
-        logger.info("中国招标投标公共服务平台 - 数据汇总完成")
+        logger.info(f"中国招标投标公共服务平台 - 数据汇总 {count} 条")
 
 
 def start_summary():
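Below is a minimal standalone sketch (not part of the patch) of the business_type fallback used in the new pipelines: the $type operator returns the string "missing" for an absent field, so the $cond substitutes the literal "List". The host/port are placeholders; the database and collection names are taken from the diff.

from pymongo import MongoClient

client = MongoClient("127.0.0.1", 27017)   # placeholder connection
coll = client["py_spider"]["zgzb_list"]    # collection from the diff

pipeline = [
    {"$group": {
        "_id": "$spidercode",
        "count": {"$sum": 1},
        # absent business_type -> $type yields "missing" -> default to "List"
        "business_type": {
            "$first": {
                "$cond": {
                    "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
                    "then": "List",
                    "else": "$business_type",
                }
            }
        },
    }},
]

for row in coll.aggregate(pipeline, allowDiskUse=True):
    print(row["_id"], row["count"], row["business_type"])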