12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- // fieldmain
- package service
- import (
- "log"
- qu "qfw/util"
- "sync"
- "time"
- )
- func init() {
- ResultInfos = []*ResultInfo{}
- }
- func DataClean(udpInfo map[string]interface{}) {
- defer qu.Catch()
- gtid := udpInfo["gtid"].(string)
- lteid := udpInfo["lteid"].(string)
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": qu.StringTOBsonId(gtid),
- "$lte": qu.StringTOBsonId(lteid),
- },
- }
- mdb := Mogdbs["extract"]
- sess := mdb.Mongodb.GetMgoConn()
- defer mdb.Mongodb.DestoryMongoConn(sess)
- query := sess.DB(mdb.Name).C(mdb.Coll).Find(q).Iter()
- index := 0
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); query.Next(tmp); index++ {
- //清洗数据流开始
- wg.Add(1)
- ClearThread <- true
- go func(data map[string]interface{}) {
- defer func() {
- <-ClearThread
- wg.Done()
- }()
- Fields(data)
- }(tmp)
- tmp = make(map[string]interface{})
- if index%1000 == 0 {
- log.Println("清洗数据量:", index)
- }
- }
- wg.Wait()
- log.Println("清洗数据量:", index)
- time.Sleep(5 * time.Second)
- for {
- if len(UpResults) < 1 {
- break
- }
- }
- //清洗完成,next node
- if nextnodes, ok := Sysconfig["nextnode"].([]interface{}); ok {
- for _, nextnode := range nextnodes {
- if tmp, ok := nextnode.(map[string]interface{}); ok {
- //todo 发送通知
- log.Println(tmp)
- }
- }
- }
- }
- func Fields(tmp map[string]interface{}) {
- data := &ResultInfo{}
- data.BidAmount(tmp) //中标金额清理
- //其他字段清洗...
- if data.Flag > 0 { //有字段发生变化
- ResultInfos = append(ResultInfos, data)
- }
- }
|