123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package main
- import (
- "log"
- qu "qfw/util"
- elastic "qfw/util/elastic"
- "sync"
- "time"
- "gopkg.in/mgo.v2/bson"
- )
- var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"}
- func buyerEsTaskOnce() {
- defer qu.Catch()
- arrEs := []map[string]interface{}{}
- buyerEsLock := &sync.Mutex{}
- pool := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- now := time.Now()
- preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
- curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
- task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
- log.Println("buyer 区间id:", task_sid, task_eid)
- // task_sid = "5e6611b7aec95406dccf7151"
- // task_eid = "5f7249164bdc0447a6c90fa5"
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": qu.StringTOBsonId(task_sid),
- "$lt": qu.StringTOBsonId(task_eid),
- },
- }
- //参数
- buyerent, _ := standard["buyerent"].(map[string]interface{})
- buyer_ent := qu.ObjToString(buyerent["collect1"])
- //buyer_enterr := qu.ObjToString(buyerent["collect2"])
- index, _ := buyerent["index"].(string)
- itype, _ := buyerent["type"].(string)
- //mongo
- sess := mgostandard.GetMgoConn()
- defer mgostandard.DestoryMongoConn(sess)
- log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_ent)
- it_1 := sess.DB(mgostandard.DbName).C(buyer_ent).Find(&q).Select(map[string]interface{}{
- "buyer_name": 1,
- "institute_type": 1,
- "buyerclass": 1,
- "fixedphone": 1,
- "mobilephone": 1,
- "latestfixedphone": 1,
- "latestmobilephone": 1,
- "province": 1,
- "city": 1,
- }).Sort("_id").Iter()
- num_1 := 0
- for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
- if num_1%100 == 0 && num_1 > 0 {
- log.Println("当前表:", buyer_ent, "数量:", num_1)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- savetmp := map[string]interface{}{}
- _id := qu.BsonIdToSId(tmp["_id"])
- if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 {
- for _, v := range qu.ObjArrToStringArr(buyerclass) {
- if len(buyerclass) >= 2 && v != "其它" {
- savetmp["buyerclass"] = v
- break
- } else if len(buyerclass) == 1 {
- savetmp["buyerclass"] = v
- break
- }
- }
- }
- savetmp["_id"] = _id
- savetmp["name"] = tmp["buyer_name"]
- savetmp["buyer_name"] = tmp["buyer_name"]
- for _, f := range fieldArr {
- if val := qu.ObjToString(tmp[f]); val != "" {
- savetmp[f] = val
- }
- }
- buyerEsLock.Lock()
- arrEs = append(arrEs, savetmp)
- if len(arrEs) >= BulkSize {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_enterr)
- // it_2 := sess.DB(mgostandard.DbName).C(buyer_enterr).Find(&q).Sort("_id").Iter()
- // num_2 := 0
- // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
- // if num_2%100 == 0 && num_2 > 0 {
- // log.Println("当前表:", buyer_enterr, "数量:", num_2)
- // }
- // pool <- true
- // wg.Add(1)
- // go func(tmp map[string]interface{}) {
- // defer func() {
- // <-pool
- // wg.Done()
- // }()
- // savetmp := map[string]interface{}{}
- // tmp_id := qu.BsonIdToSId(tmp["_id"])
- // savetmp["_id"] = tmp_id
- // savetmp["name"] = tmp["name"]
- // savetmp["buyer_name"] = tmp["name"]
- // if tmp["buyerclass"] != nil {
- // savetmp["buyerclass"] = tmp["buyerclass"]
- // }
- // for _, f := range fieldArr {
- // if val := qu.ObjToString(tmp[f]); val != "" {
- // savetmp[f] = val
- // }
- // }
- // buyerEsLock.Lock()
- // arrEs = append(arrEs, savetmp)
- // if len(arrEs) >= BulkSize {
- // tmps := arrEs
- // elastic.BulkSave(index, itype, &tmps, true)
- // arrEs = []map[string]interface{}{}
- // }
- // buyerEsLock.Unlock()
- // }(tmp)
- // tmp = make(map[string]interface{})
- // }
- wg.Wait()
- buyerEsLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- log.Println("buyeres 索引完毕! 总计:", num_1)
- }
|