// Command main walks the company_base collection, derives a "seo_id" for every
// record and pushes the result to both MongoDB (qyxy_std) and Elasticsearch
// (qyxy) through two background batch writers.
package main

import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/spf13/viper"
	"go.uber.org/zap"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

const (
	// bulkSize is the number of queued updates that triggers a bulk write.
	bulkSize = 200
	// flushInterval is how long a batch writer waits without receiving a new
	// update before it writes out a partially filled batch.
	flushInterval = 1000 * time.Millisecond
)

var (
	MongoBase *mongodb.MongodbSim
	MongoStd  *mongodb.MongodbSim
	GF        GlobalConf
	Es        *elastic.Elastic

	// Elasticsearch updates: queue of pending updates and a semaphore that
	// caps the number of concurrent bulk-write goroutines.
	updateEsPool = make(chan []map[string]interface{}, 5000)
	updateEsSp   = make(chan bool, 2)

	// MongoDB updates: queue of pending updates and a semaphore that caps the
	// number of concurrent bulk-write goroutines.
	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp   = make(chan bool, 5)

	// rng is seeded once at start-up; rngMu guards it because *rand.Rand is
	// not safe for concurrent use.
	rngMu sync.Mutex
	rng   = rand.New(rand.NewSource(time.Now().UnixNano()))
)

// InitConfig locates a TOML file named "config" in ./, ./conf/ or ../conf/
// (searched in that order) and unmarshals it into GF.
//
// The original also called viper.SetConfigFile("config.toml") before the
// name/type/search-path setup. That call is dropped: an explicit file path and
// a search-path lookup are two alternative ways of locating the file, and the
// search paths registered below only matter for the lookup.
// NOTE(review): confirm against the vendored viper version.
func InitConfig() (err error) {
	viper.SetConfigName("config") // file name without extension
	viper.SetConfigType("toml")   // required because the name carries no extension
	viper.AddConfigPath("./")
	viper.AddConfigPath("./conf/")
	viper.AddConfigPath("../conf/")

	if err = viper.ReadInConfig(); err != nil {
		return fmt.Errorf("reading config: %w", err)
	}
	if err = viper.Unmarshal(&GF); err != nil {
		return fmt.Errorf("decoding config: %w", err)
	}
	return nil
}

// InitES creates the global Elasticsearch client Es and initialises it via
// InitElasticSize.
//
// NOTE(review): the endpoint and credentials are hard-coded in source; they
// should be moved into the configuration file alongside the MongoDB settings.
func InitES() {
	Es = &elastic.Elastic{
		// Local alternative: "http://127.0.0.1:19805"
		S_esurl:  "http://172.17.4.184:19805",
		I_size:   5,
		Username: "es_all",
		Password: "TopJkO2E_d1x",
	}
	Es.InitElasticSize()
}

// InitMgo creates the two global MongoDB handles (MongoStd and MongoBase) from
// the loaded configuration and initialises their connection pools.
// InitConfig must have succeeded before this is called.
func InitMgo() {
	MongoStd = &mongodb.MongodbSim{
		// Local alternative: "127.0.0.1:27083"
		MongodbAddr: GF.MongoStd.Host,
		Size:        10,
		DbName:      GF.MongoStd.DB,
		UserName:    GF.MongoStd.Username,
		Password:    GF.MongoStd.Password,
		Direct:      GF.MongoStd.Direct,
	}
	MongoStd.InitPool()

	MongoBase = &mongodb.MongodbSim{
		// Local alternative: "127.0.0.1:27001"
		MongodbAddr: GF.MongoBase.Host,
		Size:        10,
		DbName:      GF.MongoBase.DB,
		UserName:    GF.MongoBase.Username,
		Password:    GF.MongoBase.Password,
		Direct:      GF.MongoBase.Direct,
	}
	MongoBase.InitPool()
}

// InitLog configures the project logger to write JSON records to
// ./logs/log.out. A failure is reported on stdout and is not fatal.
func InitLog() {
	err := log.InitLog(
		log.Path("./logs/log.out"),
		//log.Path(""),
		log.Level("info"),
		log.Compress(true),
		log.MaxSize(10),
		log.MaxBackups(10),
		log.MaxAge(7),
		log.Format("json"),
	)
	if err != nil {
		fmt.Printf("InitLog failed: %v\n", err)
	}
}

// datePrefix builds the date part of a seo_id.
//
// establishDate is used when non-empty; otherwise the part of createTime that
// precedes the first space is used. All '-' separators are removed and the
// first two characters are dropped (e.g. "2015-03-04" -> "150304").
// ok is false when the resulting string is shorter than two characters, which
// is the case the original code panicked on.
func datePrefix(establishDate, createTime string) (prefix string, ok bool) {
	date := establishDate
	if date == "" {
		date = strings.Split(createTime, " ")[0]
	}
	compact := strings.ReplaceAll(date, "-", "")
	if len(compact) < 2 {
		return "", false
	}
	return compact[2:], true
}

