123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson"
- "log"
- "sync"
- util "utils"
- "utils/elastic"
- "utils/mongodb"
- )
- /*
- func buyerTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- session := extractmgo.GetMgoConn()
- defer extractmgo.DestoryMongoConn(session)
- c, _ := buyer["collect"].(string)
- db, _ := buyer["db"].(string)
- index, _ := buyer["index"].(string)
- itype, _ := buyer["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Select(map[string]interface{}{
- "buyer_name": 1,
- "province": 1,
- "city": 1,
- "district": 1,
- }).Iter()
- arr := make([]map[string]interface{}, savesizei)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- //go IS.Add("buyer")
- tmp["name"] = tmp["buyer_name"]
- delete(tmp, "buyer_name")
- arr[i] = tmp
- n++
- if i == savesizei-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, savesizei)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println(mapInfo, "create buyer index...over", n)
- }
- */
- //buyer_err
- func buyerTask_err(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- //mongo
- sess := standardMgo.GetMgoConn()
- defer standardMgo.DestoryMongoConn(sess)
- c, _ := buyer["collect"].(string)
- index, _ := buyer["index"].(string)
- itype, _ := buyer["type"].(string)
- count, _ := sess.DB(standardMgo.DbName).C(c).Find(&q).Count()
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- it := sess.DB(standardMgo.DbName).C(c).Find(&q).Select(map[string]interface{}{
- "name": 1,
- "buyer_name": 1,
- "institute_type": 1,
- "buyerclass": 1,
- "fixedphone": 1,
- "mobilephone": 1,
- "latestfixedphone": 1,
- "latestmobilephone": 1,
- "province": 1,
- "city": 1,
- }).Sort("_id").Iter()
- arrEs := []map[string]interface{}{}
- buyerEsLock := &sync.Mutex{}
- pool := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- i := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 {
- if i%1000 == 0 {
- log.Println("current:", i)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- util.Debug(tmp)
- savetmp := map[string]interface{}{}
- tmp_id := util.BsonIdToSId(tmp["_id"])
- savetmp["_id"] = tmp_id
- savetmp["name"] = tmp["name"]
- savetmp["buyer_name"] = tmp["name"]
- util.Debug(tmp["buyerclass"])
- if tmp["buyerclass"] != nil {
- savetmp["buyerclass"] = tmp["buyerclass"]
- }
- for _, f := range fieldArr {
- if val := util.ObjToString(tmp[f]); val != "" {
- savetmp[f] = val
- }
- }
- util.Debug(savetmp)
- buyerEsLock.Lock()
- arrEs = append(arrEs, savetmp)
- if len(arrEs) >= MgoBulkSize {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- buyerEsLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- log.Println(mapInfo, "create buyer index...over", i)
- }
- //buyer_enterprise
- func buyerTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- //mongo
- sess := standardMgo.GetMgoConn()
- defer standardMgo.DestoryMongoConn(sess)
- c, _ := buyer["collect"].(string)
- index, _ := buyer["index"].(string)
- itype, _ := buyer["type"].(string)
- count, _ := sess.DB(standardMgo.DbName).C(c).Find(&q).Count()
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- it := sess.DB(standardMgo.DbName).C(c).Find(&q).Select(map[string]interface{}{
- "buyer_name": 1,
- "institute_type": 1,
- "buyerclass": 1,
- "fixedphone": 1,
- "mobilephone": 1,
- "latestfixedphone": 1,
- "latestmobilephone": 1,
- "province": 1,
- "city": 1,
- }).Sort("_id").Iter()
- arrEs := []map[string]interface{}{}
- buyerEsLock := &sync.Mutex{}
- pool := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- i := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 {
- if i%1000 == 0 {
- log.Println("current:", i)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- savetmp := map[string]interface{}{}
- _id := util.BsonIdToSId(tmp["_id"])
- //if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 {
- // for _, v := range util.ObjArrToStringArr(buyerclass) {
- // if len(buyerclass) >= 2 && v != "其它" {
- // savetmp["buyerclass"] = v
- // break
- // } else if len(buyerclass) == 1 {
- // savetmp["buyerclass"] = v
- // break
- // }
- // }
- //}
- if util.ObjToString(tmp["buyerclass"]) != "" {
- savetmp["buyerclass"] = tmp["buyerclass"]
- }
- savetmp["_id"] = _id
- savetmp["name"] = tmp["buyer_name"]
- savetmp["buyer_name"] = tmp["buyer_name"]
- for _, f := range fieldArr {
- if val := util.ObjToString(tmp[f]); val != "" {
- savetmp[f] = val
- }
- }
- buyerEsLock.Lock()
- arrEs = append(arrEs, savetmp)
- if len(arrEs) >= EsBulkSize {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- buyerEsLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- arrEs = []map[string]interface{}{}
- }
- buyerEsLock.Unlock()
- log.Println(mapInfo, "create buyer index...over", i)
- }
|