package main import ( "context" "fmt" es7 "github.com/olivere/elastic/v7" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "sync" "time" ) var ( MongoBase *mongodb.MongodbSim MongoStd *mongodb.MongodbSim GF GlobalConf Es *elastic.Elastic EsClient *es7.Client //更新es updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 5) //保存协程 // 更新mongo //updatePool = make(chan []map[string]interface{}, 5000) //updateSp = make(chan bool, 5) ) func InitConfig() (err error) { viper.SetConfigFile("config.toml") // 指定配置文件路径 viper.SetConfigName("config") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置 viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置 err = viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 return } err = viper.Unmarshal(&GF) return err } func InitES() { //Es = &elastic.Elastic{ // //S_esurl: "http://127.0.0.1:19805", // S_esurl: "http://172.17.4.184:19805", // I_size: 5, // Username: "es_all", // Password: "TopJkO2E_d1x", //} //Es.InitElasticSize() //Es = &elastic.Elastic{ // //S_esurl: "http://127.0.0.1:19805", // S_esurl: "http://192.168.3.149:9201", // I_size: 5, // Username: "", // Password: "", //} //Es.InitElasticSize() //url := "http://127.0.0.1:19805" url := GF.ES.URL username := GF.ES.Username password := GF.ES.Password // 创建 Elasticsearch 客户端 EsClient, _ = es7.NewClient( es7.SetURL(url), es7.SetBasicAuth(username, password), es7.SetSniff(false), ) fmt.Println(EsClient) } func InitMgo() { //qyxy_std MongoStd = &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27083", MongodbAddr: GF.MongoStd.Host, Size: 10, DbName: GF.MongoStd.DB, UserName: GF.MongoStd.Username, Password: GF.MongoStd.Password, Direct: GF.MongoStd.Direct, } MongoStd.InitPool() //company_base MongoBase = &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27001", MongodbAddr: GF.MongoBase.Host, Size: 10, DbName: GF.MongoBase.DB, UserName: GF.MongoBase.Username, Password: GF.MongoBase.Password, Direct: GF.MongoBase.Direct, } MongoBase.InitPool() } func main() { InitConfig() InitMgo() InitES() go updateEsMethod() sess := MongoStd.GetMgoConn() defer MongoStd.DestoryMongoConn(sess) autoid := int64(GF.Env.Autoid) endid := int64(GF.Env.Endid) ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour) coll := sess.M.C.Database("mixdata").Collection("qyxy_std") find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"autoid", 1}}).SetProjection(bson.M{"_id": 1, "autoid": 1, "nseo_id": 1}) cur, err := coll.Find(ctx, bson.M{"autoid": bson.M{"$gt": autoid, "$lte": endid}}, find) if err != nil { log.Println(err) } else { log.Println("autoid:", autoid, "endid: ", endid) } count := 0 ch := make(chan bool, 10) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); cur.Next(ctx); count++ { if cur != nil { cur.Decode(&tmp) } autoid = util.Int64All(tmp["autoid"]) if count%10000 == 0 { log.Println("current ", count, autoid, tmp["_id"]) } if autoid == 0 { continue } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() companyId := util.ObjToString(tmp["_id"]) stdUpdate := make(map[string]interface{}, 0) stdUpdate["nseo_id"] = tmp["nseo_id"] stdUpdate["autoid"] = tmp["autoid"] // ////更新MongoDB //updatePool <- []map[string]interface{}{ // {"_id": companyId}, // {"$set": stdUpdate}, //} // // 更新es updateEsPool <- []map[string]interface{}{ {"_id": companyId}, stdUpdate, } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("over ---- ", count) select {} } //updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 1000) indexu := 0 ctx := context.Background() for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 1000 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() UpdateBulk(EsClient, ctx, "qyxy", arru...) }(arru) arru = make([][]map[string]interface{}, 1000) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() UpdateBulk(EsClient, ctx, "qyxy", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 1000) indexu = 0 } } } } func UpdateBulk(client *es7.Client, c context.Context, index string, docs ...[]map[string]interface{}) (err error) { bulkService := client.Bulk().Index(index).Type("_doc") for _, d := range docs { id := d[0]["_id"].(string) doc := es7.NewBulkUpdateRequest().Id(id).Doc(d[1]) bulkService.Add(doc) } _, err = bulkService.Do(c) if err != nil { fmt.Printf("UpdateBulk all success err is %v\n", err) } return }