main.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package main
  2. import (
  3. "dataflow/util"
  4. "fmt"
  5. "log"
  6. "time"
  7. "app.yhyue.com/moapp/jybase/mongodb"
  8. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  9. )
  10. var (
  11. conf util.Conf
  12. ThreadsLimit int
  13. Bidding *mongodb.MongodbSim
  14. jn *jnats.Jnats
  15. ProMsgInfo = make(chan *util.MsgInfo, 50000)
  16. ErrMsgInfo = make(chan *util.MsgInfo, 50000)
  17. )
  18. func init() {
  19. conf = util.GetConf()
  20. ThreadsLimit = conf.Config.Threads
  21. jn = jnats.NewJnats(conf.Config.Natsurl)
  22. Bidding = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.UserName, conf.Config.Mongodb.Password, conf.Config.Mongodb.DbSize)
  23. go ErrBulkSave()
  24. }
  25. func main() {
  26. step := conf.Config.Process
  27. subject := step.Subject
  28. //TODO 改造为读临时库 待确定读库模式
  29. //消息体
  30. for i := 0; i < 2; i++ {
  31. //消息体
  32. info := &util.MsgInfo{
  33. Id: fmt.Sprint(i),
  34. Data: map[string]interface{}{
  35. "title": "测试消息" + fmt.Sprint(i),
  36. "detail": "信息源码",
  37. },
  38. }
  39. ProMsgInfo <- info
  40. }
  41. for i := 0; i < ThreadsLimit; i++ {
  42. go func() {
  43. for {
  44. info := <-ProMsgInfo
  45. //按照步骤发布消息
  46. for _, st := range step.Steps {
  47. newinfo, err := util.SendRequest(jn, subject, st, info, 5*time.Minute)
  48. if err != nil {
  49. log.Println("err", info.Id, err)
  50. reMsg(info, subject, st) //消息重发
  51. util.Send("发布消息失败告警" + info.Id + err.Error()) //告警
  52. info.CurrSetp = st //保存错误
  53. ErrMsgInfo <- info
  54. break
  55. } else {
  56. if newinfo.Err != nil {
  57. util.Send("消息返回错误告警" + info.Id + newinfo.Err.Error()) //告警
  58. info.CurrSetp = st //保存错误
  59. ErrMsgInfo <- info
  60. } else {
  61. log.Println("消息发送成功", newinfo)
  62. info = newinfo
  63. }
  64. }
  65. }
  66. }
  67. }()
  68. }
  69. //订阅消息处理中间阶段
  70. select {}
  71. }
  72. func reMsg(msg *util.MsgInfo, subject, sts string) {
  73. isOk := false
  74. for _, st := range conf.Config.Process.Steps {
  75. if !isOk {
  76. if st == sts {
  77. isOk = true
  78. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  79. if err != nil {
  80. log.Println("err", msg.Id, err)
  81. util.Send("消息重发失败告警" + msg.Id + err.Error()) //告警
  82. break
  83. } else {
  84. msg = newinfo
  85. }
  86. }
  87. } else {
  88. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  89. if err != nil {
  90. log.Println("err", msg.Id, err)
  91. util.Send("发布消息失败告警" + msg.Id + err.Error()) //告警
  92. ErrMsgInfo <- msg //保存错误
  93. break
  94. } else {
  95. msg = newinfo
  96. }
  97. }
  98. }
  99. }
  100. // 错误数据保存
  101. func ErrBulkSave() {
  102. elist := []map[string]interface{}{}
  103. for {
  104. select {
  105. case data, ok := <-ErrMsgInfo:
  106. if ok {
  107. tmp := map[string]interface{}{}
  108. tmp["id"] = data.Id
  109. tmp["currsetp"] = data.CurrSetp
  110. tmp["comeintime"] = time.Now().Unix()
  111. tmp["data"] = data.Data
  112. elist = append(elist, tmp)
  113. if len(elist) >= 20 {
  114. Bidding.SaveBulk("msg_err", elist...)
  115. elist = []map[string]interface{}{}
  116. }
  117. }
  118. case <-time.After(5 * time.Second): // 设置超时时间秒
  119. if len(elist) > 0 {
  120. Bidding.SaveBulk("msg_err", elist...)
  121. elist = []map[string]interface{}{}
  122. }
  123. default:
  124. }
  125. }
  126. }