main.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package main
  2. import (
  3. "dataflow/util"
  4. "fmt"
  5. "log"
  6. "time"
  7. "app.yhyue.com/moapp/jybase/mongodb"
  8. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  9. )
  10. var (
  11. conf util.Conf
  12. ThreadsLimit int
  13. Bidding *mongodb.MongodbSim
  14. jn *jnats.Jnats
  15. ProMsgInfo = make(chan *util.MsgInfo, 50000)
  16. ErrMsgInfo = make(chan *util.MsgInfo, 50000)
  17. )
  18. func init() {
  19. conf = util.GetConf()
  20. jn = jnats.NewJnats(conf.Config.Natsurl)
  21. Bidding = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize)
  22. go PushInfo()
  23. go ErrBulkSave()
  24. }
  25. func main() {
  26. // 5秒定时 考虑增加流程暂停
  27. //消息体
  28. // for i := 0; i < 2; i++ {
  29. // //消息体
  30. // info := &util.MsgInfo{
  31. // Id: fmt.Sprint(i),
  32. // Data: map[string]interface{}{
  33. // "title": "测试消息" + fmt.Sprint(i),
  34. // "detail": "信息源码",
  35. // "time": time.Now().Unix(),
  36. // },
  37. // }
  38. // ProMsgInfo <- info
  39. // }
  40. data, ok := Bidding.FindById("bidding", "6007c2af7cdc9beb5638ede5", nil)
  41. if ok && data != nil {
  42. // datastr, _ := bson.Marshal(*data)
  43. info := &util.MsgInfo{
  44. Id: fmt.Sprint("1"),
  45. Data: *data,
  46. Stime: time.Now().Unix(),
  47. }
  48. ProMsgInfo <- info
  49. }
  50. // session := Bidding.GetMgoConn()
  51. // query := map[string]interface{}{}
  52. // defer Bidding.DestoryMongoConn(session)
  53. // iter := session.DB("").C("").Find(&query).Sort("-_id").Iter()
  54. // thisData := map[string]interface{}{}
  55. // for {
  56. // if !iter.Next(&thisData) {
  57. // break
  58. // }
  59. // thisData = map[string]interface{}{}
  60. // }
  61. select {}
  62. }
  63. func PushInfo() {
  64. step := conf.Config.Process
  65. subject := step.Subject
  66. ThreadsLimit = conf.Config.Threads
  67. for i := 0; i < ThreadsLimit; i++ {
  68. go func() {
  69. for {
  70. select {
  71. case info := <-ProMsgInfo:
  72. var err error
  73. for _, st := range step.Steps {
  74. info, err = sendMsg(jn, subject, st, info)
  75. if err != nil {
  76. break
  77. }
  78. }
  79. }
  80. }
  81. }()
  82. }
  83. }
  84. func sendMsg(jn *jnats.Jnats, subject, st string, info *util.MsgInfo) (*util.MsgInfo, error) {
  85. newinfo, err := util.SendRequest(jn, subject, st, info, 5*time.Minute)
  86. if err != nil {
  87. log.Println("err", info.Id, err)
  88. // reMsg(info, subject, st) //消息重发 是否需要?
  89. util.Send("发布消息失败告警" + info.Id + err.Error()) //告警 频次考虑控制
  90. info.CurrSetp = st //保存错误
  91. ErrMsgInfo <- info
  92. return info, err
  93. } else {
  94. if newinfo.Err != "" {
  95. util.Send("消息返回错误告警" + info.Id + newinfo.Err) //告警
  96. info.CurrSetp = st //保存错误
  97. ErrMsgInfo <- info
  98. } else {
  99. log.Println("消息发送成功")
  100. info = newinfo
  101. ErrMsgInfo <- info
  102. }
  103. return info, nil
  104. }
  105. }
  106. func reMsg(msg *util.MsgInfo, subject, sts string) {
  107. isOk := false
  108. for _, st := range conf.Config.Process.Steps {
  109. if !isOk {
  110. if st == sts {
  111. isOk = true
  112. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  113. if err != nil {
  114. log.Println("err", msg.Id, err)
  115. util.Send("消息重发失败告警" + msg.Id + err.Error()) //告警
  116. break
  117. } else {
  118. msg = newinfo
  119. }
  120. }
  121. } else {
  122. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  123. if err != nil {
  124. log.Println("err", msg.Id, err)
  125. util.Send("发布消息失败告警" + msg.Id + err.Error()) //告警
  126. ErrMsgInfo <- msg //保存错误
  127. break
  128. } else {
  129. msg = newinfo
  130. }
  131. }
  132. }
  133. }
  134. // 错误数据保存
  135. func ErrBulkSave() {
  136. for {
  137. select {
  138. case data := <-ErrMsgInfo:
  139. log.Println("接收到错误数据")
  140. // dataMap := map[string]interface{}{}
  141. // bson.Unmarshal(data.Data, &dataMap)
  142. tmp := map[string]interface{}{
  143. "id": data.Id,
  144. "currsetp": data.CurrSetp,
  145. "nextsetp": data.NextSetp,
  146. "err": data.Err,
  147. "extend": data.Extend,
  148. "comeintime": time.Now().Unix(),
  149. "sTime": data.Stime,
  150. "eTime": data.Etime,
  151. "data": data.Data,
  152. }
  153. tmpid := Bidding.Save("msg_err", tmp)
  154. if tmpid != "" {
  155. log.Println("保存成功")
  156. } else {
  157. log.Println("保存失败~~~")
  158. }
  159. }
  160. }
  161. }