task.go 7.3 KB


  1. package main
  2. import (
  3. "data_clear_sync/config"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.uber.org/zap"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. func taskinfoV1() {
  16. defer util.Catch()
  17. sess := MongoV1.GetMgoConn()
  18. defer MongoV1.DestoryMongoConn(sess)
  19. ch := make(chan bool, config.Conf.Serve.Thread)
  20. wg := &sync.WaitGroup{}
  21. //q := bson.M{"_id": mongodb.StringTOBsonId("5caeb0bca5cb26b9b733eb0c")}
  22. f := bson.M{"contenthtml": 0, "description": 0, "detail": 0, "kvtext": 0, "projectinfo": 0}
  23. it := sess.DB(config.Conf.DB.Mongo1.Dbname).C(config.Conf.DB.Mongo1.Coll).Find(nil).Select(&f).Iter()
  24. count := 0
  25. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  26. if count%2000 == 0 {
  27. log.Info("taskinfoV1", zap.Int("current:", count))
  28. }
  29. ch <- true
  30. wg.Add(1)
  31. go func(tmp map[string]interface{}) {
  32. defer func() {
  33. <-ch
  34. wg.Done()
  35. }()
  36. id := mongodb.BsonIdToSId(tmp["_id"])
  37. info, _ := Mongo.FindById("bidding", id, f)
  38. coll := "bidding"
  39. if len(*info) == 0 {
  40. info, _ = Mongo.FindById("bidding_back", id, f)
  41. if len(*info) == 0 {
  42. util.Debug("not find id", id)
  43. return
  44. }
  45. coll = "bidding_back"
  46. }
  47. update := make(map[string]interface{})
  48. updateEs := make(map[string]interface{})
  49. mdinfo := make(map[string]interface{})
  50. record := make(map[string]interface{})
  51. for _, f := range Fields {
  52. if f == "budget" || f == "bidamount" {
  53. if tmp[f] != nil && tmp[fmt.Sprintf("ck_%s", f)] != nil && util.Float64All(tmp[f]) != util.Float64All((*info)[f]) {
  54. update[f] = util.Float64All(tmp[f])
  55. record[f] = util.Float64All(tmp[f])
  56. updateEs[f] = util.Float64All(tmp[f])
  57. mdinfo[f] = "数据清洗"
  58. }
  59. } else if f == "s_winner" {
  60. if tmp[f] != nil && tmp[fmt.Sprintf("ck_%s", f)] != nil && util.Float64All(tmp[f]) != util.Float64All((*info)[f]) {
  61. update[f] = util.ObjToString(tmp[f])
  62. record[f] = util.ObjToString(tmp[f])
  63. updateEs[f] = util.ObjToString(tmp[f])
  64. cid := companyFun(update)
  65. update["entidlist"] = cid
  66. record["entidlist"] = cid
  67. updateEs["entidlist"] = cid
  68. }
  69. } else {
  70. if tmp[f] != nil && tmp[fmt.Sprintf("ck_%s", f)] != nil && util.ObjToString(tmp[f]) != util.ObjToString((*info)[f]) {
  71. update[f] = util.ObjToString(tmp[f])
  72. record[f] = util.ObjToString(tmp[f])
  73. updateEs[f] = util.ObjToString(tmp[f])
  74. mdinfo[f] = "数据清洗"
  75. }
  76. }
  77. }
  78. for _, f := range config.Conf.Serve.ExField {
  79. if f == "package" {
  80. }
  81. }
  82. if md, ok := (*info)["modifyinfo"].(map[string]interface{}); ok {
  83. for k, v := range mdinfo {
  84. md[k] = v
  85. }
  86. mdinfo = md
  87. }
  88. now := time.Now().Unix()
  89. record["updatetime"] = now
  90. record["modifypath"] = "数据清洗"
  91. if len(update) > 0 {
  92. updateEsPool <- []map[string]interface{}{
  93. {"_id": id},
  94. updateEs,
  95. }
  96. update["modifyinfo"] = mdinfo
  97. if coll == "bidding" {
  98. updatePool <- []map[string]interface{}{
  99. {"_id": mongodb.StringTOBsonId(id)},
  100. {"$set": update},
  101. }
  102. } else {
  103. Mongo.UpdateById("bidding_back", id, map[string]interface{}{"$set": update})
  104. }
  105. updateRcPool <- []map[string]interface{}{
  106. {"_id": mongodb.StringTOBsonId(id)},
  107. {"$set": bson.M{"updatetime": now}, "$push": bson.M{"modify": record}},
  108. }
  109. }
  110. }(tmp)
  111. tmp = map[string]interface{}{}
  112. }
  113. wg.Wait()
  114. log.Info("taskinfoV1 over...", zap.Int("count:", count))
  115. }
  116. func taskinfoV2() {
  117. defer util.Catch()
  118. sess := MongoV2.GetMgoConn()
  119. defer MongoV2.DestoryMongoConn(sess)
  120. ch := make(chan bool, config.Conf.Serve.Thread)
  121. wg := &sync.WaitGroup{}
  122. //q := bson.M{"_id": mongodb.StringTOBsonId("6229797d32770a446e976450")}
  123. f := bson.M{"v_baseinfo.contenthtml": 0, "v_baseinfo.description": 0, "v_baseinfo.detail": 0, "v_baseinfo.kvtext": 0, "v_baseinfo.projectinfo": 0}
  124. it := sess.DB(config.Conf.DB.Mongo2.Dbname).C(config.Conf.DB.Mongo2.Coll).Find(nil).Select(&f).Iter()
  125. count := 0
  126. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  127. if count%2000 == 0 {
  128. log.Info("taskinfoV2", zap.Int("current:", count))
  129. }
  130. ch <- true
  131. wg.Add(1)
  132. go func(tmp map[string]interface{}) {
  133. defer func() {
  134. <-ch
  135. wg.Done()
  136. }()
  137. id := mongodb.BsonIdToSId(tmp["_id"])
  138. binfo := util.ObjToMap(tmp["v_baseinfo"])
  139. tinfo := util.ObjToMap(tmp["v_taginfo"])
  140. info, _ := Mongo.FindById("bidding", id, f)
  141. coll := "bidding"
  142. if len(*info) == 0 {
  143. info, _ = Mongo.FindById("bidding_back", id, f)
  144. if len(*info) == 0 {
  145. util.Debug("not find id", id)
  146. return
  147. }
  148. coll = "bidding_back"
  149. }
  150. update := make(map[string]interface{})
  151. updateEs := make(map[string]interface{})
  152. mdinfo := make(map[string]interface{})
  153. record := make(map[string]interface{})
  154. for _, f := range Fields {
  155. if f == "budget" || f == "bidamount" {
  156. if (*tinfo)[f] != nil && util.Float64All((*binfo)[f]) != util.Float64All((*info)[f]) {
  157. update[f] = util.Float64All((*binfo)[f])
  158. record[f] = util.Float64All((*binfo)[f])
  159. updateEs[f] = util.Float64All((*binfo)[f])
  160. mdinfo[f] = "数据清洗"
  161. }
  162. } else if f == "s_winner" {
  163. if (*tinfo)[f] != nil && util.Float64All((*binfo)[f]) != util.Float64All((*info)[f]) {
  164. update[f] = util.ObjToString((*binfo)[f])
  165. record[f] = util.ObjToString((*binfo)[f])
  166. updateEs[f] = util.ObjToString((*binfo)[f])
  167. cid := companyFun(update)
  168. update["entidlist"] = cid
  169. record["entidlist"] = cid
  170. updateEs["entidlist"] = cid
  171. }
  172. } else {
  173. if (*tinfo)[f] != nil && util.ObjToString((*binfo)[f]) != util.ObjToString((*info)[f]) {
  174. update[f] = util.ObjToString((*binfo)[f])
  175. record[f] = util.ObjToString((*binfo)[f])
  176. updateEs[f] = util.ObjToString((*binfo)[f])
  177. mdinfo[f] = "数据清洗"
  178. }
  179. }
  180. }
  181. for _, f := range config.Conf.Serve.ExField {
  182. if f == "package" {
  183. }
  184. }
  185. if md, ok := (*info)["modifyinfo"].(map[string]interface{}); ok {
  186. for k, v := range mdinfo {
  187. md[k] = v
  188. }
  189. mdinfo = md
  190. }
  191. now := time.Now().Unix()
  192. record["updatetime"] = now
  193. record["modifypath"] = "数据清洗"
  194. if len(update) > 0 {
  195. updateEsPool <- []map[string]interface{}{
  196. {"_id": id},
  197. updateEs,
  198. }
  199. update["modifyinfo"] = mdinfo
  200. if coll == "bidding" {
  201. updatePool <- []map[string]interface{}{
  202. {"_id": mongodb.StringTOBsonId(id)},
  203. {"$set": update},
  204. }
  205. } else {
  206. Mongo.UpdateById("bidding_back", id, map[string]interface{}{"$set": update})
  207. }
  208. updateRcPool <- []map[string]interface{}{
  209. {"_id": mongodb.StringTOBsonId(id)},
  210. {"$set": bson.M{"updatetime": now}, "$push": bson.M{"modify": record}},
  211. }
  212. }
  213. }(tmp)
  214. tmp = map[string]interface{}{}
  215. }
  216. wg.Wait()
  217. log.Info("taskinfoV2 over...", zap.Int("count:", count))
  218. }
  219. // @Description entidlist
  220. // @Author J 2022/6/7 2:36 PM
  221. func companyFun(tmp map[string]interface{}) (cid []string) {
  222. sWinnerarr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  223. for _, w := range sWinnerarr {
  224. if w != "" {
  225. id := redis.GetStr("qyxy_id", w)
  226. if id == "" {
  227. id = "-"
  228. }
  229. cid = append(cid, id)
  230. }
  231. }
  232. return cid
  233. }