package main import ( "go.mongodb.org/mongo-driver/bson" "log" util "utils" "utils/elastic" "utils/mongodb" ) func winnerTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } session := standardMgo.GetMgoConn() defer standardMgo.DestoryMongoConn(session) c, _ := winner["collect"].(string) index, _ := winner["index"].(string) itype, _ := winner["type"].(string) count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(standardMgo.DbName).C(c).Find(q).Select(bson.M{"pici": 0}).Iter() arr := make([]map[string]interface{}, EsBulkSize) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { go IS.Add("winner") arr[i] = tmp n++ if i == EsBulkSize-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, EsBulkSize) } if n%EsBulkSize == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println(mapInfo, "create winner index...over", n) }