package main import ( "go.mongodb.org/mongo-driver/bson" "log" "sync" util "utils" "utils/elastic" "utils/mongodb" ) /* func buyerTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } session := extractmgo.GetMgoConn() defer extractmgo.DestoryMongoConn(session) c, _ := buyer["collect"].(string) db, _ := buyer["db"].(string) index, _ := buyer["index"].(string) itype, _ := buyer["type"].(string) count, _ := session.DB(db).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(db).C(c).Find(q).Select(map[string]interface{}{ "buyer_name": 1, "province": 1, "city": 1, "district": 1, }).Iter() arr := make([]map[string]interface{}, savesizei) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { //go IS.Add("buyer") tmp["name"] = tmp["buyer_name"] delete(tmp, "buyer_name") arr[i] = tmp n++ if i == savesizei-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, savesizei) } if n%savesizei == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println(mapInfo, "create buyer index...over", n) } */ //buyer_err func buyerTask_err(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } //mongo sess := standardMgo.GetMgoConn() defer standardMgo.DestoryMongoConn(sess) c, _ := buyer["collect"].(string) index, _ := buyer["index"].(string) itype, _ := buyer["type"].(string) count, _ := sess.DB(standardMgo.DbName).C(c).Find(&q).Count() log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index) it := sess.DB(standardMgo.DbName).C(c).Find(&q).Select(map[string]interface{}{ "name": 1, "buyer_name": 1, "institute_type": 1, "buyerclass": 1, "fixedphone": 1, "mobilephone": 1, "latestfixedphone": 1, "latestmobilephone": 1, "province": 1, "city": 1, }).Sort("_id").Iter() arrEs := []map[string]interface{}{} buyerEsLock := &sync.Mutex{} pool := make(chan bool, 3) wg := &sync.WaitGroup{} i := 0 for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 { if i%1000 == 0 { log.Println("current:", i) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() util.Debug(tmp) savetmp := map[string]interface{}{} tmp_id := util.BsonIdToSId(tmp["_id"]) savetmp["_id"] = tmp_id savetmp["name"] = tmp["name"] savetmp["buyer_name"] = tmp["name"] util.Debug(tmp["buyerclass"]) if tmp["buyerclass"] != nil { savetmp["buyerclass"] = tmp["buyerclass"] } for _, f := range fieldArr { if val := util.ObjToString(tmp[f]); val != "" { savetmp[f] = val } } util.Debug(savetmp) buyerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= MgoBulkSize { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } wg.Wait() buyerEsLock.Lock() if len(arrEs) > 0 { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() log.Println(mapInfo, "create buyer index...over", i) } //buyer_enterprise func buyerTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } //mongo sess := standardMgo.GetMgoConn() defer standardMgo.DestoryMongoConn(sess) c, _ := buyer["collect"].(string) index, _ := buyer["index"].(string) itype, _ := buyer["type"].(string) count, _ := sess.DB(standardMgo.DbName).C(c).Find(&q).Count() log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index) it := sess.DB(standardMgo.DbName).C(c).Find(&q).Select(map[string]interface{}{ "buyer_name": 1, "institute_type": 1, "buyerclass": 1, "fixedphone": 1, "mobilephone": 1, "latestfixedphone": 1, "latestmobilephone": 1, "province": 1, "city": 1, }).Sort("_id").Iter() arrEs := []map[string]interface{}{} buyerEsLock := &sync.Mutex{} pool := make(chan bool, 3) wg := &sync.WaitGroup{} i := 0 for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 { if i%1000 == 0 { log.Println("current:", i) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := map[string]interface{}{} _id := util.BsonIdToSId(tmp["_id"]) //if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 { // for _, v := range util.ObjArrToStringArr(buyerclass) { // if len(buyerclass) >= 2 && v != "其它" { // savetmp["buyerclass"] = v // break // } else if len(buyerclass) == 1 { // savetmp["buyerclass"] = v // break // } // } //} if util.ObjToString(tmp["buyerclass"]) != "" { savetmp["buyerclass"] = tmp["buyerclass"] } savetmp["_id"] = _id savetmp["name"] = tmp["buyer_name"] savetmp["buyer_name"] = tmp["buyer_name"] for _, f := range fieldArr { if val := util.ObjToString(tmp[f]); val != "" { savetmp[f] = val } } buyerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= EsBulkSize { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } wg.Wait() buyerEsLock.Lock() if len(arrEs) > 0 { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } buyerEsLock.Unlock() log.Println(mapInfo, "create buyer index...over", i) }