package main import ( "context" "github.com/olivere/elastic/v7" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" utils "jygit.jydev.jianyu360.cn/data_processing/common_utils" "strings" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" ) var ( MgoP *mongodb.MongodbSim MgoB *mongodb.MongodbSim GF GlobalConf ) func InitConfig() (err error) { viper.SetConfigFile("config.toml") // 指定配置文件路径 viper.SetConfigName("config") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置 viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置 err = viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 return } err = viper.Unmarshal(&GF) return err } func InitMgo() { MgoP = &mongodb.MongodbSim{ MongodbAddr: GF.MongoP.Host, Size: 10, DbName: GF.MongoP.DB, UserName: GF.MongoP.Username, Password: GF.MongoP.Password, Direct: GF.MongoP.Direct, } MgoP.InitPool() MgoB = &mongodb.MongodbSim{ MongodbAddr: GF.MongoB.Host, Size: 10, DbName: GF.MongoB.DB, UserName: GF.MongoB.Username, Password: GF.MongoB.Password, Direct: GF.MongoB.Direct, } MgoB.InitPool() } func main() { InitConfig() InitMgo() //InitEs() //sync() updateReview() log.Println("over") select {} } func sync() { sess := MgoP.GetMgoConn() defer MgoP.DestoryMongoConn(sess) ctx := context.Background() coll := sess.M.C.Database("qfw").Collection("projectset_20230904") find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "ids": 1}) cur, err := coll.Find(ctx, bson.D{}, find) if err != nil { log.Println(err) } // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(GF.ES.URL), elastic.SetBasicAuth(GF.ES.Username, GF.ES.Password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } count := 0 for tmp := make(map[string]interface{}); cur.Next(ctx); count++ { if cur != nil { cur.Decode(&tmp) } if count%10000 == 0 { log.Println("current:", count, tmp["_id"]) } if ids, ok := tmp["ids"].([]interface{}); ok { projectID := mongodb.BsonIdToSId(tmp["_id"]) for i := len(ids) - 1; i >= 0; i-- { idStr := utils.ObjToString(ids[i]) if idStr != "" { if idStr > "5a862e7040d2d9bbe88e3b1f" { bidd, _ := MgoB.FindById("bidding", idStr, map[string]interface{}{"review_experts": 1}) if len(*bidd) > 0 { if review_experts, ok := (*bidd)["review_experts"]; ok { update := map[string]interface{}{ "review_experts": review_experts, } MgoP.UpdateById("projectset_20230904", projectID, map[string]interface{}{"$set": update}) esUpdate := map[string]interface{}{ "review_experts": review_experts, } client.Update(). Index("projectset_v3"). Id(projectID). Doc(esUpdate). Do(context.Background()) break } } } else { bidd, _ := MgoB.FindById("bidding_back", idStr, map[string]interface{}{"review_experts": 1}) if len(*bidd) > 0 { if review_experts, ok := (*bidd)["review_experts"]; ok { update := map[string]interface{}{ "review_experts": review_experts, } MgoP.UpdateById("projectset_20230904", projectID, map[string]interface{}{"$set": update}) esUpdate := map[string]interface{}{ "review_experts": review_experts, } client.Update(). Index("projectset_v3"). Id(projectID). Doc(esUpdate). Do(context.Background()) break } } } } } } tmp = make(map[string]interface{}) } } func updateReview() { sess := MgoP.GetMgoConn() defer MgoP.DestoryMongoConn(sess) ctx := context.Background() coll := sess.M.C.Database("qfw_data").Collection("projectset") find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", 1}}).SetProjection(bson.M{"_id": 1, "review_experts": 1}) cur, err := coll.Find(ctx, bson.D{}, find) if err != nil { log.Println(err) } count := 0 for tmp := make(map[string]interface{}); cur.Next(ctx); count++ { if cur != nil { cur.Decode(&tmp) } if count%10000 == 0 { log.Println("current:", count, tmp["_id"]) } projectID := mongodb.BsonIdToSId(tmp["_id"]) if reviews, ok := tmp["review_experts"].([]interface{}); ok { if len(reviews) == 0 { continue } else { ds := make([]string, 0) for _, v := range reviews { ds = append(ds, utils.ObjToString(v)) } dsStr := strings.Join(ds, ",") update := map[string]interface{}{ "review_experts": dsStr, } //log.Println("---", projectID) MgoP.UpdateById("projectset", projectID, map[string]interface{}{"$set": update}) } } tmp = make(map[string]interface{}) } log.Println("over") }