package main import ( "fmt" "github.com/spf13/viper" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "sync" ) //biddingAllData 处理芜湖存量bidding数据 func biddingAllData() { type Biddingall struct { Coll string Gtid string Lteid string } type RoutinesConf struct { Num int } type AllConf struct { All map[string]Biddingall Routines RoutinesConf } var all AllConf viper.SetConfigFile("biddingall.toml") viper.SetConfigName("biddingall") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") err := viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 fmt.Println("ReadInConfig err =>", err) return } err = viper.Unmarshal(&all) if err != nil { fmt.Println("biddingAllDataTask Unmarshal err =>", err) return } for k, conf := range all.All { go dealData(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num) } } func dealData(coll, gtid, lteid, kword string, routines int) { ch := make(chan bool, routines) wg := &sync.WaitGroup{} q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid), }, } biddingConn := Mgo.GetMgoConn() it := biddingConn.DB("qfw").C(coll).Find(&q).Select(nil).Iter() c1, index := 0, 0 var indexLock sync.Mutex for tmp := make(map[string]interface{}); it.Next(tmp); c1++ { if c1%20000 == 0 { log.Info(kword, zap.Int("current:", c1)) log.Info(kword, zap.Any("current:_id =>", tmp["_id"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引 tmp = make(map[string]interface{}) return } // 针对存量数据,重复数据不进索引 if util.IntAll(tmp["extracttype"]) == -1 { return } //1.采购单位 buyer := util.ObjToString(tmp["buyer"]) rests := tree.Match(buyer, true) if len(rests) > 0 { indexLock.Lock() index++ indexLock.Unlock() Mgo.SaveByOriID("bidding_wuhu_all", tmp) return } //2.中标单位 winner := util.ObjToString(tmp["winner"]) rests = tree.Match(winner, true) if len(rests) > 0 { indexLock.Lock() index++ indexLock.Unlock() Mgo.SaveByOriID("bidding_wuhu_all", tmp) return } //3.中标候选人 winnerorder, ok := tmp["winnerorder"].([]map[string]interface{}) if ok { for _, v := range winnerorder { res := tree.Match(util.ObjToString(v["entname"]), true) if len(res) > 0 { Mgo.SaveByOriID("bidding_wuhu_all", tmp) return } } } }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index)) }