main.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package main
  2. import (
  3. "data_fusion/config"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.uber.org/zap"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  11. "reflect"
  12. "sync"
  13. "time"
  14. )
  // ArrLock holds one mutex per worker; init appends config.Conf.Serve.Thread
// of them. main picks one with util.HashCode(repeat_id) % Thread, so documents
// that share a repeat_id are fused one at a time.
var ArrLock []sync.Mutex
  18. func init() {
  19. config.Init("./common.toml")
  20. InitLog()
  21. InitMgo()
  22. redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.Db)
  23. initData()
  24. updatePool = make(chan []map[string]interface{}, 5000)
  25. updateSp = make(chan bool, 5)
  26. recordPool = make(chan []map[string]interface{}, 5000)
  27. recordSp = make(chan bool, 5)
  28. for i := 0; i < config.Conf.Serve.Thread; i++ {
  29. ArrLock = append(ArrLock, sync.Mutex{})
  30. }
  31. }
  32. func main() {
  33. go updateMethod()
  34. go updateMethod1()
  35. sess := MgoB.GetMgoConn()
  36. defer MgoB.DestoryMongoConn(sess)
  37. ch := make(chan bool, config.Conf.Serve.Thread)
  38. wg := &sync.WaitGroup{}
  39. //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("635ff15d631ff1ac3d095c41")}
  40. q := bson.M{"extracttype": -1}
  41. f := map[string]interface{}{"detail": 0, "contenthtml": 0, "summary": 0, "description": 0}
  42. it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Select(f).Iter()
  43. count := 0
  44. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  45. if count%20000 == 0 {
  46. log.Info("main", zap.Int("current:", count))
  47. }
  48. ch <- true
  49. wg.Add(1)
  50. go func(tmp map[string]interface{}) {
  51. defer func() {
  52. <-ch
  53. wg.Done()
  54. }()
  55. repeatId := util.ObjToString(tmp["repeat_id"])
  56. ArrLock[util.HashCode(repeatId)%config.Conf.Serve.Thread].Lock()
  57. if str := redis.GetStr("fusion_id", repeatId); str != "" {
  58. tmp1 := findData(repeatId)
  59. var update map[string]interface{}
  60. var fs []string
  61. update, fs = mergeTmp(tmp1, tmp)
  62. update1 := util.DeepCopy(update).(map[string]interface{})
  63. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  64. if fs != nil && len(fs) > 0 {
  65. updatePool <- []map[string]interface{}{
  66. {"_id": tmp1["_id"]},
  67. {"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
  68. }
  69. }
  70. record := make(map[string]interface{})
  71. record["$push"] = map[string]interface{}{
  72. "ids": mongodb.BsonIdToSId(tmp["_id"]),
  73. "record": update1,
  74. }
  75. recordPool <- []map[string]interface{}{
  76. {"_id": mongodb.StringTOBsonId(repeatId)},
  77. record,
  78. }
  79. redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"])))
  80. } else {
  81. tmp1 := findData(repeatId)
  82. var update map[string]interface{}
  83. var fs []string
  84. update, fs = mergeTmp(tmp1, tmp)
  85. set := util.DeepCopy(update).(map[string]interface{})
  86. if fs != nil && len(fs) > 0 {
  87. set["fusion_fields"] = fs
  88. }
  89. if len(set) > 0 {
  90. updatePool <- []map[string]interface{}{
  91. {"_id": tmp1["_id"]},
  92. {"$set": set},
  93. }
  94. }
  95. record := make(map[string]interface{})
  96. record["_id"] = tmp1["_id"]
  97. record["template_id"] = mongodb.BsonIdToSId(tmp1["_id"])
  98. record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])}
  99. var recordlist []map[string]interface{}
  100. update1 := util.DeepCopy(update).(map[string]interface{})
  101. update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
  102. recordlist = append(recordlist, update1)
  103. record["record"] = recordlist
  104. recordPool <- []map[string]interface{}{
  105. {"_id": tmp1["_id"]},
  106. {"$set": record},
  107. }
  108. redis.PutCKV("fusion_id", repeatId, mongodb.BsonIdToSId(tmp["_id"]))
  109. }
  110. ArrLock[util.HashCode(repeatId)%config.Conf.Serve.Thread].Unlock()
  111. }(tmp)
  112. tmp = map[string]interface{}{}
  113. }
  114. wg.Wait()
  115. log.Info("fusion over...", zap.Int("count:", count))
  116. c := make(chan bool, 1)
  117. <-c
  118. }
  119. func findData(id string) map[string]interface{} {
  120. tmp, _ := MgoB.FindById(config.Conf.DB.Mongo.Coll, id, nil)
  121. if tmp != nil && len(*tmp) > 0 {
  122. return *tmp
  123. } else {
  124. tmp, _ = MgoB.FindById("bidding", id, nil)
  125. if tmp != nil && len(*tmp) > 0 {
  126. return *tmp
  127. }
  128. }
  129. return nil
  130. }
  131. func getWeight(tmp map[string]interface{}) (int, string) {
  132. var w int
  133. if util.IntAll(tmp["publishtime"]) <= 0 {
  134. return 0, "发布时间小于0"
  135. }
  136. if BinarySearch(CompeteSite, util.ObjToString(tmp["site"])) > -1 {
  137. return 0, "竞品网站数据"
  138. }
  139. for k, v := range config.Conf.Serve.Weight {
  140. if tmp[k] != nil {
  141. if k == "attachments" {
  142. if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
  143. if atts, o2 := pinfo["attachments"].(map[string]interface{}); o2 {
  144. if len(atts) > 0 {
  145. w += v
  146. }
  147. }
  148. }
  149. } else {
  150. if reflect.TypeOf(tmp[k]).String() == "string" {
  151. if util.ObjToString(tmp[k]) != "" {
  152. w += v
  153. }
  154. } else if reflect.TypeOf(tmp[k]).String() == "float64" {
  155. if util.Float64All(tmp[k]) > 0 {
  156. w += v
  157. }
  158. } else if reflect.TypeOf(tmp[k]).String() == "[]interface {}" {
  159. if len(tmp[k].([]interface{})) > 0 {
  160. w += v
  161. }
  162. } else {
  163. w += v
  164. }
  165. }
  166. }
  167. }
  168. return w, ""
  169. }
  170. // @Description tmp模版数据, tmp1补充数据
  171. // @Author J 2023/1/3 11:31
  172. func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) (map[string]interface{}, []string) {
  173. update := make(map[string]interface{})
  174. var fs []string
  175. for _, v := range config.Conf.Serve.Fields {
  176. if v == "attachments" {
  177. if pinfo1, ok1 := tmp1["projectinfo"].(map[string]interface{}); ok1 {
  178. if ats, ok2 := pinfo1[v].(map[string]interface{}); ok2 {
  179. if pinfo1[v] != nil && len(ats) > 0 {
  180. if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
  181. if pinfo[v] == nil {
  182. pinfo[v] = pinfo1[v]
  183. update["projectinfo"] = pinfo
  184. update["attach_text"] = tmp1["attach_text"] // 补充附件文本
  185. fs = append(fs, v)
  186. fs = append(fs, "attach_text")
  187. }
  188. } else {
  189. update["projectinfo"] = map[string]interface{}{v: pinfo1[v]}
  190. update["attach_text"] = tmp1["attach_text"]
  191. fs = append(fs, v)
  192. fs = append(fs, "attach_text")
  193. }
  194. }
  195. } else {
  196. if pinfo1[v] != nil {
  197. log.Error("mergeTmp err...", zap.Any("id", mongodb.BsonIdToSId(tmp1["_id"])))
  198. }
  199. }
  200. }
  201. } else if v == "city" {
  202. if util.ObjToString(tmp["area"]) == util.ObjToString(tmp1["area"]) {
  203. if tmp[v] == nil && tmp1[v] != nil {
  204. update[v] = util.ObjToString(tmp1[v])
  205. fs = append(fs, v)
  206. }
  207. }
  208. } else if v == "district" {
  209. if util.ObjToString(tmp["city"]) == util.ObjToString(tmp1["city"]) {
  210. if tmp[v] == nil && tmp1[v] != nil {
  211. update[v] = util.ObjToString(tmp1[v])
  212. fs = append(fs, v)
  213. }
  214. }
  215. } else {
  216. if tmp[v] == nil && tmp1[v] != nil {
  217. if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" {
  218. update[v] = util.ObjToString(tmp1[v])
  219. fs = append(fs, v)
  220. } else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 {
  221. update[v] = tmp1[v]
  222. fs = append(fs, v)
  223. } else {
  224. update[v] = tmp1[v]
  225. fs = append(fs, v)
  226. }
  227. }
  228. }
  229. }
  230. return update, fs
  231. }
  232. func updateMethod() {
  233. arru := make([][]map[string]interface{}, 500)
  234. indexu := 0
  235. for {
  236. select {
  237. case v := <-recordPool:
  238. arru[indexu] = v
  239. indexu++
  240. if indexu == 500 {
  241. recordSp <- true
  242. go func(arru [][]map[string]interface{}) {
  243. defer func() {
  244. <-recordSp
  245. }()
  246. MgoB.UpSertBulk(config.Conf.DB.Mongo.Record, arru...)
  247. }(arru)
  248. arru = make([][]map[string]interface{}, 500)
  249. indexu = 0
  250. }
  251. case <-time.After(1000 * time.Millisecond):
  252. if indexu > 0 {
  253. recordSp <- true
  254. go func(arru [][]map[string]interface{}) {
  255. defer func() {
  256. <-recordSp
  257. }()
  258. MgoB.UpSertBulk(config.Conf.DB.Mongo.Record, arru...)
  259. }(arru[:indexu])
  260. arru = make([][]map[string]interface{}, 500)
  261. indexu = 0
  262. }
  263. }
  264. }
  265. }
  266. func updateMethod1() {
  267. arru := make([][]map[string]interface{}, 500)
  268. indexu := 0
  269. for {
  270. select {
  271. case v := <-updatePool:
  272. arru[indexu] = v
  273. indexu++
  274. if indexu == 500 {
  275. updateSp <- true
  276. go func(arru [][]map[string]interface{}) {
  277. defer func() {
  278. <-updateSp
  279. }()
  280. MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  281. }(arru)
  282. arru = make([][]map[string]interface{}, 500)
  283. indexu = 0
  284. }
  285. case <-time.After(1000 * time.Millisecond):
  286. if indexu > 0 {
  287. updateSp <- true
  288. go func(arru [][]map[string]interface{}) {
  289. defer func() {
  290. <-updateSp
  291. }()
  292. MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  293. }(arru[:indexu])
  294. arru = make([][]map[string]interface{}, 500)
  295. indexu = 0
  296. }
  297. }
  298. }
  299. }