main.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "data_fusion/config"
  8. "fmt"
  9. "go.uber.org/zap"
  10. "reflect"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. func init() {
  16. config.Init("./common.toml")
  17. InitLog()
  18. InitMgo()
  19. redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.Db)
  20. initData()
  21. updatePool = make(chan []map[string]interface{}, 5000)
  22. updateSp = make(chan bool, 5)
  23. recordPool = make(chan []map[string]interface{}, 5000)
  24. recordSp = make(chan bool, 5)
  25. }
  26. func main() {
  27. go updateMethod()
  28. go updateMethod1()
  29. sess := MgoB.GetMgoConn()
  30. defer MgoB.DestoryMongoConn(sess)
  31. ch := make(chan bool, config.Conf.Serve.Thread)
  32. wg := &sync.WaitGroup{}
  33. q := map[string]interface{}{"_id": mongodb.StringTOBsonId("639751bb063a7b816e026aa1")}
  34. it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding_fusion").Find(q).Select(nil).Iter()
  35. count := 0
  36. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  37. if count%2000 == 0 {
  38. log.Info("main", zap.Int("current:", count))
  39. }
  40. if repeat := util.IntAll(tmp["repeat"]); repeat != 1 {
  41. continue
  42. }
  43. ch <- true
  44. wg.Add(1)
  45. go func(tmp map[string]interface{}) {
  46. defer func() {
  47. <-ch
  48. wg.Done()
  49. }()
  50. repeatId := util.ObjToString(tmp["repeat_id"])
  51. if str := redis.GetStr("fusion_id", repeatId); str != "" {
  52. mid := strings.Split(str, "-")[0]
  53. tmp1, _ := MgoB.FindById("bidding_fusion", mid, nil)
  54. w, s := getWeight(tmp)
  55. w1, s1 := getWeight(*tmp1)
  56. util.Debug(w, s, w1, s1)
  57. var update map[string]interface{}
  58. if w > w1 {
  59. update = mergeTmp(tmp, *tmp1)
  60. //if len(update) > 0 {
  61. // updatePool <- []map[string]interface{}{
  62. // {"_id": tmp["_id"]},
  63. // {"$set": update},
  64. // }
  65. //}
  66. record := make(map[string]interface{})
  67. record["$set"] = map[string]interface{}{
  68. "template_id": mongodb.BsonIdToSId(tmp["_id"]),
  69. "template_weight": w,
  70. }
  71. update1 := util.DeepCopy(update).(map[string]interface{})
  72. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  73. update1["weight"] = w
  74. if w == 0 {
  75. update1["remark"] = s
  76. }
  77. record["$push"] = map[string]interface{}{
  78. "ids": mongodb.BsonIdToSId(tmp["_id"]),
  79. "record": update1,
  80. }
  81. recordPool <- []map[string]interface{}{
  82. {"_id": mongodb.StringTOBsonId(repeatId)},
  83. record,
  84. }
  85. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), str))
  86. } else {
  87. update = mergeTmp(*tmp1, tmp)
  88. //if len(update) > 0 {
  89. // updatePool <- []map[string]interface{}{
  90. // {"_id": (*tmp1)["_id"]},
  91. // {"$set": update},
  92. // }
  93. //}
  94. record := make(map[string]interface{})
  95. record["$set"] = map[string]interface{}{
  96. "template_weight": w1,
  97. }
  98. update1 := util.DeepCopy(update).(map[string]interface{})
  99. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  100. update1["weight"] = w
  101. if w == 0 {
  102. update1["remark"] = s
  103. }
  104. record["$push"] = map[string]interface{}{
  105. "ids": mongodb.BsonIdToSId(tmp["_id"]),
  106. "record": update1,
  107. }
  108. recordPool <- []map[string]interface{}{
  109. {"_id": mongodb.StringTOBsonId(repeatId)},
  110. record,
  111. }
  112. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"])))
  113. }
  114. } else {
  115. tmp1, _ := MgoB.FindById("bidding_fusion", repeatId, nil)
  116. w, s := getWeight(tmp)
  117. w1, s1 := getWeight(*tmp1)
  118. var update map[string]interface{}
  119. if w > w1 {
  120. update = mergeTmp(tmp, *tmp1)
  121. //if len(update) > 0 {
  122. // updatePool <- []map[string]interface{}{
  123. // {"_id": tmp["_id"]},
  124. // {"$set": update},
  125. // }
  126. //}
  127. record := make(map[string]interface{})
  128. record["_id"] = (*tmp1)["_id"]
  129. record["template_id"] = mongodb.BsonIdToSId(tmp["_id"])
  130. record["template_weight"] = w
  131. record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(repeatId)}
  132. var recordlist []map[string]interface{}
  133. recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "weight": w})
  134. update1 := util.DeepCopy(update).(map[string]interface{})
  135. update1["infoid"] = mongodb.BsonIdToSId((*tmp1)["_id"])
  136. update1["weight"] = w1
  137. if w1 == 0 {
  138. update1["remark"] = s1
  139. }
  140. recordlist = append(recordlist, update1)
  141. record["record"] = recordlist
  142. recordPool <- []map[string]interface{}{
  143. {"_id": (*tmp1)["_id"]},
  144. {"$set": record},
  145. }
  146. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId((*tmp1)["_id"])))
  147. } else {
  148. update = mergeTmp(*tmp1, tmp)
  149. //if len(update) > 0 {
  150. // updatePool <- []map[string]interface{}{
  151. // {"_id": (*tmp1)["_id"]},
  152. // {"$set": update},
  153. // }
  154. //}
  155. record := make(map[string]interface{})
  156. record["_id"] = (*tmp1)["_id"]
  157. record["template_id"] = mongodb.BsonIdToSId((*tmp1)["_id"])
  158. record["template_weight"] = w1
  159. record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId((*tmp1)["_id"])}
  160. var recordlist []map[string]interface{}
  161. recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId((*tmp1)["_id"]), "weight": w1})
  162. update1 := util.DeepCopy(update).(map[string]interface{})
  163. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  164. update1["weight"] = w
  165. if w == 0 {
  166. update1["remark"] = s
  167. }
  168. recordlist = append(recordlist, update1)
  169. record["record"] = recordlist
  170. recordPool <- []map[string]interface{}{
  171. {"_id": (*tmp1)["_id"]},
  172. {"$set": record},
  173. }
  174. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId((*tmp1)["_id"]), mongodb.BsonIdToSId(tmp["_id"])))
  175. }
  176. }
  177. }(tmp)
  178. tmp = map[string]interface{}{}
  179. }
  180. c := make(chan bool, 1)
  181. <-c
  182. }
  183. func getWeight(tmp map[string]interface{}) (int, string) {
  184. var w int
  185. if util.IntAll(tmp["publishtime"]) <= 0 {
  186. return 0, "发布时间小于0"
  187. }
  188. if BinarySearch(CompeteSite, util.ObjToString(tmp["site"])) > -1 {
  189. return 0, "竞品网站数据"
  190. }
  191. for k, v := range config.Conf.Serve.Weight {
  192. if tmp[k] != nil {
  193. util.Debug(k)
  194. if reflect.TypeOf(tmp[k]).String() == "string" {
  195. if util.ObjToString(tmp[k]) != "" {
  196. w += v
  197. }
  198. } else if reflect.TypeOf(tmp[k]).String() == "float64" {
  199. if util.Float64All(tmp[k]) > 0 {
  200. w += v
  201. }
  202. } else {
  203. w += v
  204. }
  205. }
  206. }
  207. return w, ""
  208. }
  209. // @Description tmp模版数据, tmp1补充数据
  210. // @Author J 2023/1/3 11:31
  211. func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) map[string]interface{} {
  212. update := make(map[string]interface{})
  213. for _, v := range config.Conf.Serve.Fields {
  214. if tmp[v] == nil && tmp1[v] != nil {
  215. if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" {
  216. update[v] = util.ObjToString(tmp1[v])
  217. } else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 {
  218. update[v] = tmp1[v]
  219. } else {
  220. update[v] = tmp1[v]
  221. }
  222. }
  223. }
  224. return update
  225. }
  226. func updateMethod() {
  227. arru := make([][]map[string]interface{}, 500)
  228. indexu := 0
  229. for {
  230. select {
  231. case v := <-recordPool:
  232. arru[indexu] = v
  233. indexu++
  234. if indexu == 500 {
  235. recordSp <- true
  236. go func(arru [][]map[string]interface{}) {
  237. defer func() {
  238. <-recordSp
  239. }()
  240. MgoB.UpSertBulk("bidding_fusion_record", arru...)
  241. }(arru)
  242. arru = make([][]map[string]interface{}, 500)
  243. indexu = 0
  244. }
  245. case <-time.After(1000 * time.Millisecond):
  246. if indexu > 0 {
  247. recordSp <- true
  248. go func(arru [][]map[string]interface{}) {
  249. defer func() {
  250. <-recordSp
  251. }()
  252. MgoB.UpSertBulk("bidding_fusion_record", arru...)
  253. }(arru[:indexu])
  254. arru = make([][]map[string]interface{}, 500)
  255. indexu = 0
  256. }
  257. }
  258. }
  259. }
  260. func updateMethod1() {
  261. arru := make([][]map[string]interface{}, 500)
  262. indexu := 0
  263. for {
  264. select {
  265. case v := <-updatePool:
  266. arru[indexu] = v
  267. indexu++
  268. if indexu == 500 {
  269. updateSp <- true
  270. go func(arru [][]map[string]interface{}) {
  271. defer func() {
  272. <-updateSp
  273. }()
  274. MgoB.UpdateBulk("bidding_fusion", arru...)
  275. }(arru)
  276. arru = make([][]map[string]interface{}, 500)
  277. indexu = 0
  278. }
  279. case <-time.After(1000 * time.Millisecond):
  280. if indexu > 0 {
  281. updateSp <- true
  282. go func(arru [][]map[string]interface{}) {
  283. defer func() {
  284. <-updateSp
  285. }()
  286. MgoB.UpdateBulk("bidding_fusion", arru...)
  287. }(arru[:indexu])
  288. arru = make([][]map[string]interface{}, 500)
  289. indexu = 0
  290. }
  291. }
  292. }
  293. }