Browse Source

更新 保存es

wcc 1 năm trước cách đây
mục cha
commit
0a82ab3dbd
5 tập tin đã thay đổi với 107 bổ sung100 xóa
  1. 3 4
      createEsIndex/attachment.go
  2. 19 15
      createEsIndex/bidding_es.go
  3. 1 1
      createEsIndex/go.mod
  4. 2 0
      createEsIndex/go.sum
  5. 82 80
      createEsIndex/main.go

+ 3 - 4
createEsIndex/attachment.go

@@ -12,7 +12,7 @@ import (
 	"sync"
 )
 
-//attachmentBiddingTask 附件补采入库es
+// attachmentBiddingTask 附件补采入库es
 func attachmentBiddingTask(mapInfo map[string]interface{}, other config.OthersData) {
 	defer util.Catch()
 	var MgoOther *mongodb.MongodbSim
@@ -109,10 +109,9 @@ func attachmentBiddingTask(mapInfo map[string]interface{}, other config.OthersDa
 			}
 
 			if len(update) > 0 {
-				updateBiddingPool <- []map[string]interface{}{{
+				updateBiddingPool <- map[string]interface{}{
 					"_id": tmp["_id"],
-				},
-					{"$set": update},
+					"set": update,
 				}
 			}
 			// 这里直接放入存 es库 bidding的通道

+ 19 - 15
createEsIndex/bidding_es.go

@@ -122,19 +122,24 @@ func biddingTask(mapInfo map[string]interface{}) {
 					newTmp["object_type"] = objectType
 				}
 			}
-
+			//
+			if stype == "bidding" || stype == "bidding_history" || stype == "index-by-id" {
+				newTmp["pici"] = time.Now().Unix()
+				update["pici"] = time.Now().Unix()
+			}
+			//
+			saveEsPool <- newTmp
 			if len(update) > 0 {
-				updateBiddingPool <- []map[string]interface{}{{
+				updateBiddingPool <- map[string]interface{}{
 					"_id": tmp["_id"],
-				},
-					{"$set": update},
+					"set": update,
+				}
+
+				if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
+					// 剑鱼信息发布数据 通过udp通知信息发布程序
+					go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
 				}
 			}
-			if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
-				// 剑鱼信息发布数据 通过udp通知信息发布程序
-				go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
-			}
-			saveEsPool <- newTmp
 		}(tmp)
 		tmp = map[string]interface{}{}
 	}
@@ -176,6 +181,7 @@ func biddingTask(mapInfo map[string]interface{}) {
 	//	Port: 1910,
 	//}
 	//log.Info("bidding index es over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
+	//}
 }
 
 // biddingAllTask 补充存量数据
