package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "esindex/config" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" "sync" "time" ) var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"} func buyerEsTaskOnce() { defer util.Catch() arrEs := []map[string]interface{}{} buyerEsLock := &sync.Mutex{} pool := make(chan bool, 3) wg := &sync.WaitGroup{} now := time.Now() preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local) curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime)) task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime)) log.Info("buyer 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid)) //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ //"$gte": mongodb.StringTOBsonId(task_sid), "$lt": mongodb.StringTOBsonId(task_eid), }, } //mongo sess := MgoQ.GetMgoConn() defer MgoQ.DestoryMongoConn(sess) it_1 := sess.DB(MgoQ.DbName).C("buyer_enterprise").Find(&q).Select(map[string]interface{}{ "buyer_name": 1, "institute_type": 1, "buyerclass": 1, "fixedphone": 1, "mobilephone": 1, "latestfixedphone": 1, "latestmobilephone": 1, "province": 1, "city": 1, }).Sort("_id").Iter() num_1 := 0 for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ { if num_1%2000 == 0 && num_1 > 0 { log.Info("current", zap.Int("数量", num_1)) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := map[string]interface{}{} _id := mongodb.BsonIdToSId(tmp["_id"]) if util.ObjToString(tmp["buyerclass"]) != "" { savetmp["buyerclass"] = tmp["buyerclass"] } savetmp["_id"] = _id savetmp["name"] = tmp["buyer_name"] savetmp["buyer_name"] = tmp["buyer_name"] for _, f := range fieldArr { if val := util.ObjToString(tmp[f]); val != "" { savetmp[f] = val } } buyerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= EsBulkSize { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexBuyer, config.Conf.DB.Es.TypeBuyer, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } wg.Wait() buyerEsLock.Lock() if len(arrEs) > 0 { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexBuyer, config.Conf.DB.Es.TypeBuyer, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() log.Info("buyer over!", zap.Int("总计", num_1)) }