func main() {
	// Without a valid configuration every connection below would be set up
	// from zero values, so stop immediately.
	if err := InitConfig(); err != nil {
		fmt.Fprintf(os.Stderr, "InitConfig failed: %v\n", err)
		os.Exit(1)
	}
	InitMgo()
	InitLog()
	InitES()

	go updateEsMethod()
	go updateMethod()

	sess := MongoBase.GetMgoConn()
	// NOTE(review): main blocks forever at the end, so this deferred release
	// never actually runs.
	defer MongoBase.DestoryMongoConn(sess)

	query := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Iter()

	count := 0
	// secondTmp is a running counter shared by all records: it grows by a
	// random step of 1..200 per record, so the numeric suffix of seo_id is
	// strictly increasing in iteration order. Records are therefore handled
	// sequentially.
	secondTmp := 10000
	for ; ; count++ {
		// A fresh map per record keeps fields of the previous document from
		// leaking into the next one.
		tmp := make(map[string]interface{})
		if !query.Next(tmp) {
			break
		}

		companyID := util.ObjToString(tmp["company_id"])

		first, ok := datePrefix(
			util.ObjToString(tmp["establish_date"]),
			util.ObjToString(tmp["create_time"]),
		)
		if !ok {
			// Neither date field is usable; skip instead of panicking.
			log.Info("main", zap.String("skip", "no usable establish_date/create_time"), zap.Any("_id", tmp["_id"]))
			continue
		}

		secondTmp += getRandom()
		seoID := first + strconv.Itoa(secondTmp)

		stdUpdate := map[string]interface{}{
			"seo_id": seoID,
			"autoid": tmp["_id"],
		}

		if count%1000 == 0 {
			log.Info("main", zap.Any("current ", count), zap.Any("_id", tmp["_id"]), zap.Int(" secondTmp", secondTmp), zap.String("seo_id", seoID))
		}

		// Queue the MongoDB update: selector followed by the $set document.
		updatePool <- []map[string]interface{}{
			{"_id": companyID},
			{"$set": stdUpdate},
		}

		// Queue the Elasticsearch update: selector followed by the fields.
		updateEsPool <- []map[string]interface{}{
			{"_id": companyID},
			stdUpdate,
		}
	}

	log.Info("main", zap.String("deal", "over"), zap.Int("count", count), zap.Int("secondTmp", secondTmp))

	// Block forever so the background batch writers can drain their queues.
	// NOTE(review): the process has to be stopped manually once the queues
	// are empty.
	select {}
}

// formatNumber renders num in decimal and left-pads it with '0' characters
// until it is at least length bytes long. Numbers that are already long enough
// are returned unchanged. The sign of a negative number is treated like any
// other character, so padding is placed in front of it.
func formatNumber(num, length int) string {
	numStr := strconv.Itoa(num)
	leadingZeros := length - len(numStr)
	if leadingZeros <= 0 {
		return numStr
	}
	return strings.Repeat("0", leadingZeros) + numStr
}

// getRandom returns a pseudo-random integer in the range [1, 200].
// It is safe for concurrent use.
func getRandom() int {
	rngMu.Lock()
	defer rngMu.Unlock()
	return rng.Intn(200) + 1
}

// runBatcher collects updates received from pool and hands them to bulk in
// batches.
//
// A batch is written when it reaches bulkSize entries, or when no new update
// has arrived for flushInterval and the batch is non-empty. Each write runs in
// its own goroutine; sem is used as a counting semaphore, so at most cap(sem)
// writes are in flight and the batcher blocks while all slots are taken.
// If pool is closed, the remaining entries are flushed and runBatcher returns
// without waiting for in-flight writes.
func runBatcher(pool <-chan []map[string]interface{}, sem chan bool, bulk func(batch [][]map[string]interface{})) {
	batch := make([][]map[string]interface{}, 0, bulkSize)

	flush := func() {
		if len(batch) == 0 {
			return
		}
		sem <- true
		go func(b [][]map[string]interface{}) {
			defer func() { <-sem }()
			bulk(b)
		}(batch)
		// The goroutine owns the old slice now; start a new one.
		batch = make([][]map[string]interface{}, 0, bulkSize)
	}

	// One reusable timer instead of a new time.After per received update.
	timer := time.NewTimer(flushInterval)
	defer timer.Stop()

	for {
		select {
		case v, ok := <-pool:
			if !ok {
				flush()
				return
			}
			batch = append(batch, v)
			if len(batch) == bulkSize {
				flush()
			}
			// Restart the idle countdown; drain a tick that may already be
			// pending so it cannot fire spuriously after the reset.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(flushInterval)
		case <-timer.C:
			flush()
			timer.Reset(flushInterval)
		}
	}
}

// updateEsMethod drains updateEsPool and bulk-updates the "qyxy" index in
// Elasticsearch, with concurrency bounded by updateEsSp.
func updateEsMethod() {
	runBatcher(updateEsPool, updateEsSp, func(batch [][]map[string]interface{}) {
		Es.UpdateBulk("qyxy", batch...)
	})
}

// updateMethod drains updatePool and bulk-updates the "qyxy_std" collection in
// MongoDB, with concurrency bounded by updateSp.
func updateMethod() {
	runBatcher(updatePool, updateSp, func(batch [][]map[string]interface{}) {
		MongoStd.UpdateBulk("qyxy_std", batch...)
	})
}