package main import ( "go.mongodb.org/mongo-driver/bson/primitive" "log" "sync" "time" util "utils" "utils/mongodb" ) func winnerEsTaskOnce() { defer util.Catch() arrEs := []map[string]interface{}{} winerEsLock := &sync.Mutex{} pool := make(chan bool, 3) wg := &sync.WaitGroup{} now := time.Now() preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local) curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime)) task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime)) //task_sid = "5e6598f82c27dc56292158da" //task_eid = "620576342566c40049f26155" log.Println("winner 区间id:", task_sid, task_eid) //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ //"$gte": mongodb.StringTOBsonId(task_sid), "$lt": mongodb.StringTOBsonId(task_eid), }, } //参数 winnerent, _ := standard["winnerent"].(map[string]interface{}) win_ent := util.ObjToString(winnerent["collect1"]) //win_enterr := qu.ObjToString(winnerent["collect2"]) index, _ := winnerent["index"].(string) itype, _ := winnerent["type"].(string) //mongo sess := standardMgo.GetMgoConn() defer standardMgo.DestoryMongoConn(sess) log.Println("q:", q, "db:", standardMgo.DbName, "coll:", win_ent) it_1 := sess.DB(standardMgo.DbName).C(win_ent).Find(&q).Sort("_id").Iter() num_1 := 0 for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ { if num_1%2000 == 0 && num_1 > 0 { log.Println("当前表:", win_ent, "数量:", num_1) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := map[string]interface{}{} tmp_id := mongodb.BsonIdToSId(tmp["_id"]) savetmp["_id"] = tmp_id savetmp["name"] = tmp["company_name"] savetmp["winner_name"] = tmp["company_name"] savetmp["pici"] = tmp["updatetime"] if province := util.ObjToString(tmp["province"]); province != "" { savetmp["province"] = province } if city := util.ObjToString(tmp["city"]); city != "" { savetmp["city"] = city } if text := util.ObjToString(tmp["tag_business"]); text != "" { savetmp["tag_business"] = text } winerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= EsBulkSize { tmps := arrEs //Es1.BulkSave(index, itype, &tmps, true) Es2.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } winerEsLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_enterr) // it_2 := sess.DB(mgostandard.DbName).C(win_enterr).Find(&q).Sort("_id").Iter() // num_2 := 0 // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ { // if num_2%100 == 0 && num_2 > 0 { // log.Println("当前表:", win_enterr, "数量:", num_2) // } // pool <- true // wg.Add(1) // go func(tmp map[string]interface{}) { // defer func() { // <-pool // wg.Done() // }() // savetmp := map[string]interface{}{} // tmp_id := mongodb.BsonIdToSId(tmp["_id"]) // savetmp["_id"] = tmp_id // savetmp["name"] = tmp["name"] // savetmp["winner_name"] = tmp["name"] // savetmp["pici"] = tmp["updatetime"] // winerEsLock.Lock() // arrEs = append(arrEs, savetmp) // if len(arrEs) >= BulkSize { // tmps := arrEs // elastic.BulkSave(index, itype, &tmps, true) // arrEs = []map[string]interface{}{} // } // winerEsLock.Unlock() // }(tmp) // tmp = make(map[string]interface{}) // } wg.Wait() winerEsLock.Lock() if len(arrEs) > 0 { tmps := arrEs //Es1.BulkSave(index, itype, &tmps, true) Es2.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } winerEsLock.Unlock() log.Println("winnerextract 索引完毕! 总计:", num_1) }