task.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "data_sync/config"
  8. "fmt"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "go.uber.org/zap"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. func taskinfoV1() {
  16. defer util.Catch()
  17. sess := MongoV1.GetMgoConn()
  18. defer MongoV1.DestoryMongoConn(sess)
  19. ch := make(chan bool, config.Conf.Serve.Thread)
  20. wg := &sync.WaitGroup{}
  21. //q := bson.M{"_id": mongodb.StringTOBsonId("5caeb0bca5cb26b9b733eb0c")}
  22. f := bson.M{"contenthtml": 0, "description": 0, "detail": 0, "kvtext": 0, "projectinfo": 0}
  23. it := sess.DB(config.Conf.DB.Mongo1.Dbname).C(config.Conf.DB.Mongo1.Coll).Find(nil).Select(&f).Iter()
  24. count := 0
  25. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  26. if count%2000 == 0 {
  27. log.Info("taskinfoV1", zap.Int("current:", count))
  28. }
  29. ch <- true
  30. wg.Add(1)
  31. go func(tmp map[string]interface{}) {
  32. defer func() {
  33. <-ch
  34. wg.Done()
  35. }()
  36. id := mongodb.BsonIdToSId(tmp["_id"])
  37. info, _ := Mongo.FindById("bidding", id, f)
  38. coll := "bidding"
  39. if len(*info) == 0 {
  40. info, _ = Mongo.FindById("bidding_back", id, f)
  41. if len(*info) == 0 {
  42. util.Debug("not find id", id)
  43. return
  44. }
  45. coll = "bidding_back"
  46. }
  47. update := make(map[string]interface{})
  48. updateEs := make(map[string]interface{})
  49. mdinfo := make(map[string]interface{})
  50. record := make(map[string]interface{})
  51. for _, f := range Fields {
  52. if f == "budget" || f == "bidamount" {
  53. if tmp[f] != nil && tmp[fmt.Sprintf("ck_%s", f)] != nil && util.Float64All(tmp[f]) != util.Float64All((*info)[f]) {
  54. update[f] = util.Float64All(tmp[f])
  55. record[f] = util.Float64All(tmp[f])
  56. updateEs[f] = util.Float64All(tmp[f])
  57. mdinfo[f] = "数据清洗"
  58. }
  59. } else if f == "s_winner" {
  60. if tmp[f] != nil && tmp[fmt.Sprintf("ck_%s", f)] != nil && util.Float64All(tmp[f]) != util.Float64All((*info)[f]) {
  61. update[f] = util.ObjToString(tmp[f])
  62. record[f] = util.ObjToString(tmp[f])
  63. updateEs[f] = util.ObjToString(tmp[f])
  64. cid := companyFun(update)
  65. update["entidlist"] = cid
  66. record["entidlist"] = cid
  67. updateEs["entidlist"] = cid
  68. }
  69. } else {
  70. if tmp[f] != nil && tmp[fmt.Sprintf("ck_%s", f)] != nil && util.ObjToString(tmp[f]) != util.ObjToString((*info)[f]) {
  71. update[f] = util.ObjToString(tmp[f])
  72. record[f] = util.ObjToString(tmp[f])
  73. updateEs[f] = util.ObjToString(tmp[f])
  74. mdinfo[f] = "数据清洗"
  75. }
  76. }
  77. }
  78. for _, f := range config.Conf.Serve.ExField {
  79. if f == "package" {
  80. }
  81. }
  82. if md, ok := (*info)["modifyinfo"].(map[string]interface{}); ok {
  83. for k, v := range mdinfo {
  84. md[k] = v
  85. }
  86. mdinfo = md
  87. }
  88. now := time.Now().Unix()
  89. record["updatetime"] = now
  90. record["modifypath"] = "数据清洗"
  91. if len(update) > 0 {
  92. updateEsPool <- []map[string]interface{}{
  93. {"_id": id},
  94. updateEs,
  95. }
  96. update["modifyinfo"] = mdinfo
  97. if coll == "bidding" {
  98. updatePool <- []map[string]interface{}{
  99. {"_id": mongodb.StringTOBsonId(id)},
  100. {"$set": update},
  101. }
  102. } else {
  103. Mongo.UpdateById("bidding_back", id, map[string]interface{}{"$set": update})
  104. }
  105. updateRcPool <- []map[string]interface{}{
  106. {"_id": mongodb.StringTOBsonId(id)},
  107. {"$set": bson.M{"updatetime": now}, "$push": bson.M{"modify": record}},
  108. }
  109. }
  110. }(tmp)
  111. tmp = map[string]interface{}{}
  112. }
  113. wg.Wait()
  114. log.Info("taskinfoV1 over...", zap.Int("count:", count))
  115. }
  116. func taskinfoV2() {
  117. defer util.Catch()
  118. }
  119. // @Description entidlist
  120. // @Author J 2022/6/7 2:36 PM
  121. func companyFun(tmp map[string]interface{}) (cid []string) {
  122. sWinnerarr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  123. for _, w := range sWinnerarr {
  124. if w != "" {
  125. id := redis.GetStr("qyxy_id", w)
  126. if id == "" {
  127. id = "-"
  128. }
  129. cid = append(cid, id)
  130. }
  131. }
  132. return cid
  133. }