// fieldmain package service import ( "log" qu "qfw/util" "sync" "time" ) func init() { ResultInfos = []*ResultInfo{} } func DataClean(udpInfo map[string]interface{}) { defer qu.Catch() gtid := udpInfo["gtid"].(string) lteid := udpInfo["lteid"].(string) q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": qu.StringTOBsonId(gtid), "$lte": qu.StringTOBsonId(lteid), }, } mdb := Mogdbs["extract"] sess := mdb.Mongodb.GetMgoConn() defer mdb.Mongodb.DestoryMongoConn(sess) query := sess.DB(mdb.Name).C(mdb.Coll).Find(q).Iter() index := 0 wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); index++ { //清洗数据流开始 wg.Add(1) ClearThread <- true go func(data map[string]interface{}) { defer func() { <-ClearThread wg.Done() }() Fields(data) }(tmp) tmp = make(map[string]interface{}) if index%1000 == 0 { log.Println("清洗数据量:", index) } } wg.Wait() log.Println("清洗数据量:", index) time.Sleep(5 * time.Second) for { if len(UpResults) < 1 { break } } //清洗完成,next node if nextnodes, ok := Sysconfig["nextnode"].([]interface{}); ok { for _, nextnode := range nextnodes { if tmp, ok := nextnode.(map[string]interface{}); ok { //todo 发送通知 log.Println(tmp) } } } } func Fields(tmp map[string]interface{}) { data := &ResultInfo{} data.BidAmount(tmp) //中标金额清理 //其他字段清洗... if data.Flag > 0 { //有字段发生变化 ResultInfos = append(ResultInfos, data) } }