@@ -255,10 +261,9 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 			}
 			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
 			if len(update) > 0 {
-				updateBiddingPool <- []map[string]interface{}{{
+				updateBiddingPool <- map[string]interface{}{
 					"_id": tmp["_id"],
-				},
-					{"$set": update},
+					"set": update,
 				}
 			}
 			saveEsPool <- newTmp
@@ -361,10 +366,9 @@ func dealData(coll, gtid, lteid, kword string, routines int) {
 			}
 			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
 			if len(update) > 0 {
-				updateBiddingPool <- []map[string]interface{}{{
+				updateBiddingPool <- map[string]interface{}{
 					"_id": tmp["_id"],
-				},
-					{"$set": update},
+					"set": update,
 				}
 			}
 			saveEsPool <- newTmp

+ 1 - 1
createEsIndex/go.mod

@@ -11,7 +11,7 @@ require (
 	go.mongodb.org/mongo-driver v1.10.2
 	go.uber.org/zap v1.23.0
 	jygit.jydev.jianyu360.cn/BP/jynats v0.0.0-20231206094405-2ff9da3175bc
-	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231122020338-4956718a7e9e
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231223031213-3f719e173cb5
 )
 
 require (

+ 2 - 0
createEsIndex/go.sum

@@ -624,6 +624,8 @@ jygit.jydev.jianyu360.cn/BP/jynats v0.0.0-20231206094405-2ff9da3175bc h1:/vhHYTg
 jygit.jydev.jianyu360.cn/BP/jynats v0.0.0-20231206094405-2ff9da3175bc/go.mod h1:bbyXOaS5NyeNIit6mb5fV+Exxqg8g+Su0JnU8u8i2Xs=
 jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231122020338-4956718a7e9e h1:oXoOPTtZIfnvce0ulokHBSjXqhLiN6DPVOYTm3V6z8U=
 jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231122020338-4956718a7e9e/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231223031213-3f719e173cb5 h1:IJlZ+JTn7UvVeHyALb+yWacmtE94TW2XvBIRgTyRmzU=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231223031213-3f719e173cb5/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

+ 82 - 80
createEsIndex/main.go

@@ -35,8 +35,8 @@ var (
 	JyUdpAddr  *net.UDPAddr
 	NeUdpAddr  *net.UDPAddr
 
-	EsBulkSize        = 50                                        // es批量保存大小
-	updateBiddingPool = make(chan []map[string]interface{}, 5000) //更新bingding数据
+	EsBulkSize        = 50                                      // es批量保存大小
+	updateBiddingPool = make(chan map[string]interface{}, 5000) //更新bingding数据
 	updateBiddingSp   = make(chan bool, 5)
 	saveEsPool        = make(chan map[string]interface{}, 5000) //保存binding数据到es
 	saveEsSp          = make(chan bool, 5)
@@ -306,43 +306,44 @@ type UdpNode struct {
 	retry     int
 }
 
-//UpdateBidding 更新bidding表数据
+// UpdateBidding 更新bidding表数据
 func UpdateBidding() {
-	arru := make([][]map[string]interface{}, 200)
-	indexu := 0
+	//arru := make([][]map[string]interface{}, 10)
+	//indexu := 0
 	for {
 		select {
 		case v := <-updateBiddingPool:
-			arru[indexu] = v
-			indexu++
-			if indexu == 200 {
-				updateBiddingSp <- true
-				go func(arru [][]map[string]interface{}) {
-					defer func() {
-						<-updateBiddingSp
-					}()
-					MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
-				}(arru)
-				arru = make([][]map[string]interface{}, 200)
-				indexu = 0
-			}
-		case <-time.After(1000 * time.Millisecond):
-			if indexu > 0 {
-				updateBiddingSp <- true
-				go func(arru [][]map[string]interface{}) {
-					defer func() {
-						<-updateBiddingSp
-					}()
-					MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
-				}(arru[:indexu])
-				arru = make([][]map[string]interface{}, 200)
-				indexu = 0
-			}
+			MgoB.UpdateById(config.Conf.DB.MongoB.Coll, v["_id"], map[string]interface{}{"$set": v["set"]})
+			//arru[indexu] = v
+			//indexu++
+			//if indexu == 10 {
+			//	updateBiddingSp <- true
+			//go func(arru [][]map[string]interface{}) {
+			//	defer func() {
+			//	<-updateBiddingSp
+			//}()
+			//MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
+			//}(arru)
+			//arru = make([][]map[string]interface{}, 10)
+			//indexu = 0
+			//}
+			//case <-time.After(1000 * time.Millisecond):
+			//	if indexu > 0 {
+			//		updateBiddingSp <- true
+			//		go func(arru [][]map[string]interface{}) {
+			//			defer func() {
+			//				<-updateBiddingSp
+			//			}()
+			//			MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
+			//		}(arru[:indexu])
+			//		arru = make([][]map[string]interface{}, 200)
+			//		indexu = 0
+			//	}
 		}
 	}
 }
 
-//SaveBidErr 记录错误信息,暂时记录 附件过长的
+// SaveBidErr 记录错误信息,暂时记录 附件过长的
 func SaveBidErr() {
 	arru := make([]map[string]interface{}, 200)
 	indexu := 0
@@ -378,62 +379,63 @@ func SaveBidErr() {
 	}
 }
 
-//SaveEsMethod 保存到es
+// SaveEsMethod 保存到es
 func SaveEsMethod() {
-	arru := make([]map[string]interface{}, EsBulkSize)
-	indexu := 0
+	//arru := make([]map[string]interface{}, EsBulkSize)
+	//indexu := 0
 	for {
 		select {
 		case v := <-saveEsPool:
-			arru[indexu] = v
-			indexu++
-			if indexu == EsBulkSize {
-				saveEsSp <- true
-				go func(arru []map[string]interface{}) {
-					defer func() {
-						<-saveEsSp
-					}()
-					Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
-					//if config.Conf.DB.Es.IndexTmp != "" {
-					//	if config.Conf.DB.Es.Addr2 != "" {
-					//		Es2.BulkSave(config.Conf.DB.Es.IndexTmp, arru) // 新集群
-					//	}
-					//	Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru) //老集群
-					//}
-					if config.Conf.DB.Es.Addr2 != "" {
-						Es2.BulkSave(config.Conf.DB.Es.Indexb2, arru)
-					}
-
-				}(arru)
-				arru = make([]map[string]interface{}, EsBulkSize)
-				indexu = 0
-			}
-		case <-time.After(1000 * time.Millisecond):
-			if indexu > 0 {
-				saveEsSp <- true
-				go func(arru []map[string]interface{}) {
-					defer func() {
-						<-saveEsSp
-					}()
-					Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
-					//if config.Conf.DB.Es.IndexTmp != "" {
-					//	if config.Conf.DB.Es.Addr2 != "" {
-					//		Es2.BulkSave(config.Conf.DB.Es.IndexTmp, arru) // 新集群
-					//	}
-					//	Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru)
-					//}
-					if config.Conf.DB.Es.Addr2 != "" {
-						Es2.BulkSave(config.Conf.DB.Es.Indexb2, arru)
-					}
-				}(arru[:indexu])
-				arru = make([]map[string]interface{}, EsBulkSize)
-				indexu = 0
+			Es.Save(config.Conf.DB.Es.IndexB, v)
+			if config.Conf.DB.Es.Addr2 != "" {
+				Es2.Save(config.Conf.DB.Es.Indexb2, v)
 			}
+
+			//arru[indexu] = v
+			//indexu++
+			//if indexu == EsBulkSize {
+			//	saveEsSp <- true
+			//go func(arru []map[string]interface{}) {
+			//	defer func() {
+			//<-saveEsSp
+			//}()
+			//if config.Conf.DB.Es.IndexTmp != "" {
+			//	if config.Conf.DB.Es.Addr2 != "" {
+			//		Es2.BulkSave(config.Conf.DB.Es.IndexTmp, arru) // 新集群
+			//	}
+			//	Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru) //老集群
+			//}
+
+			//}(arru)
+			//arru = make([]map[string]interface{}, EsBulkSize)
+			//indexu = 0
+			//}
+			//case <-time.After(1000 * time.Millisecond):
+			//	if indexu > 0 {
+			//		saveEsSp <- true
+			//		go func(arru []map[string]interface{}) {
+			//			defer func() {
+			//				<-saveEsSp
+			//			}()
+			//			Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
+			//if config.Conf.DB.Es.IndexTmp != "" {
+			//	if config.Conf.DB.Es.Addr2 != "" {
+			//		Es2.BulkSave(config.Conf.DB.Es.IndexTmp, arru) // 新集群
+			//	}
+			//	Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru)
+			//}
+			//		if config.Conf.DB.Es.Addr2 != "" {
+			//			Es2.BulkSave(config.Conf.DB.Es.Indexb2, arru)
+			//		}
+			//	}(arru[:indexu])
+			//	arru = make([]map[string]interface{}, EsBulkSize)
+			//	indexu = 0
+			//}
 		}
 	}
 }
 
-//SaveAllEsMethod 保存爬虫采集临时数据,保存在华为云上
+// SaveAllEsMethod 保存爬虫采集临时数据,保存在华为云上
 func SaveAllEsMethod() {
 	arru := make([]map[string]interface{}, EsBulkSize)
 	indexu := 0
@@ -570,7 +572,7 @@ func task() {
 	util.Debug("over ---", count)
 }
 
-//LastUdpJob 处理UDP 没有接受数据
+// LastUdpJob 处理UDP 没有接受数据
 func LastUdpJob() {
 	for {
 		responselock.Lock()
@@ -588,7 +590,7 @@ func LastUdpJob() {
 	}
 }
 
-//sendErrMailApi 发送邮件
+// sendErrMailApi 发送邮件
 func sendErrMailApi(title, body string) {
 	var tomail, api string
 	if config.Conf.Mail.Send {