main.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package main
  2. import (
  3. "dataflow/util"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "time"
  8. "app.yhyue.com/moapp/jybase/mongodb"
  9. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  10. )
  11. var (
  12. conf util.Conf
  13. ThreadsLimit int
  14. Bidding *mongodb.MongodbSim
  15. jn *jnats.Jnats
  16. ProMsgInfo = make(chan *util.MsgInfo, 50000)
  17. ErrMsgInfo = make(chan *util.MsgInfo, 50000)
  18. )
  19. func init() {
  20. conf = util.GetConf()
  21. jn = jnats.NewJnats(conf.Config.Natsurl)
  22. Bidding = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize)
  23. go PushInfo()
  24. go ErrBulkSave()
  25. }
  26. func main() {
  27. // 5秒定时 考虑增加流程暂停
  28. //消息体
  29. // for i := 0; i < 2; i++ {
  30. // //消息体
  31. // info := &util.MsgInfo{
  32. // Id: fmt.Sprint(i),
  33. // Data: map[string]interface{}{
  34. // "title": "测试消息" + fmt.Sprint(i),
  35. // "detail": "信息源码",
  36. // "time": time.Now().Unix(),
  37. // },
  38. // }
  39. // ProMsgInfo <- info
  40. // }
  41. data, ok := Bidding.FindById("bidding", "6007c2af7cdc9beb5638ede5", nil)
  42. if ok && data != nil {
  43. // datastr, _ := bson.Marshal(*data)
  44. info := &util.MsgInfo{
  45. Id: fmt.Sprint("1"),
  46. Data: *data,
  47. Stime: time.Now().Unix(),
  48. }
  49. ProMsgInfo <- info
  50. }
  51. // session := Bidding.GetMgoConn()
  52. // query := map[string]interface{}{}
  53. // defer Bidding.DestoryMongoConn(session)
  54. // iter := session.DB("").C("").Find(&query).Sort("-_id").Iter()
  55. // thisData := map[string]interface{}{}
  56. // for {
  57. // if !iter.Next(&thisData) {
  58. // break
  59. // }
  60. // thisData = map[string]interface{}{}
  61. // }
  62. select {}
  63. }
  64. func PushInfo() {
  65. step := conf.Config.Process
  66. subject := step.Subject
  67. ThreadsLimit = conf.Config.Threads
  68. for i := 0; i < ThreadsLimit; i++ {
  69. go func() {
  70. for {
  71. select {
  72. case info := <-ProMsgInfo:
  73. var err error
  74. for _, st := range step.Steps {
  75. if strings.Contains(st, "|") {
  76. st1 := strings.Split(st, "|")[0]
  77. st2 := strings.Split(st, "|")[1]
  78. if info.Extend.File.IsFile == 1 {
  79. info, err = sendMsg(jn, subject, st1, info)
  80. if err != nil {
  81. break
  82. }
  83. } else {
  84. info, err = sendMsg(jn, subject, st2, info)
  85. if err != nil {
  86. break
  87. }
  88. }
  89. } else {
  90. info, err = sendMsg(jn, subject, st, info)
  91. if err != nil {
  92. break
  93. }
  94. }
  95. }
  96. }
  97. }
  98. }()
  99. }
  100. }
  101. func sendMsg(jn *jnats.Jnats, subject, st string, info *util.MsgInfo) (*util.MsgInfo, error) {
  102. newinfo, err := util.SendRequest(jn, subject, st, info, 5*time.Minute)
  103. if err != nil {
  104. log.Println("err", info.Id, err)
  105. // reMsg(info, subject, st) //消息重发 是否需要?
  106. util.Send("发布消息失败告警" + info.Id + err.Error()) //告警 频次考虑控制
  107. info.CurrSetp = st //保存错误
  108. ErrMsgInfo <- info
  109. return info, err
  110. } else {
  111. if newinfo.Err != "" {
  112. util.Send("消息返回错误告警" + info.Id + newinfo.Err) //告警
  113. info.CurrSetp = st //保存错误
  114. ErrMsgInfo <- info
  115. } else {
  116. log.Println("消息发送成功")
  117. info = newinfo
  118. ErrMsgInfo <- info
  119. }
  120. return info, nil
  121. }
  122. }
  123. func reMsg(msg *util.MsgInfo, subject, sts string) {
  124. isOk := false
  125. for _, st := range conf.Config.Process.Steps {
  126. if !isOk {
  127. if st == sts {
  128. isOk = true
  129. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  130. if err != nil {
  131. log.Println("err", msg.Id, err)
  132. util.Send("消息重发失败告警" + msg.Id + err.Error()) //告警
  133. break
  134. } else {
  135. msg = newinfo
  136. }
  137. }
  138. } else {
  139. newinfo, err := util.SendRequest(jn, subject, st, msg, 5*time.Minute)
  140. if err != nil {
  141. log.Println("err", msg.Id, err)
  142. util.Send("发布消息失败告警" + msg.Id + err.Error()) //告警
  143. ErrMsgInfo <- msg //保存错误
  144. break
  145. } else {
  146. msg = newinfo
  147. }
  148. }
  149. }
  150. }
  151. // 错误数据保存
  152. func ErrBulkSave() {
  153. for {
  154. select {
  155. case data := <-ErrMsgInfo:
  156. log.Println("接收到错误数据")
  157. // dataMap := map[string]interface{}{}
  158. // bson.Unmarshal(data.Data, &dataMap)
  159. tmp := map[string]interface{}{
  160. "id": data.Id,
  161. "currsetp": data.CurrSetp,
  162. "nextsetp": data.NextSetp,
  163. "err": data.Err,
  164. "extend": data.Extend,
  165. "comeintime": time.Now().Unix(),
  166. "sTime": data.Stime,
  167. "eTime": data.Etime,
  168. "data": data.Data,
  169. }
  170. tmpid := Bidding.Save("msg_err", tmp)
  171. if tmpid != "" {
  172. log.Println("保存成功")
  173. } else {
  174. log.Println("保存失败~~~")
  175. }
  176. }
  177. }
  178. }