12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/log"
- "app.yhyue.com/data_processing/common_utils/mongodb"
- "esindex/config"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.uber.org/zap"
- "sync"
- "time"
- )
- var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"}
- func buyerEsTaskOnce() {
- defer util.Catch()
- arrEs := []map[string]interface{}{}
- buyerEsLock := &sync.Mutex{}
- pool := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- now := time.Now()
- preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
- curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
- task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
- log.Info("buyer 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid))
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": mongodb.StringTOBsonId(task_sid),
- "$lt": mongodb.StringTOBsonId(task_eid),
- },
- }
- //mongo
- sess := MgoQ.GetMgoConn()
- defer MgoQ.DestoryMongoConn(sess)
- it_1 := sess.DB(MgoQ.DbName).C("buyer_enterprise").Find(&q).Select(map[string]interface{}{
- "buyer_name": 1,
- "institute_type": 1,
- "buyerclass": 1,
- "fixedphone": 1,
- "mobilephone": 1,
- "latestfixedphone": 1,
- "latestmobilephone": 1,
- "province": 1,
- "city": 1,
- }).Sort("_id").Iter()
- num_1 := 0
- for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
- if num_1%2000 == 0 && num_1 > 0 {
- log.Info("current", zap.Int("数量", num_1))
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- savetmp := map[string]interface{}{}
- _id := mongodb.BsonIdToSId(tmp["_id"])
- if util.ObjToString(tmp["buyerclass"]) != "" {
- savetmp["buyerclass"] = tmp["buyerclass"]
- }
- savetmp["_id"] = _id
- savetmp["name"] = tmp["buyer_name"]
- savetmp["buyer_name"] = tmp["buyer_name"]
- for _, f := range fieldArr {
- if val := util.ObjToString(tmp[f]); val != "" {
- savetmp[f] = val
- }
- }
- buyerEsLock.Lock()
- arrEs = append(arrEs, savetmp)
- if len(arrEs) >= EsBulkSize {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- buyerEsLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- log.Info("buyer over!", zap.Int("总计", num_1))
- }
|