
feapder: query field changed from "spider_id" to "batch_no"

dongzhaorui, 1 year ago
commit a915d3f397
1 file changed, 15 insertions and 18 deletions

A数据处理/sync_data/summary.py (+15 -18)

@@ -38,27 +38,25 @@ zgzb_list = mongodb["zgzb_list"]
 summary_table_of_list_pages = mongodb["list"]
 
 
-def save(documents, collection):
+def save(documents, collection, ordered=False):
     """保存数据"""
     is_list = isinstance(documents, list)
     documents = documents if is_list else [documents]
 
-    count = 0
     data_lst = []
     for item in documents:
         item.pop("_id", None)
         data_lst.append(item)
-        count += 1
-        if len(data_lst) % 100 == 0:
-            collection.insert_many(data_lst)
-            data_lst.clear()
-            logger.info(f"[Summary]{collection.name}-批量保存{count}条数据--已完成")
+        if len(data_lst) == 100:
+            ret = collection.insert_many(data_lst, ordered=ordered)
+            logger.info(f"MongoDB {collection.name} 保存 {len(ret.inserted_ids)} 条数据")
+            data_lst = []
 
-    # 提交剩余数据
-    if len(data_lst) > 0:
-        collection.insert_many(data_lst)
-    logger.info(f"[Summary]{collection.name}-批量保存{count}条数据--已完成")
-    return count
+    if data_lst:
+        collection.insert_many(data_lst, ordered=ordered)
+
+    logger.info(f"MongoDB {collection.name} 保存 {len(documents)} 条数据")
+    return len(documents)
 
 
 def summary_data(document, runtime, only_count_list_page=False):
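The rewritten save() now forwards an ordered flag to insert_many and defaults it to False. Below is a minimal sketch of what unordered inserts change in practice; the connection URI and the summary_demo namespace are placeholders, not taken from this repository:

from pymongo import MongoClient
from pymongo.errors import BulkWriteError

client = MongoClient("mongodb://localhost:27017")   # placeholder URI
coll = client["summary_demo"]["list"]               # placeholder namespace
coll.drop()

# The second document collides with the first on _id.
docs = [{"_id": 1}, {"_id": 1}, {"_id": 2}]

try:
    coll.insert_many(docs, ordered=False)
except BulkWriteError as exc:
    # With ordered=False the remaining documents are still written;
    # only the duplicate is reported as a write error.
    print("inserted:", exc.details["nInserted"])              # 2
    print("write errors:", len(exc.details["writeErrors"]))   # 1

client.close()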
@@ -79,7 +77,7 @@ def summary_data(document, runtime, only_count_list_page=False):
             "create_at": Int64(datetime.now().timestamp())
         }
         if len(spider_item) > 1:
-            logger.warning(f"[Summary]{spidercode} -> {site} --映射关系错误")
+            logger.warning(f"[数据汇总]{spidercode} -> {site} --映射关系错误")
             data["warning"] = "spidercode业务对应关系错误"
 
         if only_count_list_page:
@@ -102,7 +100,7 @@ def feapder_crawl_aggregate_of_list_pages(datestr=None):
         {"$match": {"runtime": datestr}},
         {
             "$group": {
-                "_id": "$spider_id",
+                "_id": "$batch_no",
                 "rel_count": {"$sum": "$rel_count"},
                 "count": {"$sum": "$count"},
                 "spider_item": {
@@ -127,7 +125,7 @@ def feapder_crawl_aggregate_of_list_pages(datestr=None):
         save(results, summary_table_of_list_pages)
     finally:
         client.close()
-        logger.info("[Summary]feapder数据汇总结束")
+        logger.info("feapder - 数据汇总完成")
 
 
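For the field rename above, here is a standalone sketch of the regrouped pipeline. crawl_list is a placeholder name (the feapder source collection is not visible in this diff) and the runtime value format is an assumption:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")   # placeholder URI
crawl_list = client["summary_demo"]["crawl_list"]   # placeholder collection

datestr = "2024-05-01"  # example runtime value; the real format is an assumption
pipeline = [
    {"$match": {"runtime": datestr}},
    {
        "$group": {
            "_id": "$batch_no",   # was "$spider_id" before this commit
            "rel_count": {"$sum": "$rel_count"},
            "count": {"$sum": "$count"},
        }
    },
]

for doc in crawl_list.aggregate(pipeline):
    print(doc["_id"], doc["count"], doc["rel_count"])

client.close()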
 def competing_products_crawl_aggregate(collection, datestr=None):
@@ -139,7 +137,6 @@ def competing_products_crawl_aggregate(collection, datestr=None):
     yesterday = today + timedelta(days=-1)
 
     publish_time = yesterday.strftime("%Y-%m-%d")
-    table_name = collection.name
     pipeline = [
         {
             "$addFields": {
@@ -177,13 +174,13 @@ def competing_products_crawl_aggregate(collection, datestr=None):
         save(results, summary_table_of_list_pages)
     finally:
         client.close()
-        logger.info(f"[Summary]{table_name}数据汇总结束")
 
 
 def competing_products_crawl_aggregate_of_list_pages(datestr=None):
     """竞品采集列表页数据汇总"""
     competing_products_crawl_aggregate(ybw_list, datestr)
     competing_products_crawl_aggregate(zbytb_list, datestr)
+    logger.info("竞品采集 - 数据汇总完成")
 
 
 def zgzb_crawl_aggregate_of_list_pages(datestr=None):
@@ -223,7 +220,7 @@ def zgzb_crawl_aggregate_of_list_pages(datestr=None):
         save(results, summary_table_of_list_pages)
     finally:
         client.close()
-        logger.info("[Summary]zgzb_list数据汇总结束")
+        logger.info("中国招标投标公共服务平台 - 数据汇总完成")
 
 
 if __name__ == '__main__':
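The __main__ body itself is truncated in this view. A hypothetical driver (not the repository's actual code) that runs all three summaries for the previous day could look like the following; the date format is an assumption:

from datetime import datetime, timedelta

# Hypothetical usage, assuming this runs inside sync_data/summary.py itself.
if __name__ == "__main__":
    datestr = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    feapder_crawl_aggregate_of_list_pages(datestr)
    competing_products_crawl_aggregate_of_list_pages(datestr)
    zgzb_crawl_aggregate_of_list_pages(datestr)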