util.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package util
  2. import (
  3. mu "mfw/util"
  4. "mongodb"
  5. qu "qfw/util"
  6. "sync"
  7. )
  8. var (
  9. Config map[string]interface{}
  10. Mgo *mongodb.MongodbSim
  11. MgoB *mongodb.MongodbSim
  12. Coll string
  13. StartID string //bidding_processing_ids表ID
  14. //udp
  15. Udpclient mu.UdpClient //udp对象
  16. UdpPort string
  17. NextAddr string
  18. NextPort int
  19. NextStype string
  20. //mail
  21. UdptaskMap = &sync.Map{}
  22. Tomail string
  23. Api string
  24. //ocr
  25. OcrServerAddr string //ocr服务治理中心
  26. )
  27. func GetIdInterval(id string) (gtid, lteid string) {
  28. defer qu.Catch()
  29. qu.Debug("获取id段...")
  30. query := map[string]interface{}{
  31. "_id": map[string]interface{}{
  32. "$gt": mongodb.StringTOBsonId(id),
  33. },
  34. "dataprocess": 8,
  35. }
  36. list, _ := MgoB.Find(Coll, query, map[string]interface{}{"_id": 1}, nil, false, 0, 1) //查找一个id区间
  37. if len(*list) > 0 {
  38. gtid = qu.ObjToString((*list)[0]["gtid"]) //起始id
  39. lteid = qu.ObjToString((*list)[0]["lteid"]) //结束id
  40. StartID = mongodb.BsonIdToSId((*list)[0]["_id"])
  41. qu.Debug("当前轮ID区间:", gtid, lteid, "表ID:", StartID)
  42. return
  43. }
  44. return "", ""
  45. }
  46. func UpdateBiddingData(gtid, lteid string) {
  47. defer qu.Catch()
  48. //查询数据
  49. sess := MgoB.GetMgoConn()
  50. defer MgoB.DestoryMongoConn(sess)
  51. ch := make(chan bool, 10)
  52. wg := &sync.WaitGroup{}
  53. lock := &sync.Mutex{}
  54. query := map[string]interface{}{
  55. "_id": map[string]interface{}{
  56. "$gt": mongodb.StringTOBsonId(gtid),
  57. "$lte": mongodb.StringTOBsonId(lteid),
  58. },
  59. }
  60. fields := map[string]interface{}{
  61. "file_add_log": 0,
  62. }
  63. it := sess.DB(MgoB.DbName).C("bidding_downloadfile_log").Find(&query).Select(&fields).Iter()
  64. n := 0
  65. arr := [][]map[string]interface{}{}
  66. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  67. ch <- true
  68. wg.Add(1)
  69. go func(tmp map[string]interface{}) {
  70. defer func() {
  71. <-ch
  72. wg.Done()
  73. }()
  74. update := []map[string]interface{}{
  75. {"_id": tmp["_id"]},
  76. {"$set": tmp},
  77. }
  78. lock.Lock()
  79. arr = append(arr, update)
  80. if len(arr) > 100 {
  81. MgoB.UpdateBulk("bidding", arr...)
  82. arr = [][]map[string]interface{}{}
  83. }
  84. lock.Unlock()
  85. }(tmp)
  86. if n%100 == 0 {
  87. qu.Debug("current:", n)
  88. }
  89. tmp = map[string]interface{}{}
  90. }
  91. wg.Wait()
  92. if len(arr) > 0 {
  93. MgoB.UpdateBulk("bidding", arr...)
  94. arr = [][]map[string]interface{}{}
  95. }
  96. qu.Debug("更新数据完毕:", gtid, lteid)
  97. }