소스 검색

更新 保存 bidding 批量处理

wcc 1 년 전
부모
커밋
195419ed0e
3개의 변경된 파일74개의 추가작업 그리고 16개의 파일을 삭제
  1. 9 9
      createEsIndex/common.toml
  2. 2 0
      createEsIndex/init.go
  3. 63 7
      createEsIndex/main.go

+ 9 - 9
createEsIndex/common.toml

@@ -95,15 +95,15 @@ detailfilter = ["(招标网|千里马|采招网|招标采购导航网|招标与
     "千里马(平台|网站)+", "[“\"]?优质采(平台|电子交易平台|云采购平台|交易平台)?[”\"]?", "《?(中国采购与|中国)?招(投)?标(与采购|采购导航)?网》?",
     "《?元博网(采购与招标网)?》?", "《?(中国)?招标采购导航网》?", "中\\W{0,3}国采\\W{0,3}招\\W{0,3}网\\W*[((]?(bidcenter.com.cn)?[))]?", "已方宝", "中国招标与采购"]
 ## 华为云集群1
-addr2 = "http://127.0.0.1:19905"
-username2 = "jybid"
-password2 = "Top2023_JEB01i@31"
-indexb2 = "bidding"
-## 华为云新集群2,原来阿里云数据迁移华为云新集群
-addr3 = "http://127.0.0.1:19908"
-username3 = "jybid"
-password3 = "Top2023_JEB01i@31"
-indexb3 = "bidding"
+#addr2 = "http://127.0.0.1:19905"
+#username2 = "jybid"
+#password2 = "Top2023_JEB01i@31"
+#indexb2 = "bidding"
+### 华为云新集群2,原来阿里云数据迁移华为云新集群
+#addr3 = "http://127.0.0.1:19908"
+#username3 = "jybid"
+#password3 = "Top2023_JEB01i@31"
+#indexb3 = "bidding"
 
 
 ## 预处理数据 索引配置

+ 2 - 0
createEsIndex/init.go

@@ -433,6 +433,8 @@ func InitBitmap() {
 				log.Info("InitBitmap", zap.Any("cache.FromBuffer", err))
 			}
 		}
+	} else {
+		log.Info("InitBitmap", zap.Any(*dbfile, "文件不存在"))
 	}
 	log.Info("InitBitmap", zap.Any("cache.FromBuffer", "success"))
 	//监听,写入文件保存

+ 63 - 7
createEsIndex/main.go

@@ -103,11 +103,11 @@ func main() {
 		go task_index()  //定时同步更新winner_enterprise、buyer_enterprise ES索引;这个功能很少变动,几乎不需要维护
 	}
 
-	go UpdateBidding() //更新bidding表数据
-	go SaveEsMethod()
-	go SaveAllEsMethod()
-	go SaveProjectEs()
-	go SaveBiddingAllDataEs()
+	go UpdateBidding()        //更新bidding表数据
+	go SaveBiddingEsMethod()  //保存es bidding数据
+	go SaveAllEsMethod()      // 保存爬虫采集临时数据
+	go SaveProjectEs()        //保存项目索引数据
+	go SaveBiddingAllDataEs() //保存stype=bidding_all_data 数据
 
 	go SaveBidErr()
 	//添加预处理函数
@@ -126,8 +126,8 @@ func main() {
 	<-signalChan
 	saveDb()
 
-	ch := make(chan bool, 1)
-	<-ch
+	//ch := make(chan bool, 1)
+	//<-ch
 }
 
 var pool = make(chan bool, 20)
@@ -465,6 +465,62 @@ func SaveEsMethod() {
 	}
 }
 
+// SaveBiddingEsMethod 批量保存bidding数据
+func SaveBiddingEsMethod() {
+	arru := make([]map[string]interface{}, EsBulkSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveEsPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == EsBulkSize {
+				saveEsSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveEsSp
+					}()
+					if config.Conf.DB.Es.Addr != "" {
+						Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
+					}
+					// 集群地址2
+					if config.Conf.DB.Es.Addr2 != "" {
+						Es2.BulkSave(config.Conf.DB.Es.Indexb2, arru)
+					}
+					// 集群地址3
+					if config.Conf.DB.Es.Addr3 != "" {
+						Es3.BulkSave(config.Conf.DB.Es.Indexb3, arru)
+					}
+				}(arru)
+				arru = make([]map[string]interface{}, EsBulkSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveEsSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveEsSp
+					}()
+					if config.Conf.DB.Es.Addr != "" {
+						Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
+					}
+					// 集群地址2
+					if config.Conf.DB.Es.Addr2 != "" {
+						Es2.BulkSave(config.Conf.DB.Es.Indexb2, arru)
+					}
+					// 集群地址3
+					if config.Conf.DB.Es.Addr3 != "" {
+						Es3.BulkSave(config.Conf.DB.Es.Indexb3, arru)
+					}
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, EsBulkSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
 // SaveAllEsMethod 保存爬虫采集临时数据,保存在华为云上
 func SaveAllEsMethod() {
 	arru := make([]map[string]interface{}, EsBulkSize)