// Command main walks the mixdata.company_base collection in ascending _id
// order, derives an nseo_id for every document and bulk-writes nseo_id and
// autoid into the qyxy_std collection.
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"

	"github.com/spf13/viper"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo/options"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// MongoStd is the connection pool that receives the bulk updates
	// (collection qyxy_std); MongoBase is the pool that is read from.
	MongoStd, MongoBase *mongodb.MongodbSim
	// GF holds the configuration unmarshalled by InitConfig.
	GF GlobalConf

	// updatePool queues update operations for updateMethod. Each element is
	// a two-item slice: {query, update document}.
	updatePool = make(chan []map[string]interface{}, 5000)
	// seed is the exclusive upper bound passed to rand.Intn when computing
	// the step between consecutive seo ids.
	seed = 188
	// findThread and updateThread are not referenced in this file; they are
	// kept in case other files of the package use them.
	findThread   = 5
	updateThread = 4
	// seoidCh delivers strictly increasing seo ids to Data.
	seoidCh = make(chan int64, 1000)
)

// InitConfig reads config.toml through viper and unmarshals it into GF.
// It returns the error from reading or unmarshalling the file, wrapped with
// the step that failed.
func InitConfig() error {
	viper.SetConfigFile("config.toml") // explicit path of the config file
	viper.SetConfigName("config")      // config file name without extension
	viper.SetConfigType("toml")        // needed when the file name has no extension
	viper.AddConfigPath("./")
	if err := viper.ReadInConfig(); err != nil {
		return fmt.Errorf("reading config: %w", err)
	}
	if err := viper.Unmarshal(&GF); err != nil {
		return fmt.Errorf("unmarshalling config: %w", err)
	}
	return nil
}

// InitMgo builds MongoStd and MongoBase from the GF.MongoStd and GF.MongoBase
// settings and initialises both connection pools. InitConfig must have
// populated GF beforehand.
func InitMgo() {
	MongoStd = &mongodb.MongodbSim{
		MongodbAddr: GF.MongoStd.Host,
		Size:        GF.MongoStd.Size,
		DbName:      GF.MongoStd.DB,
		UserName:    GF.MongoStd.Username,
		Password:    GF.MongoStd.Password,
		Direct:      GF.MongoStd.Direct,
	}
	MongoStd.InitPool()

	MongoBase = &mongodb.MongodbSim{
		MongodbAddr: GF.MongoBase.Host,
		Size:        GF.MongoBase.Size,
		DbName:      GF.MongoBase.DB,
		UserName:    GF.MongoBase.Username,
		Password:    GF.MongoBase.Password,
		Direct:      GF.MongoBase.Direct,
	}
	MongoBase.InitPool()
}

// main loads the configuration, starts the seo-id generator and the bulk
// updater, then scans company_base up to ten times, each pass resuming after
// the last _id that was handled. When the passes are done it logs completion
// and blocks so the background updater can keep flushing queued updates.
func main() {
	// Without a valid configuration every later step would run against
	// zero-value settings, so stop immediately.
	if err := InitConfig(); err != nil {
		log.Fatalln("init config err", err.Error())
	}
	InitMgo()

	id := int64(GF.Env.Autoid)         // starting autoid
	startSeoId := int64(GF.Env.Nseoid) // starting nseo_id, e.g. 31395235740

	// Seed explicitly: before Go 1.20 the global math/rand source is
	// deterministic unless seeded.
	rand.Seed(time.Now().UnixNano())

	// Seo-id generator: every id is the previous one plus a random step in
	// [2, seed+1], so ids are strictly increasing.
	go func() {
		for {
			startSeoId += int64(rand.Intn(seed) + 2)
			seoidCh <- startSeoId
		}
	}()
	go updateMethod()

	const passes = 10
	count := int64(0)
	for pass := 0; pass < passes; pass++ {
		var err error
		id, err = scanCompanyBase(id, &count)
		if err != nil {
			log.Println("mgo find err", err.Error())
			return
		}
	}

	log.Println("over ---- ")
	time.Sleep(100000 * time.Hour)
}

