project.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package main
  2. import (
  3. "fmt"
  4. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  6. "log"
  7. "sync"
  8. )
  9. func updateProject() {
  10. defer util.Catch()
  11. // 项目数据
  12. MgoP := &mongodb.MongodbSim{
  13. MongodbAddr: "172.17.4.85:27080",
  14. //MongodbAddr: "127.0.0.1:27080",
  15. Size: 10,
  16. DbName: "qfw",
  17. //Direct: true,
  18. }
  19. MgoP.InitPool()
  20. sess := MgoP.GetMgoConn()
  21. defer MgoP.DestoryMongoConn(sess)
  22. selected := map[string]interface{}{"projectname": 1, "tag_topinformation": 1, "pici": 1}
  23. it := sess.DB("qfw").C("projectset_20230904").Find(nil).Select(selected).Sort("pici").Iter()
  24. fmt.Println("updateProject 开始")
  25. count := 0
  26. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  27. if count%10000 == 0 {
  28. log.Println("current", count, tmp["projectname"])
  29. }
  30. if _, ok := tmp["tag_topinformation"]; ok {
  31. update := map[string]interface{}{
  32. "tag_topinformation": tmp["tag_topinformation"],
  33. }
  34. projectID := mongodb.BsonIdToSId(tmp["_id"])
  35. // 更新es
  36. updateEsPool <- []map[string]interface{}{
  37. {"_id": projectID},
  38. //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
  39. update,
  40. }
  41. log.Println("aaaaaaaaaaaa", projectID, tmp["projectname"])
  42. //err := Es.UpdateDocument("projectset", projectID, update)
  43. //if err != nil {
  44. // log.Println(projectID, tmp["projectname"], "更新es 失败")
  45. //}
  46. }
  47. }
  48. log.Println("结束~~~~~~~~~~~~~")
  49. }
  50. // updateProjectDetail 更新项目详情字段
  51. func updateProjectDetail() {
  52. defer util.Catch()
  53. // 项目数据
  54. sess := MgoR.GetMgoConn()
  55. defer MgoR.DestoryMongoConn(sess)
  56. where := map[string]interface{}{
  57. "pici": map[string]interface{}{
  58. "$gt": 1111,
  59. "$lte": 222,
  60. },
  61. }
  62. it := sess.DB("qfw").C("projectset_20230904").Find(where).Select(nil).Iter()
  63. fmt.Println("updateProject 开始")
  64. count := 0
  65. ch := make(chan bool, 15)
  66. wg := &sync.WaitGroup{}
  67. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  68. if count%2000 == 0 {
  69. log.Println("current", count, tmp["pici"], tmp["_id"])
  70. }
  71. ch <- true
  72. wg.Add(1)
  73. go func(tmp map[string]interface{}) {
  74. defer func() {
  75. <-ch
  76. wg.Done()
  77. }()
  78. projectID := mongodb.BsonIdToSId(tmp["_id"])
  79. update := make(map[string]interface{})
  80. //项目详情,辅助字段,处理过的list里面id,英文逗号拼接
  81. detailIds := make([]string, 0)
  82. detail := make([]string, 0) //最终的详情字段
  83. list := tmp["list"].([]interface{})
  84. for _, m := range list {
  85. tmpM := m.(map[string]interface{})
  86. //todo 处理项目详情 新字段;获取es 已有数据,判断是否需要更新detail
  87. infoid := util.ObjToString(tmpM["infoid"])
  88. if infoid != "" {
  89. detailIds = append(detailIds, infoid)
  90. if infoid > "5a862e7040d2d9bbe88e3b1f" {
  91. biddingData, _ := MgoB.FindById("bidding", infoid, nil)
  92. biddingDetail := util.ObjToString((*biddingData)["detail"])
  93. da, _ := CleanHTMLTags(biddingDetail)
  94. characterArray := SplitTextByChinesePunctuation(da)
  95. detail = append(detail, RemoveDuplicates(characterArray)...)
  96. } else {
  97. biddingData, _ := MgoB.FindById("bidding_back", infoid, nil)
  98. biddingDetail := util.ObjToString((*biddingData)["detail"])
  99. da, _ := CleanHTMLTags(biddingDetail)
  100. characterArray := SplitTextByChinesePunctuation(da)
  101. detail = append(detail, RemoveDuplicates(characterArray)...)
  102. }
  103. }
  104. }
  105. if len(detail) > 0 {
  106. detailNew := RemoveDuplicates(detail)
  107. update["detail"] = detailNew
  108. // 更新es
  109. updateEsPool <- []map[string]interface{}{
  110. {"_id": projectID},
  111. //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
  112. update,
  113. }
  114. //newTmp["detail"] = strings.Join(detailNew, " ")
  115. }
  116. //
  117. //
  118. }(tmp)
  119. }
  120. wg.Wait()
  121. log.Println("数据处理完毕")
  122. }