123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- package main
- import (
- "fmt"
- "github.com/spf13/viper"
- "go.uber.org/zap"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "strings"
- "sync"
- )
- var (
- GF GlobalConf
- Mgo *mongodb.MongodbSim //读取公司名录 MongoDB,也是更新的链接地址
- MgoB *mongodb.MongodbSim //读取公司名录 MongoDB,也是更新的链接地址
- MgoM *mongodb.MongodbSim //86 marked 表
- )
- func InitConfig() (err error) {
- viper.SetConfigFile("config.toml") // 指定配置文件路径
- viper.SetConfigName("config") // 配置文件名称(无扩展名)
- viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
- viper.AddConfigPath("./")
- viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
- viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
- err = viper.ReadInConfig() // 查找并读取配置文件
- if err != nil { // 处理读取配置文件的错误
- return
- }
- err = viper.Unmarshal(&GF)
- return err
- }
- func InitLog() {
- err := log.InitLog(
- //log.Path("./logs/log.out"),
- log.Path(""),
- log.Level("info"),
- log.Compress(true),
- log.MaxSize(10),
- log.MaxBackups(10),
- log.MaxAge(7),
- log.Format("json"),
- )
- if err != nil {
- fmt.Printf("InitLog failed: %v\n", err)
- }
- }
- func InitMgo() {
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: GF.Mongo.Host,
- DbName: GF.Mongo.DB,
- Size: GF.Mongo.Size,
- UserName: GF.Mongo.Username,
- Password: GF.Mongo.Password,
- Direct: GF.Mongo.Direct,
- }
- Mgo.InitPool()
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: GF.Mongo.Host,
- DbName: "qfw",
- Size: GF.Mongo.Size,
- UserName: GF.Mongo.Username,
- Password: GF.Mongo.Password,
- Direct: GF.Mongo.Direct,
- }
- MgoB.InitPool()
- MgoM = &mongodb.MongodbSim{
- MongodbAddr: GF.MongoM.Host,
- DbName: GF.MongoM.DB,
- Size: GF.MongoM.Size,
- UserName: GF.MongoM.Username,
- Password: GF.MongoM.Password,
- Direct: GF.MongoM.Direct,
- }
- MgoM.InitPool()
- }
- func main() {
- err := InitConfig()
- if err != nil {
- panic(err)
- }
- InitLog()
- InitMgo()
- deal()
- fmt.Println("over")
- }
- func deal() {
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- count := 0
- query := sess.DB(GF.Mongo.DB).C("bidding").Find(nil).Select(nil).Iter()
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- fmt.Println("current", count)
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- //
- biddingId := ""
- if id, ok := tmp["id"]; ok {
- marked, _ := MgoB.FindById("bidding", util.ObjToString(id), nil)
- if len(*marked) == 0 {
- return
- }
- biddingId = util.ObjToString(id)
- } else {
- marked, _ := MgoB.FindById("bidding", mongodb.BsonIdToSId(tmp["_id"]), nil)
- if len(*marked) == 0 {
- return
- }
- biddingId = mongodb.BsonIdToSId(tmp["_id"])
- }
- delete(tmp, "_id")
- tmp["_id"] = mongodb.StringTOBsonId(biddingId)
- fields := strings.Split(GF.Env.NoFields, ",")
- for _, v := range fields {
- delete(tmp, v)
- }
- err := Mgo.InsertOrUpdate("qfw_high", "wcc_bidding", tmp)
- if err != nil {
- log.Info("deal", zap.String("失败", biddingId))
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- fmt.Println("数据处理完毕")
- }
|