package main

import (
	"fmt"
	"strings"
	"sync"

	"github.com/spf13/viper"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// GF holds the configuration decoded from the config file by InitConfig.
	GF GlobalConf
	// Mgo is the pool built from GF.Mongo. deal iterates the "bidding"
	// collection through it and also writes the copied documents with it.
	Mgo *mongodb.MongodbSim
	// MgoB uses the same host and credentials as Mgo but is pinned to the
	// "qfw" database. deal uses it to check that a bidding document exists.
	MgoB *mongodb.MongodbSim
	// MgoM is the pool built from GF.MongoM (original note: the "marked"
	// table on the "86" instance). It is not used by the code in this file
	// section.
	MgoM *mongodb.MongodbSim
)

// InitConfig loads the TOML configuration through viper and decodes it into GF.
//
// The file is looked up as "config" (type toml) in "./", "./conf/" and
// "../conf/". It returns a wrapped error when the file cannot be read or
// cannot be decoded into GF, and nil on success.
func InitConfig() (err error) {
	// NOTE(review): SetConfigName below looks like it resets this explicit
	// path in current viper releases, so the search paths decide which file
	// is read - confirm against the viper version in go.mod.
	viper.SetConfigFile("config.toml") // explicit config file path
	viper.SetConfigName("config")      // config file name, without extension
	viper.SetConfigType("toml")        // required when the file name carries no extension
	viper.AddConfigPath("./")
	viper.AddConfigPath("./conf/")  // also search conf/ under the working directory
	viper.AddConfigPath("../conf/") // also search conf/ next to the working directory

	// Locate and read the configuration file.
	if err = viper.ReadInConfig(); err != nil {
		return fmt.Errorf("reading config: %w", err)
	}
	if err = viper.Unmarshal(&GF); err != nil {
		return fmt.Errorf("decoding config into GlobalConf: %w", err)
	}
	return nil
}

// InitLog configures the shared logger: empty output path, "info" level,
// compression on, max size 10, max backups 10, max age 7, JSON format.
//
// A setup failure is printed to stdout and otherwise ignored, so the program
// keeps running with whatever logger state the log package is left in.
func InitLog() {
	err := log.InitLog(
		//log.Path("./logs/log.out"),
		log.Path(""),
		log.Level("info"),
		log.Compress(true),
		log.MaxSize(10),
		log.MaxBackups(10),
		log.MaxAge(7),
		log.Format("json"),
	)
	if err != nil {
		fmt.Printf("InitLog failed: %v\n", err)
	}
}

// InitMgo builds the three package-level MongoDB pools (Mgo, MgoB, MgoM) from
// GF and initialises each one. InitConfig must have populated GF beforehand.
func InitMgo() {
	// Source/target connection, database taken from the configuration.
	Mgo = &mongodb.MongodbSim{
		MongodbAddr: GF.Mongo.Host,
		DbName:      GF.Mongo.DB,
		Size:        GF.Mongo.Size,
		UserName:    GF.Mongo.Username,
		Password:    GF.Mongo.Password,
		Direct:      GF.Mongo.Direct,
	}
	Mgo.InitPool()

	// Same server as Mgo, but fixed to the "qfw" database.
	MgoB = &mongodb.MongodbSim{
		MongodbAddr: GF.Mongo.Host,
		DbName:      "qfw",
		Size:        GF.Mongo.Size,
		UserName:    GF.Mongo.Username,
		Password:    GF.Mongo.Password,
		Direct:      GF.Mongo.Direct,
	}
	MgoB.InitPool()

	// Separate server described by the MongoM section of the configuration.
	MgoM = &mongodb.MongodbSim{
		MongodbAddr: GF.MongoM.Host,
		DbName:      GF.MongoM.DB,
		Size:        GF.MongoM.Size,
		UserName:    GF.MongoM.Username,
		Password:    GF.MongoM.Password,
		Direct:      GF.MongoM.Direct,
	}
	MgoM.InitPool()
}

// main loads the configuration (panicking if that fails), sets up logging and
// the MongoDB pools, runs deal once and prints "over".
func main() {
	if err := InitConfig(); err != nil {
		panic(err)
	}
	InitLog()
	InitMgo()

	deal()
	fmt.Println("over")
}

// deal walks every document of the "bidding" collection in GF.Mongo.DB and
// hands each one to syncBiddingDoc, running at most 15 documents concurrently.
// It prints a progress line every 1000 documents and returns once all workers
// have finished.
func deal() {
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	// The list of fields to strip is identical for every document, so it is
	// split once here and shared read-only by all workers.
	dropFields := strings.Split(GF.Env.NoFields, ",")

	query := sess.DB(GF.Mongo.DB).C("bidding").Find(nil).Select(nil).Iter()

	ch := make(chan bool, 15) // counting semaphore bounding in-flight workers
	wg := &sync.WaitGroup{}

	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%1000 == 0 {
			fmt.Println("current", count)
		}

		ch <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			syncBiddingDoc(doc, dropFields)
		}(tmp)

		// The worker now owns the map it was given; decode the next
		// document into a fresh one.
		tmp = map[string]interface{}{}
	}

	wg.Wait()
	fmt.Println("数据处理完毕")
}

// syncBiddingDoc copies one source document into qfw_high.wcc_bidding.
//
// The bidding id is taken from the document's "id" field when that key is
// present and from its "_id" otherwise. The document is skipped when MgoB finds
// no "bidding" record for that id. Otherwise "_id" is replaced by the bidding
// id, every field named in dropFields is removed, and the result is written
// with InsertOrUpdate. A failed write is logged together with the id and the
// error value.
func syncBiddingDoc(doc map[string]interface{}, dropFields []string) {
	var biddingID string
	if id, ok := doc["id"]; ok {
		biddingID = util.ObjToString(id)
	} else {
		biddingID = mongodb.BsonIdToSId(doc["_id"])
	}

	// The second result of FindById is ignored as before; a nil pointer is
	// treated the same as an empty record so a failed lookup cannot panic.
	marked, _ := MgoB.FindById("bidding", biddingID, nil)
	if marked == nil || len(*marked) == 0 {
		return
	}

	doc["_id"] = mongodb.StringTOBsonId(biddingID)
	for _, field := range dropFields {
		delete(doc, field)
	}

	if err := Mgo.InsertOrUpdate("qfw_high", "wcc_bidding", doc); err != nil {
		log.Info("deal", zap.String("失败", biddingID), zap.Any("error", err))
	}
}