attachment.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package main
  2. import (
  3. "encoding/json"
  4. "esindex/config"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  10. "net"
  11. "sync"
  12. )
  13. //attachmentBiddingTask 附件补采入库es
  14. func attachmentBiddingTask(mapInfo map[string]interface{}, other config.OthersData) {
  15. defer util.Catch()
  16. var MgoOther *mongodb.MongodbSim
  17. //初始化MongoDB
  18. MgoOther = &mongodb.MongodbSim{
  19. MongodbAddr: other.MgoAddr,
  20. DbName: other.MgoDB,
  21. Size: 10,
  22. UserName: other.MgoUsername,
  23. Password: other.MgoPassword,
  24. }
  25. MgoOther.InitPool()
  26. log.Info("attachmentBiddingTask", zap.Any("MgoOther", MgoOther))
  27. stype := util.ObjToString(mapInfo["stype"])
  28. q, _ := mapInfo["query"].(map[string]interface{})
  29. if q == nil {
  30. q = map[string]interface{}{
  31. "_id": map[string]interface{}{
  32. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  33. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  34. },
  35. }
  36. } else {
  37. //针对gte/lte,单独转换
  38. q = convertToMongoID(q)
  39. }
  40. ch := make(chan bool, 10)
  41. wg := &sync.WaitGroup{}
  42. //bidding库
  43. biddingConn := MgoOther.GetMgoConn()
  44. count, _ := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Count()
  45. log.Info(other.MgoColl, zap.Int64("同步总数:", count))
  46. it := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Select(map[string]interface{}{
  47. "contenthtml": 0,
  48. }).Iter()
  49. c1, index := 0, 0
  50. var indexLock sync.Mutex
  51. for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
  52. if c1%1000 == 0 {
  53. log.Info("attachmentBiddingTask", zap.Int("current:", c1))
  54. log.Info("attachmentBiddingTask", zap.Any("current:_id =>", tmp["_id"]))
  55. }
  56. ch <- true
  57. wg.Add(1)
  58. go func(tmp map[string]interface{}) {
  59. defer func() {
  60. <-ch
  61. wg.Done()
  62. }()
  63. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  64. tmp = make(map[string]interface{})
  65. return
  66. }
  67. //只针对增量数据处理;全量数据 需要用extracttype字段判断
  68. //7: 重复数据
  69. //8: 不重复
  70. if util.IntAll(tmp["dataprocess"]) != 8 {
  71. return
  72. }
  73. //// 增量数据使用上面判断;全量数据使用下面配置
  74. //-1:重复 ,1:不重复 ,0:入库 9:分类
  75. //if util.IntAll(tmp["extracttype"]) != 1 {
  76. // return
  77. //}
  78. //针对产权数据,暂时不入es 索引库
  79. if util.IntAll(tmp["infoformat"]) == 3 {
  80. return
  81. }
  82. /**
  83. 数据抽取时,有的数据的发布时间是之前的,属于增量历史数据,在判重和同步到bidding表是,会添加history_updatetime
  84. 字段,所以下面判断才会处理
  85. */
  86. if stype == "bidding_history" && tmp["history_updatetime"] == nil {
  87. return
  88. }
  89. indexLock.Lock()
  90. index++
  91. indexLock.Unlock()
  92. newTmp, update := GetEsField(tmp, stype)
  93. newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
  94. //针对中国政府采购网,单独处理
  95. if util.ObjToString(tmp["site"]) == "中国政府采购网" {
  96. objectType := MatchService(tmp)
  97. if objectType != "" {
  98. newTmp["object_type"] = objectType
  99. }
  100. }
  101. if len(update) > 0 {
  102. updateBiddingPool <- []map[string]interface{}{{
  103. "_id": tmp["_id"],
  104. },
  105. {"$set": update},
  106. }
  107. }
  108. saveEsPool <- newTmp
  109. }(tmp)
  110. tmp = map[string]interface{}{}
  111. }
  112. wg.Wait()
  113. log.Info("attachmentBiddingTask over", zap.Int("count", c1), zap.Int("index", index))
  114. if other.NextAddr != "" {
  115. //发送udp,附件补采 才需要
  116. data := map[string]interface{}{
  117. //"stype": "update",
  118. "gtid": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  119. "lteid": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  120. }
  121. //udp 传递的信息
  122. for k, v := range other.Data {
  123. data[k] = v
  124. }
  125. //下个udp 地址信息
  126. target := &net.UDPAddr{
  127. Port: other.NextPort,
  128. IP: net.ParseIP(other.NextAddr),
  129. }
  130. bytes, _ := json.Marshal(data)
  131. err := UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  132. if err != nil {
  133. log.Info("attachmentBiddingTask ", zap.Any("WriteUdp err", err), zap.Any("target", target))
  134. }
  135. log.Info("attachmentBiddingTask ", zap.Any("target", target), zap.Any("data", data))
  136. }
  137. }