main.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package main
  2. import (
  3. elastic "app.yhyue.com/moapp/jybase/es"
  4. "app.yhyue.com/moapp/jybase/mongodb"
  5. "dataflow/util"
  6. "fmt"
  7. "github.com/nats-io/nats.go"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  10. u "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  12. "log"
  13. "regexp"
  14. "time"
  15. )
  16. const MGO_SAVE, MGO_UPDATE = "s", "u"
  17. var (
  18. conf util.Conf
  19. ThreadsLimit int
  20. MgoB *mongodb.MongodbSim
  21. MgoQ *mongodb.MongodbSim
  22. MgoP *mongodb.MongodbSim
  23. jn *jnats.Jnats
  24. NatsThreads chan bool
  25. Es elastic.Es
  26. //mgo
  27. DataSaveCache = make(chan map[string]interface{}, 1000)
  28. DataUpdateCache = make(chan []map[string]interface{}, 1000)
  29. DataSaveThreads = make(chan bool, 5)
  30. DataUpdateThreads = make(chan bool, 5)
  31. //other
  32. regLetter = regexp.MustCompile("[a-z]*")
  33. filterFileType = regexp.MustCompile("(jpg|jpeg|png|pdf)")
  34. )
  35. func init() {
  36. conf = util.GetConf()
  37. jn = jnats.NewJnats(conf.Config.Natsurl)
  38. MgoB = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize)
  39. MgoQ = mongodb.NewMgoWithUser(conf.Config.MongodbQ.Addr, conf.Config.MongodbQ.Dbname, conf.Config.MongodbQ.Username, conf.Config.MongodbQ.Password, conf.Config.MongodbQ.Dbsize)
  40. MgoP = mongodb.NewMgoWithUser(conf.Config.MongodbP.Addr, conf.Config.MongodbP.Dbname, conf.Config.MongodbP.Username, conf.Config.MongodbP.Password, conf.Config.MongodbP.Dbsize)
  41. redis.InitRedis1(conf.Config.Redis.Addr, conf.Config.Redis.DbIndex)
  42. InitOss()
  43. InitFileInfo()
  44. InitKeywordClient()
  45. InitEs()
  46. NatsThreads = make(chan bool, conf.Config.Threads)
  47. }
  48. func main() {
  49. go SaveBulkData()
  50. go UpdateBulkData()
  51. SubscribeNats()
  52. select {}
  53. }
  54. func SubscribeNats() {
  55. //先消费,带压缩
  56. jn.SubZip(conf.Config.Process.Subject+"."+conf.Config.Process.Step, func(msg *nats.Msg) {
  57. NatsThreads <- true
  58. go func(msg *nats.Msg) {
  59. defer func() {
  60. <-NatsThreads
  61. }()
  62. data := &util.MsgInfo{}
  63. err := bson.Unmarshal(msg.Data, &data)
  64. if err != nil {
  65. log.Println("解析数据失败:", err)
  66. data.Err = err.Error()
  67. //SaveErrData()//保存异常数据
  68. } else {
  69. //处理数据
  70. data.Stime = time.Now().Unix()
  71. data.CurrSetp = conf.Config.Process.Step
  72. if data.Extend.MgoSave.SType == MGO_SAVE { //保存
  73. SaveDealData(data.Data)
  74. } else if data.Extend.MgoSave.SType == MGO_UPDATE { //更新
  75. UpdateDealData(data.Id, data.Data)
  76. }
  77. data.Etime = time.Now().Unix()
  78. }
  79. //消息回写
  80. bs, _ := bson.Marshal(data)
  81. err = msg.Respond(bs)
  82. if err != nil {
  83. fmt.Println("回执失败:", data.Id, data.Data["_id"])
  84. //SaveErrData()//保存异常数据
  85. }
  86. }(msg)
  87. })
  88. }
  89. // 保存
  90. func SaveDealData(data map[string]interface{}) {
  91. //分类及部分字段处理
  92. fieldFun(data)
  93. //补充publishtime
  94. if u.IntAll(data["publishtime"]) == -1 {
  95. methodPb(data) //修正发布时间
  96. }
  97. //keyword关键词
  98. DealInfo(map[string]interface{}{}, data)
  99. // entidlist
  100. if s_winner, ok := data["s_winner"].(string); ok && s_winner != "" {
  101. cid := companyFun(s_winner)
  102. if len(cid) > 0 {
  103. data["entidlist"] = cid
  104. }
  105. }
  106. //剑鱼发布信息分类处理
  107. //typeFunc(data)//单独数据流处理
  108. // 附件有效字段
  109. if r := validFile(data); r != 0 {
  110. if r == -1 {
  111. data["isValidFile"] = false
  112. } else {
  113. data["isValidFile"] = true
  114. }
  115. }
  116. //情报标签字段
  117. if data["tag_topinformation"] != nil {
  118. data["tag_set"] = getTagSet(data)
  119. }
  120. //放入通道
  121. DataSaveCache <- data
  122. }
  123. // 更新
  124. func UpdateDealData(id string, data map[string]interface{}) {
  125. fields := map[string]interface{}{
  126. "detail": 0,
  127. "contenthtml": 0,
  128. }
  129. bid, _ := MgoB.FindById("bidding", id, fields)
  130. if bid != nil && len(*bid) > 0 { //找到数据
  131. modifyinfo := make(map[string]bool)
  132. if tmpmodifyinfo, ok := (*bid)["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil {
  133. for k := range tmpmodifyinfo {
  134. modifyinfo[k] = true
  135. }
  136. }
  137. update := map[string]interface{}{}
  138. del := map[string]interface{}{}
  139. for _, k := range conf.Config.Fields {
  140. tmpV := data[k] //extract v1
  141. bidV := (*bid)[k] //bidding v2
  142. if bidV == nil && tmpV != nil {
  143. update[k] = tmpV
  144. } else if bidV != nil && tmpV != nil && !modifyinfo[k] {
  145. update[k] = tmpV
  146. } else if bidV != nil && tmpV == nil && !modifyinfo[k] {
  147. if k == "s_subscopeclass" && del["subscopeclass"] == nil {
  148. continue
  149. } else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
  150. continue
  151. }
  152. del[k] = 1
  153. }
  154. }
  155. //分类及部分字段处理
  156. fieldFun(data)
  157. //publishtime
  158. if u.IntAll(data["publishtime"]) == -1 {
  159. methodPb(data) //修正发布时间
  160. }
  161. // entidlist
  162. if s_winner, ok := data["s_winner"].(string); ok && s_winner != "" {
  163. cid := companyFun(s_winner)
  164. if len(cid) > 0 {
  165. data["entidlist"] = cid
  166. }
  167. }
  168. //剑鱼发布信息分类处理
  169. //typeFunc(data)
  170. // 附件有效字段
  171. //if r := validFile(data); r != 0 {
  172. // if r == -1 {
  173. // data["isValidFile"] = false
  174. // } else {
  175. // data["isValidFile"] = true
  176. // }
  177. //}
  178. //情报标签字段
  179. //if data["tag_topinformation"] != nil {
  180. // data["tag_set"] = getTagSet(data)
  181. //}
  182. //放入通道
  183. if len(del) > 0 {
  184. DataUpdateCache <- []map[string]interface{}{
  185. {"_id": mongodb.StringTOBsonId(id)},
  186. {"$set": update, "$unset": del},
  187. }
  188. } else {
  189. DataUpdateCache <- []map[string]interface{}{
  190. {"_id": mongodb.StringTOBsonId(id)},
  191. {"$set": update, "$unset": del},
  192. }
  193. }
  194. } else { //未找到数据
  195. log.Println("未找到bidding数据:", id)
  196. }
  197. }
  198. // 批量保存data_bak
  199. func SaveBulkData() {
  200. log.Println("Save Data...")
  201. savearr := make([]map[string]interface{}, 200)
  202. index_save := 0
  203. for {
  204. select {
  205. case v := <-DataSaveCache:
  206. savearr[index_save] = v
  207. index_save++
  208. if index_save == 20 {
  209. DataSaveThreads <- true
  210. go func(tmp []map[string]interface{}) {
  211. defer func() {
  212. <-DataSaveThreads
  213. }()
  214. MgoB.SaveBulk("bidding", tmp...)
  215. }(savearr)
  216. savearr = make([]map[string]interface{}, 200)
  217. index_save = 0
  218. }
  219. case <-time.After(5 * time.Second):
  220. if index_save > 0 {
  221. DataSaveThreads <- true
  222. go func(tmp []map[string]interface{}) {
  223. defer func() {
  224. <-DataSaveThreads
  225. }()
  226. MgoB.SaveBulk("bidding", tmp...)
  227. }(savearr[:index_save])
  228. savearr = make([]map[string]interface{}, 200)
  229. index_save = 0
  230. }
  231. }
  232. }
  233. }
  234. // 批量更新心跳信息
  235. func UpdateBulkData() {
  236. log.Println("Update Data...")
  237. heartarr := make([][]map[string]interface{}, 200)
  238. index_update := 0
  239. for {
  240. select {
  241. case v := <-DataUpdateCache:
  242. heartarr[index_update] = v
  243. index_update++
  244. if index_update == 20 {
  245. DataUpdateThreads <- true
  246. go func(tmp [][]map[string]interface{}) {
  247. defer func() {
  248. <-DataUpdateThreads
  249. }()
  250. MgoB.UpdateBulk("bidding", tmp...)
  251. }(heartarr)
  252. heartarr = make([][]map[string]interface{}, 200)
  253. index_update = 0
  254. }
  255. case <-time.After(5 * time.Second):
  256. if index_update > 0 {
  257. DataUpdateThreads <- true
  258. go func(tmp [][]map[string]interface{}) {
  259. defer func() {
  260. <-DataUpdateThreads
  261. }()
  262. MgoB.UpSertBulk("bidding", tmp...)
  263. }(heartarr[:index_update])
  264. heartarr = make([][]map[string]interface{}, 200)
  265. index_update = 0
  266. }
  267. }
  268. }
  269. }