update.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // update
  2. package service
  3. import (
  4. "log"
  5. qu "qfw/util"
  6. "time"
  7. )
  8. func init() {
  9. go update()
  10. }
  11. func update() {
  12. for {
  13. if len(ResultInfos) > 0 {
  14. UpdateLock.Lock()
  15. UpResults = ResultInfos
  16. ResultInfos = []*ResultInfo{}
  17. UpdateLock.Unlock()
  18. up()
  19. UpResults = []*ResultInfo{}
  20. }
  21. time.Sleep(5 * time.Second)
  22. }
  23. }
  24. func up() {
  25. log.Println("开始更新")
  26. binfos := []*BidInfo{}
  27. einfos := []*ExtractInfo{}
  28. pinfos := []*ProjectInfo{}
  29. for _, v := range UpResults {
  30. if v.Bidding != nil {
  31. binfos = append(binfos, v.Bidding) //招标信息清洗结果
  32. }
  33. if v.Extract != nil {
  34. einfos = append(einfos, v.Extract) //抽取清洗结果
  35. }
  36. if v.Project != nil {
  37. pinfos = append(pinfos, v.Project) //项目清洗结果
  38. }
  39. }
  40. updateB := [][]map[string]interface{}{}
  41. updateE := [][]map[string]interface{}{}
  42. updateP := [][]map[string]interface{}{}
  43. if len(binfos) > 0 { //招标信息:拼装更新语句
  44. for _, tmp := range binfos {
  45. set := map[string]interface{}{}
  46. for _, v := range tmp.UpParam {
  47. set[v] = tmp.Data[v]
  48. }
  49. tmparr := []map[string]interface{}{
  50. map[string]interface{}{
  51. "_id": qu.StringTOBsonId(tmp.Id),
  52. },
  53. map[string]interface{}{"$set": set},
  54. }
  55. updateB = append(updateB, tmparr)
  56. }
  57. }
  58. if len(einfos) > 0 { //抽取信息:拼装更新语句
  59. for _, tmp := range einfos {
  60. set := map[string]interface{}{}
  61. for _, v := range tmp.UpParam {
  62. set[v] = tmp.Data[v]
  63. }
  64. tmparr := []map[string]interface{}{
  65. map[string]interface{}{
  66. "_id": qu.StringTOBsonId(tmp.Id),
  67. },
  68. map[string]interface{}{"$set": set},
  69. }
  70. updateE = append(updateE, tmparr)
  71. }
  72. }
  73. if len(pinfos) > 0 { //项目信息:拼装更新语句
  74. for _, tmp := range pinfos {
  75. set := map[string]interface{}{}
  76. for _, v := range tmp.UpParam {
  77. set[v] = tmp.Data[v]
  78. }
  79. tmparr := []map[string]interface{}{
  80. map[string]interface{}{
  81. "_id": qu.StringTOBsonId(tmp.Id),
  82. },
  83. map[string]interface{}{"$set": set},
  84. }
  85. updateP = append(updateP, tmparr)
  86. }
  87. }
  88. //执行更新
  89. if len(updateB) > 0 {
  90. db := Mogdbs["qfw"]
  91. db.Mongodb.UpSertBulk(db.Coll, updateB...)
  92. }
  93. if len(updateE) > 0 {
  94. db := Mogdbs["extract"]
  95. db.Mongodb.UpSertBulk(db.Coll, updateE...)
  96. }
  97. if len(updateP) > 0 {
  98. db := Mogdbs["project"]
  99. db.Mongodb.UpSertBulk(db.Coll, updateP...)
  100. }
  101. log.Println("更新完毕", "updateB:", len(updateE), "updateE:", len(updateB), "updateP:", len(updateP))
  102. }