package main import ( "log" qu "qfw/util" elastic "qfw/util/elastic" "sync" "time" "gopkg.in/mgo.v2/bson" ) var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"} func buyerEsTaskOnce() { defer qu.Catch() arrEs := []map[string]interface{}{} buyerEsLock := &sync.Mutex{} pool := make(chan bool, 3) wg := &sync.WaitGroup{} now := time.Now() preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local) curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime)) task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime)) log.Println("buyer 区间id:", task_sid, task_eid) // task_sid = "5e6611b7aec95406dccf7151" // task_eid = "5f7249164bdc0447a6c90fa5" //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ "$gte": qu.StringTOBsonId(task_sid), "$lt": qu.StringTOBsonId(task_eid), }, } //参数 buyerent, _ := standard["buyerent"].(map[string]interface{}) buyer_ent := qu.ObjToString(buyerent["collect1"]) //buyer_enterr := qu.ObjToString(buyerent["collect2"]) index, _ := buyerent["index"].(string) itype, _ := buyerent["type"].(string) //mongo sess := mgostandard.GetMgoConn() defer mgostandard.DestoryMongoConn(sess) log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_ent) it_1 := sess.DB(mgostandard.DbName).C(buyer_ent).Find(&q).Select(map[string]interface{}{ "buyer_name": 1, "institute_type": 1, "buyerclass": 1, "fixedphone": 1, "mobilephone": 1, "latestfixedphone": 1, "latestmobilephone": 1, "province": 1, "city": 1, }).Sort("_id").Iter() num_1 := 0 for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ { if num_1%100 == 0 && num_1 > 0 { log.Println("当前表:", buyer_ent, "数量:", num_1) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := map[string]interface{}{} _id := qu.BsonIdToSId(tmp["_id"]) if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 { for _, v := range qu.ObjArrToStringArr(buyerclass) { if len(buyerclass) >= 2 && v != "其它" { savetmp["buyerclass"] = v break } else if len(buyerclass) == 1 { savetmp["buyerclass"] = v break } } } savetmp["_id"] = _id savetmp["name"] = tmp["buyer_name"] savetmp["buyer_name"] = tmp["buyer_name"] for _, f := range fieldArr { if val := qu.ObjToString(tmp[f]); val != "" { savetmp[f] = val } } buyerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= BulkSize { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_enterr) // it_2 := sess.DB(mgostandard.DbName).C(buyer_enterr).Find(&q).Sort("_id").Iter() // num_2 := 0 // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ { // if num_2%100 == 0 && num_2 > 0 { // log.Println("当前表:", buyer_enterr, "数量:", num_2) // } // pool <- true // wg.Add(1) // go func(tmp map[string]interface{}) { // defer func() { // <-pool // wg.Done() // }() // savetmp := map[string]interface{}{} // tmp_id := qu.BsonIdToSId(tmp["_id"]) // savetmp["_id"] = tmp_id // savetmp["name"] = tmp["name"] // savetmp["buyer_name"] = tmp["name"] // if tmp["buyerclass"] != nil { // savetmp["buyerclass"] = tmp["buyerclass"] // } // for _, f := range fieldArr { // if val := qu.ObjToString(tmp[f]); val != "" { // savetmp[f] = val // } // } // buyerEsLock.Lock() // arrEs = append(arrEs, savetmp) // if len(arrEs) >= BulkSize { // tmps := arrEs // elastic.BulkSave(index, itype, &tmps, true) // arrEs = []map[string]interface{}{} // } // buyerEsLock.Unlock() // }(tmp) // tmp = make(map[string]interface{}) // } wg.Wait() buyerEsLock.Lock() if len(arrEs) > 0 { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() log.Println("buyeres 索引完毕! 总计:", num_1) }