Browse Source

更新bidding,记录 每个段的索引数量

wcc 1 year ago
parent
commit
0cd9c1c2a6

+ 7 - 5
createEsIndex/bidding_es.go

@@ -35,11 +35,6 @@ func biddingTask(mapInfo map[string]interface{}) {
 	defer util.Catch()
 
 	stype := util.ObjToString(mapInfo["stype"])
-	if stype == "bidding" {
-		uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
-			"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
-		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 8, "updatetime": time.Now().Unix()}}, false, true)
-	}
 	q, _ := mapInfo["query"].(map[string]interface{})
 	if q == nil {
 		q = map[string]interface{}{
@@ -137,6 +132,13 @@ func biddingTask(mapInfo map[string]interface{}) {
 	wg.Wait()
 	log.Info("biddingTask over", zap.Int("count", c1), zap.Int("index", index))
 
+	//更新状态
+	if stype == "bidding" {
+		uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
+			"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
+		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 8, "updatetime": time.Now().Unix(), "index_num": index}}, false, true)
+	}
+
 	////发送udp,附件补采 才需要
 	//data := map[string]interface{}{
 	//	"stype": "update",

+ 1 - 0
createEsIndex/common.toml

@@ -13,6 +13,7 @@
     size = 15
     user = "SJZY_RWbid_ES"
     password = "SJZY@B4i4D5e6S"
+    direct = true
 
 [db.mongoP] ## projectset 项目信息
     addr = "192.168.3.206:27002"

+ 1 - 0
createEsIndex/config/conf.go

@@ -92,6 +92,7 @@ type mgo struct {
 	Size     int
 	User     string
 	Password string
+	Direct   bool
 }
 
 //env 全局的相关配置

+ 1 - 1
createEsIndex/init.go

@@ -54,7 +54,7 @@ func InitMgo() {
 		Size:        config.Conf.DB.MongoB.Size,
 		UserName:    config.Conf.DB.MongoB.User,
 		Password:    config.Conf.DB.MongoB.Password,
-		//Direct:      true,
+		Direct:      config.Conf.DB.MongoB.Direct,
 	}
 	MgoB.InitPool()
 	if config.Conf.DB.MongoB.Addr == "" || config.Conf.DB.MongoB.Dbname == "" {

+ 10 - 2
createEsIndex/main.go

@@ -33,7 +33,7 @@ var (
 	UdpTaskMap = &sync.Map{}
 	JyUdpAddr  *net.UDPAddr
 
-	EsBulkSize        = 100                                       // es批量保存大小
+	EsBulkSize        = 50                                        // es批量保存大小
 	updateBiddingPool = make(chan []map[string]interface{}, 5000) //更新bingding数据
 	updateBiddingSp   = make(chan bool, 5)
 	saveEsPool        = make(chan map[string]interface{}, 5000) //保存binding数据到es
@@ -117,7 +117,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
 			tasktype, _ := mapInfo["stype"].(string)
 			switch tasktype {
-			case "index-by-id": //单个索引
+			case "index-by-id": //单个索引,更新pici
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingTaskById(mapInfo)
+				}()
+			case "index_by_id": //单个索引,不更新pici
 				pool <- true
 				go func() {
 					defer func() {