fieldmain.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. // fieldmain
  2. package service
  3. import (
  4. "log"
  5. qu "qfw/util"
  6. "sync"
  7. "time"
  8. )
  9. func init() {
  10. ResultInfos = []*ResultInfo{}
  11. }
  12. func DataClean(udpInfo map[string]interface{}) {
  13. defer qu.Catch()
  14. gtid := udpInfo["gtid"].(string)
  15. lteid := udpInfo["lteid"].(string)
  16. q := map[string]interface{}{
  17. "_id": map[string]interface{}{
  18. "$gt": qu.StringTOBsonId(gtid),
  19. "$lte": qu.StringTOBsonId(lteid),
  20. },
  21. }
  22. mdb := Mogdbs["extract"]
  23. sess := mdb.Mongodb.GetMgoConn()
  24. defer mdb.Mongodb.DestoryMongoConn(sess)
  25. query := sess.DB(mdb.Name).C(mdb.Coll).Find(q).Iter()
  26. index := 0
  27. wg := &sync.WaitGroup{}
  28. for tmp := make(map[string]interface{}); query.Next(tmp); index++ {
  29. //清洗数据流开始
  30. wg.Add(1)
  31. ClearThread <- true
  32. go func(data map[string]interface{}) {
  33. defer func() {
  34. <-ClearThread
  35. wg.Done()
  36. }()
  37. Fields(data)
  38. }(tmp)
  39. tmp = make(map[string]interface{})
  40. if index%1000 == 0 {
  41. log.Println("清洗数据量:", index)
  42. }
  43. }
  44. wg.Wait()
  45. log.Println("清洗数据量:", index)
  46. time.Sleep(5 * time.Second)
  47. for {
  48. if len(UpResults) < 1 {
  49. break
  50. }
  51. }
  52. //清洗完成,next node
  53. if nextnodes, ok := Sysconfig["nextnode"].([]interface{}); ok {
  54. for _, nextnode := range nextnodes {
  55. if tmp, ok := nextnode.(map[string]interface{}); ok {
  56. //todo 发送通知
  57. log.Println(tmp)
  58. }
  59. }
  60. }
  61. }
  62. func Fields(tmp map[string]interface{}) {
  63. data := &ResultInfo{}
  64. data.BidAmount(tmp) //中标金额清理
  65. //其他字段清洗...
  66. if data.Flag > 0 { //有字段发生变化
  67. ResultInfos = append(ResultInfos, data)
  68. }
  69. }