// scanCompanyBase runs one pass over mixdata.company_base, visiting documents
// whose _id is greater than startID in ascending _id order and handing each
// one to Data on a worker goroutine (at most 20 at a time). count is
// incremented for every document the cursor yields and is used for progress
// logging.
//
// It returns the _id of the last successfully decoded document (startID when
// there was none). The error is non-nil only when the query itself could not
// be started; cursor and decode errors are logged and the pass ends or skips
// the document, so the caller's next pass can resume from the returned id.
// All resources taken for the pass (session, context, cursor) are released
// and all workers have finished before the function returns.
func scanCompanyBase(startID int64, count *int64) (int64, error) {
	const workers = 20
	sem := make(chan struct{}, workers) // bounds concurrent Data calls
	var wg sync.WaitGroup
	defer wg.Wait()

	sess := MongoBase.GetMgoConn()
	defer MongoBase.DestoryMongoConn(sess)

	ctx, cancel := context.WithTimeout(context.Background(), 99999*time.Hour)
	defer cancel()

	coll := sess.M.C.Database("mixdata").Collection("company_base")
	find := options.Find().
		SetBatchSize(200).
		SetSort(bson.D{{Key: "_id", Value: 1}}).
		SetProjection(bson.M{"_id": 1, "company_id": 1, "establish_date": 1, "create_time": 1})
	cur, err := coll.Find(ctx, bson.M{"_id": bson.M{"$gt": startID}}, find)
	if err != nil {
		return startID, err
	}
	defer cur.Close(ctx)

	id := startID
	for cur.Next(ctx) {
		*count++
		// A fresh map per document: the worker goroutine keeps a reference.
		tmp := make(map[string]interface{})
		if err := cur.Decode(&tmp); err != nil {
			// Skip the document rather than overwrite id with the value
			// derived from an empty map, which would corrupt the resume point.
			log.Println("mgo decode err", err.Error())
			continue
		}
		id = util.Int64All(tmp["_id"])

		sem <- struct{}{}
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-sem
				wg.Done()
			}()
			Data(doc)
		}(tmp)

		if *count%10000 == 0 {
			log.Println("current,id", *count, id)
		}
	}
	if err := cur.Err(); err != nil {
		log.Println("mgo cursor err", err.Error())
	}
	return id, nil
}

// Data takes the next seo id from seoidCh, builds the nseo_id for one
// company_base document and queues the corresponding update on updatePool.
//
// tmp is read for the keys "_id", "company_id", "establish_date" and
// "create_time". The queued operation selects the target document by
// _id == tmp["company_id"] and sets nseo_id (date prefix followed by the seo
// id) and autoid (tmp["_id"]).
func Data(tmp map[string]interface{}) {
	seoid := <-seoidCh
	// A failed assertion yields "", which seoPrefix treats as "no date".
	ed, _ := tmp["establish_date"].(string)
	cd, _ := tmp["create_time"].(string)
	pre := seoPrefix(ed, cd)

	updatePool <- []map[string]interface{}{
		{"_id": tmp["company_id"]},
		{"$set": bson.M{
			"nseo_id": fmt.Sprintf("%s%d", pre, seoid),
			"autoid":  tmp["_id"],
		}},
	}
}

// seoPrefix returns the six-byte prefix made of bytes [2:4], [5:7] and [8:10]
// of a date string. establishDate is used when it is exactly 10 bytes long;
// otherwise createTime is used when it is at least 10 bytes long; otherwise
// the result is empty.
// NOTE(review): presumably both values look like "2006-01-02..." so the
// prefix is YYMMDD — confirm against the stored data.
func seoPrefix(establishDate, createTime string) string {
	switch {
	case len(establishDate) == 10:
		return establishDate[2:4] + establishDate[5:7] + establishDate[8:10]
	case len(createTime) > 9:
		return createTime[2:4] + createTime[5:7] + createTime[8:10]
	default:
		return ""
	}
}

// updateMethod drains updatePool forever and writes the operations to the
// qyxy_std collection via MongoStd.UpdateBulk. A batch is sent as soon as it
// holds 200 operations, or when no new operation has arrived for one second
// and the batch is non-empty. At most 8 bulk writes run concurrently; when
// that limit is reached the loop blocks until one of them finishes.
func updateMethod() {
	const (
		batchSize   = 200
		maxInFlight = 8
		idleFlush   = 1000 * time.Millisecond
	)
	inFlight := make(chan struct{}, maxInFlight)
	batch := make([][]map[string]interface{}, 0, batchSize)

	// flush hands the current batch to a writer goroutine and starts a new
	// batch; it does nothing when the batch is empty.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		inFlight <- struct{}{}
		go func(docs [][]map[string]interface{}) {
			defer func() { <-inFlight }()
			MongoStd.UpdateBulk("qyxy_std", docs...)
		}(batch)
		batch = make([][]map[string]interface{}, 0, batchSize)
	}

	// One reusable timer instead of a time.After allocation per received
	// item; it is re-armed after every event so it measures idle time.
	timer := time.NewTimer(idleFlush)
	for {
		select {
		case v := <-updatePool:
			batch = append(batch, v)
			if len(batch) == batchSize {
				flush()
			}
			// Stop and drain before Reset so a stale expiry cannot trigger
			// an early flush.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(idleFlush)
		case <-timer.C:
			flush()
			timer.Reset(idleFlush)
		}
	}
}