main.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package main
  2. import (
  3. "dataflow/util"
  4. "log"
  5. "strings"
  6. "time"
  7. "app.yhyue.com/moapp/jybase/mongodb"
  8. "github.com/nats-io/nats.go"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  11. )
  12. var (
  13. conf util.Conf
  14. ThreadsLimit int
  15. Bidding *mongodb.MongodbSim
  16. jn *jnats.Jnats
  17. ProMsgInfo = make(chan *util.MsgInfo, 50000)
  18. ErrMsgInfo = make(chan *util.MsgInfo, 50000)
  19. )
  20. func init() {
  21. conf = util.GetConf()
  22. jn = jnats.NewJnats(conf.Config.Natsurl)
  23. Bidding = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize)
  24. go PushInfo()
  25. go ErrBulkSave()
  26. }
  27. func main() {
  28. step := conf.Config.Process
  29. subject := step.Subject
  30. jn.SubZip(subject+"."+step.Step, func(msg *nats.Msg) {
  31. data := &util.MsgInfo{}
  32. err := bson.Unmarshal(msg.Data, &data)
  33. if err != nil {
  34. log.Println(err)
  35. } else {
  36. ProMsgInfo <- data
  37. }
  38. })
  39. select {}
  40. }
  41. func PushInfo() {
  42. step := conf.Config.Process
  43. subject := step.Subject
  44. ThreadsLimit = conf.Config.Threads
  45. for i := 0; i < ThreadsLimit; i++ {
  46. go func() {
  47. for {
  48. select {
  49. case info := <-ProMsgInfo:
  50. var err error
  51. for _, st := range step.Steps {
  52. if strings.Contains(st, "|") {
  53. st1 := strings.Split(st, "|")[0]
  54. st2 := strings.Split(st, "|")[1]
  55. if info.Extend.File.IsFile == 1 {
  56. info, err = sendMsg(jn, subject, st1, info)
  57. if err != nil {
  58. break
  59. }
  60. } else {
  61. info, err = sendMsg(jn, subject, st2, info)
  62. if err != nil {
  63. break
  64. }
  65. }
  66. } else {
  67. info, err = sendMsg(jn, subject, st, info)
  68. if err != nil {
  69. break
  70. }
  71. }
  72. }
  73. }
  74. }
  75. }()
  76. }
  77. }
  78. func sendMsg(jn *jnats.Jnats, subject, st string, info *util.MsgInfo) (*util.MsgInfo, error) {
  79. newinfo, err := util.SendRequest(jn, subject, st, info, 5*time.Minute)
  80. if err != nil {
  81. log.Println("err", info.Id, err)
  82. // reMsg(info, subject, st) //消息重发 是否需要?
  83. util.Send("发布消息失败告警" + info.Id + err.Error()) //告警 频次考虑控制
  84. info.CurrSetp = st //保存错误
  85. ErrMsgInfo <- info
  86. return info, err
  87. } else {
  88. if newinfo.Err != "" {
  89. util.Send("消息返回错误告警" + info.Id + newinfo.Err) //告警
  90. info.CurrSetp = st //保存错误
  91. ErrMsgInfo <- info
  92. } else {
  93. log.Println("消息发送成功")
  94. info = newinfo
  95. ErrMsgInfo <- info
  96. }
  97. return info, nil
  98. }
  99. }
  100. func reMsg(msg *util.MsgInfo, subject, sts string) {
  101. isOk := false
  102. for _, st := range conf.Config.Process.Steps {
  103. if !isOk {
  104. if st == sts {
  105. isOk = true
  106. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  107. if err != nil {
  108. log.Println("err", msg.Id, err)
  109. util.Send("消息重发失败告警" + msg.Id + err.Error()) //告警
  110. break
  111. } else {
  112. msg = newinfo
  113. }
  114. }
  115. } else {
  116. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  117. if err != nil {
  118. log.Println("err", msg.Id, err)
  119. util.Send("发布消息失败告警" + msg.Id + err.Error()) //告警
  120. ErrMsgInfo <- msg //保存错误
  121. break
  122. } else {
  123. msg = newinfo
  124. }
  125. }
  126. }
  127. }
  128. // 错误数据保存
  129. func ErrBulkSave() {
  130. for {
  131. select {
  132. case data := <-ErrMsgInfo:
  133. log.Println("接收到错误数据")
  134. // dataMap := map[string]interface{}{}
  135. // bson.Unmarshal(data.Data, &dataMap)
  136. tmp := map[string]interface{}{
  137. "id": data.Id,
  138. "currsetp": data.CurrSetp,
  139. "nextsetp": data.NextSetp,
  140. "err": data.Err,
  141. "extend": data.Extend,
  142. "comeintime": time.Now().Unix(),
  143. "sTime": data.Stime,
  144. "eTime": data.Etime,
  145. "data": data.Data,
  146. }
  147. tmpid := Bidding.Save("msg_err", tmp)
  148. if tmpid != "" {
  149. log.Println("保存成功")
  150. } else {
  151. log.Println("保存失败~~~")
  152. }
  153. }
  154. }
  155. }