main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "data_fusion/config"
  8. "fmt"
  9. "go.uber.org/zap"
  10. "reflect"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. var (
  // ArrLock is a set of shard locks. init appends one mutex per configured
  // worker thread (config.Conf.Serve.Thread); main locks the entry selected
  // by util.HashCode(repeat_id) % Thread before starting a worker, and the
  // worker unlocks the same entry when it finishes.
  16. ArrLock []sync.Mutex
  17. )
  18. func init() {
  19. config.Init("./common.toml")
  20. InitLog()
  21. InitMgo()
  22. redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.Db)
  23. initData()
  24. updatePool = make(chan []map[string]interface{}, 5000)
  25. updateSp = make(chan bool, 5)
  26. recordPool = make(chan []map[string]interface{}, 5000)
  27. recordSp = make(chan bool, 5)
  28. for i := 0; i < config.Conf.Serve.Thread; i++ {
  29. ArrLock = append(ArrLock, sync.Mutex{})
  30. }
  31. }
  // main runs one fusion pass over the bidding_fusion collection.
  //
  // For every document whose extracttype is -1 it starts a worker goroutine
  // that compares the document with the current "template" document of its
  // repeat_id group (weights come from getWeight), computes the fields the
  // heavier document is missing (mergeTmp), and queues:
  //   - a field update for the heavier document on updatePool
  //     (written to bidding_fusion by updateMethod1),
  //   - a fusion record on recordPool
  //     (upserted into bidding_fusion_record by updateMethod),
  // and finally stores the group's id list in the Redis "fusion_id" hash.
  // After all workers finish, main logs the total and then blocks forever.
  32. func main() {
  // Background batch writers for recordPool and updatePool respectively.
  33. go updateMethod()
  34. go updateMethod1()
  35. sess := MgoB.GetMgoConn()
  // NOTE(review): main never returns (it blocks on `<-c` at the end), so this
  // deferred release is never executed.
  36. defer MgoB.DestoryMongoConn(sess)
  // ch is a counting semaphore: at most config.Conf.Serve.Thread workers run
  // at the same time. wg tracks the started workers.
  37. ch := make(chan bool, config.Conf.Serve.Thread)
  38. wg := &sync.WaitGroup{}
  39. //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("635ff15d631ff1ac3d095c41")}
  40. //f := map[string]interface{}{"contenthtml": 0}
  // Iterate over the whole collection: no query filter, no field projection.
  // NOTE(review): the iterator is neither closed nor checked for an error
  // after the loop.
  41. it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding_fusion").Find(nil).Select(nil).Iter()
  42. count := 0
  43. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  // Progress log every 20000 scanned documents (skipped ones included).
  44. if count%20000 == 0 {
  45. log.Info("main", zap.Int("current:", count))
  46. }
  // Only documents with extracttype == -1 are fused; everything else is
  // skipped. (Presumably -1 marks a duplicate; confirm with the data owner.)
  47. if repeat := util.IntAll(tmp["extracttype"]); repeat != -1 {
  48. continue
  49. }
  // Take a worker slot, then lock the shard for this repeat_id. The lock is
  // taken here, in the scanning goroutine, and released by the worker's
  // deferred function, so two documents that hash to the same shard are never
  // processed concurrently.
  // NOTE(review): if util.HashCode can return a negative value the index below
  // is negative and panics; likewise Thread == 0 panics on the modulo. Confirm.
  50. ch <- true
  51. wg.Add(1)
  52. rid := util.ObjToString(tmp["repeat_id"])
  53. ArrLock[util.HashCode(rid)%config.Conf.Serve.Thread].Lock()
  54. go func(tmp map[string]interface{}) {
  // Cleanup: free the worker slot, mark the worker done, release the shard.
  55. defer func(rid string) {
  56. <-ch
  57. wg.Done()
  58. ArrLock[util.HashCode(rid)%config.Conf.Serve.Thread].Unlock()
  59. }(rid)
  60. repeatId := util.ObjToString(tmp["repeat_id"])
  // Case A: this repeat_id group already has a Redis entry. The value is a
  // "-"-separated id list whose first element is used as the current template
  // id (every write below puts the template id first).
  61. if str := redis.GetStr("fusion_id", repeatId); str != "" {
  62. mid := strings.Split(str, "-")[0]
  // NOTE(review): findData returns nil when the id is in neither collection;
  // the code below then works with a nil map (tmp1["_id"] is nil, w1 is 0).
  63. tmp1 := findData(mid)
  64. w, s := getWeight(tmp)
  65. w1, _ := getWeight(tmp1)
  66. var update map[string]interface{}
  67. var fs []string
  // A.1: the new document outweighs the current template, so it becomes the
  // template and is filled from the old template.
  68. if w > w1 {
  69. update, fs = mergeTmp(tmp, tmp1)
  70. if len(update) > 0 {
  // Queue {selector, modifier}: set the merged fields on the new document
  // and add their names to its fusion_fields set.
  71. updatePool <- []map[string]interface{}{
  72. {"_id": tmp["_id"]},
  73. {"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  74. //{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  75. }
  76. }
  // Record update for the group (keyed by repeatId): switch template_id and
  // template_weight to the new document, push its id and a per-document
  // record (merged fields + infoid + weight, plus the remark when w == 0).
  77. record := make(map[string]interface{})
  78. record["$set"] = map[string]interface{}{
  79. "template_id": mongodb.BsonIdToSId(tmp["_id"]),
  80. "template_weight": w,
  81. }
  82. update1 := util.DeepCopy(update).(map[string]interface{})
  83. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  84. update1["weight"] = w
  85. if w == 0 {
  86. update1["remark"] = s
  87. }
  88. record["$push"] = map[string]interface{}{
  89. "ids": mongodb.BsonIdToSId(tmp["_id"]),
  90. "record": update1,
  91. }
  92. recordPool <- []map[string]interface{}{
  93. {"_id": mongodb.StringTOBsonId(repeatId)},
  94. record,
  95. }
  // Prepend the new template id to the group's id list.
  96. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), str))
  97. } else {
  // A.2: the current template stays; it is filled from the new document.
  98. update, fs = mergeTmp(tmp1, tmp)
  99. if len(update) > 0 {
  100. updatePool <- []map[string]interface{}{
  101. {"_id": (tmp1)["_id"]},
  102. {"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  103. //{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  104. }
  105. }
  // Record update: refresh template_weight and push the new document's id
  // and record.
  106. record := make(map[string]interface{})
  107. record["$set"] = map[string]interface{}{
  108. "template_weight": w1,
  109. }
  110. update1 := util.DeepCopy(update).(map[string]interface{})
  111. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  112. update1["weight"] = w
  113. if w == 0 {
  114. update1["remark"] = s
  115. }
  116. record["$push"] = map[string]interface{}{
  117. "ids": mongodb.BsonIdToSId(tmp["_id"]),
  118. "record": update1,
  119. }
  120. recordPool <- []map[string]interface{}{
  121. {"_id": mongodb.StringTOBsonId(repeatId)},
  122. record,
  123. }
  // Append the new id to the list.
  // NOTE(review): this writes under key `mid`, while the lookup above and
  // every other branch use `repeatId`. When the template id differs from
  // repeatId the appended id lands in a different entry. Confirm intent.
  124. redis.PutCKV("fusion_id", mid, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"])))
  125. }
  126. } else {
  // Case B: first fusion for this repeat_id. The counterpart is the document
  // whose id equals repeat_id.
  // NOTE(review): as above, findData may return nil; record["_id"] and the
  // recordPool selector would then be nil.
  127. tmp1 := findData(repeatId)
  128. w, s := getWeight(tmp)
  129. w1, s1 := getWeight(tmp1)
  130. var update map[string]interface{}
  131. var fs []string
  // B.1: the scanned document is heavier and becomes the template.
  132. if w > w1 {
  133. update, fs = mergeTmp(tmp, tmp1)
  134. if len(update) > 0 {
  // Unlike case A, fusion_fields is written with $set (overwritten) as part
  // of the same modifier instead of $addToSet.
  135. set := util.DeepCopy(update).(map[string]interface{})
  136. set["fusion_fields"] = fs
  137. updatePool <- []map[string]interface{}{
  138. {"_id": tmp["_id"]},
  139. {"$set": set},
  140. //{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  141. //{"$set": map[string]interface{}{"fusion_fields": fs}},
  142. }
  143. }
  // Build the complete record document for the group: template info, both
  // ids, and one record entry per document (template first).
  144. record := make(map[string]interface{})
  145. record["_id"] = tmp1["_id"]
  146. record["template_id"] = mongodb.BsonIdToSId(tmp["_id"])
  147. record["template_weight"] = w
  // NOTE(review): the second id is derived from the repeatId string here,
  // while branch B.2 derives it from tmp1["_id"]; confirm both yield the same.
  148. record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(repeatId)}
  149. var recordlist []map[string]interface{}
  150. recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "weight": w})
  151. update1 := util.DeepCopy(update).(map[string]interface{})
  152. update1["infoid"] = mongodb.BsonIdToSId(tmp1["_id"])
  153. update1["weight"] = w1
  154. if w1 == 0 {
  155. update1["remark"] = s1
  156. }
  157. recordlist = append(recordlist, update1)
  158. record["record"] = recordlist
  159. recordPool <- []map[string]interface{}{
  160. {"_id": tmp1["_id"]},
  161. {"$set": record},
  162. }
  // Store "<template id>-<other id>" for the group.
  163. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])))
  164. } else {
  // B.2: the counterpart (tmp1) is the template and is filled from the
  // scanned document.
  165. update, fs = mergeTmp(tmp1, tmp)
  166. if len(update) > 0 {
  167. set := util.DeepCopy(update).(map[string]interface{})
  168. set["fusion_fields"] = fs
  169. updatePool <- []map[string]interface{}{
  170. {"_id": tmp1["_id"]},
  171. {"$set": set},
  172. //{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  173. //{"$set": map[string]interface{}{"fusion_fields": fs}},
  174. }
  175. }
  176. record := make(map[string]interface{})
  177. record["_id"] = tmp1["_id"]
  178. record["template_id"] = mongodb.BsonIdToSId(tmp1["_id"])
  179. record["template_weight"] = w1
  180. record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])}
  181. var recordlist []map[string]interface{}
  182. recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp1["_id"]), "weight": w1})
  183. update1 := util.DeepCopy(update).(map[string]interface{})
  184. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  185. update1["weight"] = w
  186. if w == 0 {
  187. update1["remark"] = s
  188. }
  189. recordlist = append(recordlist, update1)
  190. record["record"] = recordlist
  191. recordPool <- []map[string]interface{}{
  192. {"_id": tmp1["_id"]},
  193. {"$set": record},
  194. }
  195. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp1["_id"]), mongodb.BsonIdToSId(tmp["_id"])))
  196. }
  197. }
  198. }(tmp)
  // Hand the worker its own map: the next it.Next call decodes into a fresh
  // one instead of the map the worker is still reading.
  199. tmp = map[string]interface{}{}
  200. }
  201. wg.Wait()
  202. log.Info("fusion over...", zap.Int("count:", count))
  // Nothing ever sends on c, so this receive blocks forever and keeps the
  // process (and the two batch-writer goroutines) alive after the scan.
  // Presumably this lets the writers flush their last partial batches; the
  // process has to be stopped externally.
  203. c := make(chan bool, 1)
  204. <-c
  205. }
  206. func findData(id string) map[string]interface{} {
  207. tmp, _ := MgoB.FindById("bidding_fusion", id, nil)
  208. if tmp != nil && len(*tmp) > 0 {
  209. return *tmp
  210. } else {
  211. tmp, _ = MgoB.FindById("bidding", id, nil)
  212. if tmp != nil && len(*tmp) > 0 {
  213. return *tmp
  214. }
  215. }
  216. return nil
  217. }
  218. func getWeight(tmp map[string]interface{}) (int, string) {
  219. var w int
  220. if util.IntAll(tmp["publishtime"]) <= 0 {
  221. return 0, "发布时间小于0"
  222. }
  223. if BinarySearch(CompeteSite, util.ObjToString(tmp["site"])) > -1 {
  224. return 0, "竞品网站数据"
  225. }
  226. for k, v := range config.Conf.Serve.Weight {
  227. if tmp[k] != nil {
  228. if k == "attachments" {
  229. if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
  230. if atts, o2 := pinfo["attachments"].(map[string]interface{}); o2 {
  231. if len(atts) > 0 {
  232. w += v
  233. }
  234. }
  235. }
  236. } else {
  237. if reflect.TypeOf(tmp[k]).String() == "string" {
  238. if util.ObjToString(tmp[k]) != "" {
  239. w += v
  240. }
  241. } else if reflect.TypeOf(tmp[k]).String() == "float64" {
  242. if util.Float64All(tmp[k]) > 0 {
  243. w += v
  244. }
  245. } else if reflect.TypeOf(tmp[k]).String() == "[]interface {}" {
  246. if len(tmp[k].([]interface{})) > 0 {
  247. w += v
  248. }
  249. } else {
  250. w += v
  251. }
  252. }
  253. }
  254. }
  255. return w, ""
  256. }
  257. // @Description tmp模版数据, tmp1补充数据
  258. // @Author J 2023/1/3 11:31
  259. func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) (map[string]interface{}, []string) {
  260. update := make(map[string]interface{})
  261. var fs []string
  262. for _, v := range config.Conf.Serve.Fields {
  263. if v == "attachments" {
  264. if pinfo1, ok1 := tmp1["projectinfo"].(map[string]interface{}); ok1 {
  265. if ats, ok2 := pinfo1[v].(map[string]interface{}); ok2 {
  266. if pinfo1[v] != nil && len(ats) > 0 {
  267. if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
  268. if pinfo[v] == nil {
  269. pinfo[v] = pinfo1[v]
  270. update["projectinfo"] = pinfo
  271. update["attach_text"] = tmp1["attach_text"] // 补充附件文本
  272. fs = append(fs, v)
  273. fs = append(fs, "attach_text")
  274. }
  275. } else {
  276. update["projectinfo"] = map[string]interface{}{v: pinfo1[v]}
  277. update["attach_text"] = tmp1["attach_text"]
  278. fs = append(fs, v)
  279. fs = append(fs, "attach_text")
  280. }
  281. }
  282. } else {
  283. if pinfo1[v] != nil {
  284. log.Error("mergeTmp err...", zap.Any("id", mongodb.BsonIdToSId(tmp1["_id"])))
  285. }
  286. }
  287. }
  288. } else {
  289. if tmp[v] == nil && tmp1[v] != nil {
  290. if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" {
  291. update[v] = util.ObjToString(tmp1[v])
  292. fs = append(fs, v)
  293. } else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 {
  294. update[v] = tmp1[v]
  295. fs = append(fs, v)
  296. } else {
  297. update[v] = tmp1[v]
  298. fs = append(fs, v)
  299. }
  300. }
  301. }
  302. }
  303. return update, fs
  304. }
  305. func updateMethod() {
  306. arru := make([][]map[string]interface{}, 500)
  307. indexu := 0
  308. for {
  309. select {
  310. case v := <-recordPool:
  311. arru[indexu] = v
  312. indexu++
  313. if indexu == 500 {
  314. recordSp <- true
  315. go func(arru [][]map[string]interface{}) {
  316. defer func() {
  317. <-recordSp
  318. }()
  319. MgoB.UpSertBulk("bidding_fusion_record", arru...)
  320. }(arru)
  321. arru = make([][]map[string]interface{}, 500)
  322. indexu = 0
  323. }
  324. case <-time.After(1000 * time.Millisecond):
  325. if indexu > 0 {
  326. recordSp <- true
  327. go func(arru [][]map[string]interface{}) {
  328. defer func() {
  329. <-recordSp
  330. }()
  331. MgoB.UpSertBulk("bidding_fusion_record", arru...)
  332. }(arru[:indexu])
  333. arru = make([][]map[string]interface{}, 500)
  334. indexu = 0
  335. }
  336. }
  337. }
  338. }
  339. func updateMethod1() {
  340. arru := make([][]map[string]interface{}, 500)
  341. indexu := 0
  342. for {
  343. select {
  344. case v := <-updatePool:
  345. arru[indexu] = v
  346. indexu++
  347. if indexu == 500 {
  348. updateSp <- true
  349. go func(arru [][]map[string]interface{}) {
  350. defer func() {
  351. <-updateSp
  352. }()
  353. MgoB.UpdateBulk("bidding_fusion", arru...)
  354. }(arru)
  355. arru = make([][]map[string]interface{}, 500)
  356. indexu = 0
  357. }
  358. case <-time.After(1000 * time.Millisecond):
  359. if indexu > 0 {
  360. updateSp <- true
  361. go func(arru [][]map[string]interface{}) {
  362. defer func() {
  363. <-updateSp
  364. }()
  365. MgoB.UpdateBulk("bidding_fusion", arru...)
  366. }(arru[:indexu])
  367. arru = make([][]map[string]interface{}, 500)
  368. indexu = 0
  369. }
  370. }
  371. }
  372. }