// main.go (13 KB)
  1. package main
  2. import (
  3. "data_fusion/config"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.uber.org/zap"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  11. "reflect"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. ArrLock []sync.Mutex
  18. )
  19. func init() {
  20. config.Init("./common.toml")
  21. InitLog()
  22. InitMgo()
  23. redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.Db)
  24. initData()
  25. updatePool = make(chan []map[string]interface{}, 5000)
  26. updateSp = make(chan bool, 5)
  27. recordPool = make(chan []map[string]interface{}, 5000)
  28. recordSp = make(chan bool, 5)
  29. for i := 0; i < config.Conf.Serve.Thread; i++ {
  30. ArrLock = append(ArrLock, sync.Mutex{})
  31. }
  32. }
  33. func main() {
  34. go updateMethod()
  35. go updateMethod1()
  36. sess := MgoB.GetMgoConn()
  37. defer MgoB.DestoryMongoConn(sess)
  38. ch := make(chan bool, config.Conf.Serve.Thread)
  39. wg := &sync.WaitGroup{}
  40. //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("635ff15d631ff1ac3d095c41")}
  41. //f := map[string]interface{}{"contenthtml": 0}
  42. it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(nil).Select(nil).Iter()
  43. count := 0
  44. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  45. if count%20000 == 0 {
  46. log.Info("main", zap.Int("current:", count))
  47. }
  48. ch <- true
  49. wg.Add(1)
  50. go func(tmp map[string]interface{}) {
  51. defer func() {
  52. <-ch
  53. wg.Done()
  54. }()
  55. if repeat := util.IntAll(tmp["extracttype"]); repeat != -1 {
  56. updatePool <- []map[string]interface{}{
  57. {"_id": tmp["_id"]},
  58. {"$set": bson.M{"fusion_tag": 1}},
  59. }
  60. } else {
  61. repeatId := util.ObjToString(tmp["repeat_id"])
  62. ArrLock[util.HashCode(repeatId)%config.Conf.Serve.Thread].Lock()
  63. if str := redis.GetStr("fusion_id", repeatId); str != "" {
  64. mid := strings.Split(str, "-")[0]
  65. tmp1 := findData(mid)
  66. w, s := getWeight(tmp)
  67. w1, _ := getWeight(tmp1)
  68. var update map[string]interface{}
  69. var fs []string
  70. if w > w1 {
  71. update, fs = mergeTmp(tmp, tmp1)
  72. update1 := util.DeepCopy(update).(map[string]interface{})
  73. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  74. update1["weight"] = w
  75. update["fusion_tag"] = 1
  76. if fs != nil && len(fs) > 0 {
  77. updatePool <- []map[string]interface{}{
  78. {"_id": tmp["_id"]},
  79. {"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  80. }
  81. } else {
  82. updatePool <- []map[string]interface{}{
  83. {"_id": tmp["_id"]},
  84. {"$set": update},
  85. }
  86. }
  87. updatePool <- []map[string]interface{}{
  88. {"_id": tmp1["_id"]},
  89. {"$set": bson.M{"fusion_tag": 0}},
  90. }
  91. record := make(map[string]interface{})
  92. record["$set"] = map[string]interface{}{
  93. "template_id": mongodb.BsonIdToSId(tmp["_id"]),
  94. "template_weight": w,
  95. }
  96. if w == 0 {
  97. update1["remark"] = s
  98. }
  99. record["$push"] = map[string]interface{}{
  100. "ids": mongodb.BsonIdToSId(tmp["_id"]),
  101. "record": update1,
  102. }
  103. recordPool <- []map[string]interface{}{
  104. {"_id": mongodb.StringTOBsonId(repeatId)},
  105. record,
  106. }
  107. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), str))
  108. } else {
  109. update, fs = mergeTmp(tmp1, tmp)
  110. update1 := util.DeepCopy(update).(map[string]interface{})
  111. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  112. update1["weight"] = w
  113. update["fusion_tag"] = 1
  114. if fs != nil && len(fs) > 0 {
  115. updatePool <- []map[string]interface{}{
  116. {"_id": (tmp1)["_id"]},
  117. {"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  118. }
  119. } else {
  120. updatePool <- []map[string]interface{}{
  121. {"_id": (tmp1)["_id"]},
  122. {"$set": update},
  123. }
  124. }
  125. updatePool <- []map[string]interface{}{
  126. {"_id": (tmp)["_id"]},
  127. {"$set": bson.M{"fusion_tag": 0}},
  128. }
  129. record := make(map[string]interface{})
  130. record["$set"] = map[string]interface{}{
  131. "template_weight": w1,
  132. }
  133. if w == 0 {
  134. update1["remark"] = s
  135. }
  136. record["$push"] = map[string]interface{}{
  137. "ids": mongodb.BsonIdToSId(tmp["_id"]),
  138. "record": update1,
  139. }
  140. recordPool <- []map[string]interface{}{
  141. {"_id": mongodb.StringTOBsonId(repeatId)},
  142. record,
  143. }
  144. redis.PutCKV("fusion_id", mid, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"])))
  145. }
  146. } else {
  147. tmp1 := findData(repeatId)
  148. w, s := getWeight(tmp)
  149. w1, s1 := getWeight(tmp1)
  150. var update map[string]interface{}
  151. var fs []string
  152. if w > w1 {
  153. update, fs = mergeTmp(tmp, tmp1)
  154. set := util.DeepCopy(update).(map[string]interface{})
  155. if fs != nil && len(fs) > 0 {
  156. set["fusion_fields"] = fs
  157. }
  158. set["fusion_tag"] = 1
  159. updatePool <- []map[string]interface{}{
  160. {"_id": tmp["_id"]},
  161. {"$set": set},
  162. }
  163. updatePool <- []map[string]interface{}{
  164. {"_id": tmp1["_id"]},
  165. {"$set": bson.M{"fusion_tag": 0}},
  166. }
  167. record := make(map[string]interface{})
  168. record["_id"] = tmp1["_id"]
  169. record["template_id"] = mongodb.BsonIdToSId(tmp["_id"])
  170. record["template_weight"] = w
  171. record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(repeatId)}
  172. var recordlist []map[string]interface{}
  173. recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "weight": w})
  174. update1 := util.DeepCopy(update).(map[string]interface{})
  175. update1["infoid"] = mongodb.BsonIdToSId(tmp1["_id"])
  176. update1["weight"] = w1
  177. if w1 == 0 {
  178. update1["remark"] = s1
  179. }
  180. recordlist = append(recordlist, update1)
  181. record["record"] = recordlist
  182. recordPool <- []map[string]interface{}{
  183. {"_id": tmp1["_id"]},
  184. {"$set": record},
  185. }
  186. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])))
  187. } else {
  188. update, fs = mergeTmp(tmp1, tmp)
  189. set := util.DeepCopy(update).(map[string]interface{})
  190. if fs != nil && len(fs) > 0 {
  191. set["fusion_fields"] = fs
  192. }
  193. set["fusion_tag"] = 1
  194. updatePool <- []map[string]interface{}{
  195. {"_id": tmp1["_id"]},
  196. {"$set": set},
  197. }
  198. updatePool <- []map[string]interface{}{
  199. {"_id": tmp["_id"]},
  200. {"$set": bson.M{"fusion_tag": 0}},
  201. }
  202. record := make(map[string]interface{})
  203. record["_id"] = tmp1["_id"]
  204. record["template_id"] = mongodb.BsonIdToSId(tmp1["_id"])
  205. record["template_weight"] = w1
  206. record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])}
  207. var recordlist []map[string]interface{}
  208. recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp1["_id"]), "weight": w1})
  209. update1 := util.DeepCopy(update).(map[string]interface{})
  210. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  211. update1["weight"] = w
  212. if w == 0 {
  213. update1["remark"] = s
  214. }
  215. recordlist = append(recordlist, update1)
  216. record["record"] = recordlist
  217. recordPool <- []map[string]interface{}{
  218. {"_id": tmp1["_id"]},
  219. {"$set": record},
  220. }
  221. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp1["_id"]), mongodb.BsonIdToSId(tmp["_id"])))
  222. }
  223. }
  224. ArrLock[util.HashCode(repeatId)%config.Conf.Serve.Thread].Unlock()
  225. }
  226. }(tmp)
  227. tmp = map[string]interface{}{}
  228. }
  229. wg.Wait()
  230. log.Info("fusion over...", zap.Int("count:", count))
  231. c := make(chan bool, 1)
  232. <-c
  233. }
  234. func findData(id string) map[string]interface{} {
  235. tmp, _ := MgoB.FindById(config.Conf.DB.Mongo.Coll, id, nil)
  236. if tmp != nil && len(*tmp) > 0 {
  237. return *tmp
  238. } else {
  239. tmp, _ = MgoB.FindById("bidding", id, nil)
  240. if tmp != nil && len(*tmp) > 0 {
  241. return *tmp
  242. }
  243. }
  244. return nil
  245. }
  246. func getWeight(tmp map[string]interface{}) (int, string) {
  247. var w int
  248. if util.IntAll(tmp["publishtime"]) <= 0 {
  249. return 0, "发布时间小于0"
  250. }
  251. if BinarySearch(CompeteSite, util.ObjToString(tmp["site"])) > -1 {
  252. return 0, "竞品网站数据"
  253. }
  254. for k, v := range config.Conf.Serve.Weight {
  255. if tmp[k] != nil {
  256. if k == "attachments" {
  257. if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
  258. if atts, o2 := pinfo["attachments"].(map[string]interface{}); o2 {
  259. if len(atts) > 0 {
  260. w += v
  261. }
  262. }
  263. }
  264. } else {
  265. if reflect.TypeOf(tmp[k]).String() == "string" {
  266. if util.ObjToString(tmp[k]) != "" {
  267. w += v
  268. }
  269. } else if reflect.TypeOf(tmp[k]).String() == "float64" {
  270. if util.Float64All(tmp[k]) > 0 {
  271. w += v
  272. }
  273. } else if reflect.TypeOf(tmp[k]).String() == "[]interface {}" {
  274. if len(tmp[k].([]interface{})) > 0 {
  275. w += v
  276. }
  277. } else {
  278. w += v
  279. }
  280. }
  281. }
  282. }
  283. return w, ""
  284. }
  285. // @Description tmp模版数据, tmp1补充数据
  286. // @Author J 2023/1/3 11:31
  287. func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) (map[string]interface{}, []string) {
  288. update := make(map[string]interface{})
  289. var fs []string
  290. for _, v := range config.Conf.Serve.Fields {
  291. if v == "attachments" {
  292. if pinfo1, ok1 := tmp1["projectinfo"].(map[string]interface{}); ok1 {
  293. if ats, ok2 := pinfo1[v].(map[string]interface{}); ok2 {
  294. if pinfo1[v] != nil && len(ats) > 0 {
  295. if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
  296. if pinfo[v] == nil {
  297. pinfo[v] = pinfo1[v]
  298. update["projectinfo"] = pinfo
  299. update["attach_text"] = tmp1["attach_text"] // 补充附件文本
  300. fs = append(fs, v)
  301. fs = append(fs, "attach_text")
  302. }
  303. } else {
  304. update["projectinfo"] = map[string]interface{}{v: pinfo1[v]}
  305. update["attach_text"] = tmp1["attach_text"]
  306. fs = append(fs, v)
  307. fs = append(fs, "attach_text")
  308. }
  309. }
  310. } else {
  311. if pinfo1[v] != nil {
  312. log.Error("mergeTmp err...", zap.Any("id", mongodb.BsonIdToSId(tmp1["_id"])))
  313. }
  314. }
  315. }
  316. } else if v == "city" {
  317. if util.ObjToString(tmp["area"]) == util.ObjToString(tmp1["area"]) {
  318. if tmp[v] == nil && tmp1[v] != nil {
  319. update[v] = util.ObjToString(tmp1[v])
  320. fs = append(fs, v)
  321. }
  322. }
  323. } else if v == "district" {
  324. if util.ObjToString(tmp["city"]) == util.ObjToString(tmp1["city"]) {
  325. if tmp[v] == nil && tmp1[v] != nil {
  326. update[v] = util.ObjToString(tmp1[v])
  327. fs = append(fs, v)
  328. }
  329. }
  330. } else {
  331. if tmp[v] == nil && tmp1[v] != nil {
  332. if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" {
  333. update[v] = util.ObjToString(tmp1[v])
  334. fs = append(fs, v)
  335. } else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 {
  336. update[v] = tmp1[v]
  337. fs = append(fs, v)
  338. } else {
  339. update[v] = tmp1[v]
  340. fs = append(fs, v)
  341. }
  342. }
  343. }
  344. }
  345. return update, fs
  346. }
  347. func updateMethod() {
  348. arru := make([][]map[string]interface{}, 500)
  349. indexu := 0
  350. for {
  351. select {
  352. case v := <-recordPool:
  353. arru[indexu] = v
  354. indexu++
  355. if indexu == 500 {
  356. recordSp <- true
  357. go func(arru [][]map[string]interface{}) {
  358. defer func() {
  359. <-recordSp
  360. }()
  361. MgoB.UpSertBulk(config.Conf.DB.Mongo.Record, arru...)
  362. }(arru)
  363. arru = make([][]map[string]interface{}, 500)
  364. indexu = 0
  365. }
  366. case <-time.After(1000 * time.Millisecond):
  367. if indexu > 0 {
  368. recordSp <- true
  369. go func(arru [][]map[string]interface{}) {
  370. defer func() {
  371. <-recordSp
  372. }()
  373. MgoB.UpSertBulk(config.Conf.DB.Mongo.Record, arru...)
  374. }(arru[:indexu])
  375. arru = make([][]map[string]interface{}, 500)
  376. indexu = 0
  377. }
  378. }
  379. }
  380. }
  381. func updateMethod1() {
  382. arru := make([][]map[string]interface{}, 500)
  383. indexu := 0
  384. for {
  385. select {
  386. case v := <-updatePool:
  387. arru[indexu] = v
  388. indexu++
  389. if indexu == 500 {
  390. updateSp <- true
  391. go func(arru [][]map[string]interface{}) {
  392. defer func() {
  393. <-updateSp
  394. }()
  395. MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  396. }(arru)
  397. arru = make([][]map[string]interface{}, 500)
  398. indexu = 0
  399. }
  400. case <-time.After(1000 * time.Millisecond):
  401. if indexu > 0 {
  402. updateSp <- true
  403. go func(arru [][]map[string]interface{}) {
  404. defer func() {
  405. <-updateSp
  406. }()
  407. MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  408. }(arru[:indexu])
  409. arru = make([][]map[string]interface{}, 500)
  410. indexu = 0
  411. }
  412. }
  413. }
  414. }