Bladeren bron

feat: 新增存量刷库方法

zhangxinlei1996 8 maanden geleden
bovenliggende
commit
c6a9e33cad
1 gewijzigde bestanden met toevoegingen van 39 en 0 verwijderingen
  1. 39 0
      data_project_information/task.go

+ 39 - 0
data_project_information/task.go

@@ -68,6 +68,45 @@ func task() {
 	}
 }
 
+func taskOld() {
+	query := es.NewBoolQuery().
+		Should(es.NewExistsQuery("tag_topinformation"), es.NewExistsQuery("tag_topinformation_ai")).
+		MinimumNumberShouldMatch(1)
+
+	query.Must(es.NewRangeQuery("pici").Lte(1732244400))
+
+	util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding_ai", query)))
+
+	client := Es.GetEsConn()
+	defer Es.DestoryEsConn(client)
+
+	wg := &sync.WaitGroup{}
+
+	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici") // 查询字段
+
+	countDocs := 0
+
+	res, err := client.Scroll().Index("bidding_ai").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
+	if err == nil {
+		taskInfoA(res, wg, &countDocs)
+		scrollId := res.ScrollId
+		for {
+			searchResult, err := client.Scroll("1m").Index("bidding_ai").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
+			if err != nil {
+				util.Debug("Es Search Data Error:", err.Error())
+				break
+			}
+			taskInfoA(searchResult, wg, &countDocs)
+			scrollId = searchResult.ScrollId
+		}
+		wg.Wait()
+		util.Debug(fmt.Sprintf("处理结束,处理文档%d条", countDocs))
+		_, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
+	} else {
+		util.Debug(err)
+	}
+}
+
 func taskAdd() {
 
 	client := Es.GetEsConn()