123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package main
import (
	"fmt"
	"reflect"
	"sync"

	"github.com/spf13/viper"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)
- //biddingAllData 处理芜湖存量bidding数据
- func biddingAllData() {
- type Biddingall struct {
- Coll string
- Gtid string
- Lteid string
- }
- type RoutinesConf struct {
- Num int
- }
- type AllConf struct {
- All map[string]Biddingall
- Routines RoutinesConf
- }
- var all AllConf
- viper.SetConfigFile("biddingall.toml")
- viper.SetConfigName("biddingall") // 配置文件名称(无扩展名)
- viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
- viper.AddConfigPath("./")
- err := viper.ReadInConfig() // 查找并读取配置文件
- if err != nil { // 处理读取配置文件的错误
- fmt.Println("ReadInConfig err =>", err)
- return
- }
- err = viper.Unmarshal(&all)
- if err != nil {
- fmt.Println("biddingAllDataTask Unmarshal err =>", err)
- return
- }
- for k, conf := range all.All {
- go dealData(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num)
- }
- }
- func dealData(coll, gtid, lteid, kword string, routines int) {
- ch := make(chan bool, routines)
- wg := &sync.WaitGroup{}
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(gtid),
- "$lte": mongodb.StringTOBsonId(lteid),
- },
- }
- biddingConn := Mgo.GetMgoConn()
- it := biddingConn.DB("qfw").C(coll).Find(&q).Select(nil).Iter()
- c1, index := 0, 0
- var indexLock sync.Mutex
- for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
- if c1%20000 == 0 {
- log.Info(kword, zap.Int("current:", c1))
- log.Info(kword, zap.Any("current:_id =>", tmp["_id"]))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- return
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- return
- }
- //1.采购单位
- buyer := util.ObjToString(tmp["buyer"])
- rests := tree.Match(buyer, true)
- if len(rests) > 0 {
- indexLock.Lock()
- index++
- indexLock.Unlock()
- Mgo.SaveByOriID("bidding_wuhu_all", tmp)
- return
- }
- //2.中标单位
- winner := util.ObjToString(tmp["winner"])
- rests = tree.Match(winner, true)
- if len(rests) > 0 {
- indexLock.Lock()
- index++
- indexLock.Unlock()
- Mgo.SaveByOriID("bidding_wuhu_all", tmp)
- return
- }
- //3.中标候选人
- winnerorder, ok := tmp["winnerorder"].([]map[string]interface{})
- if ok {
- for _, v := range winnerorder {
- res := tree.Match(util.ObjToString(v["entname"]), true)
- if len(res) > 0 {
- Mgo.SaveByOriID("bidding_wuhu_all", tmp)
- return
- }
- }
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index))
- }
|