main.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package main
  2. import (
  3. "dataflow/util"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/nats-io/nats.go"
  8. mgo "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. )
  10. var (
  11. conf util.Conf
  12. url string
  13. ThreadsLimit int
  14. DB *mgo.MongodbSim
  15. ProMsgInfo = make(chan *util.MsgInfo, 50000)
  16. ErrMsgInfo = make(chan *util.MsgInfo, 50000)
  17. )
  18. func init() {
  19. conf = util.GetConf()
  20. ThreadsLimit = conf.Config.Threads
  21. url = conf.Config.Natsurl
  22. if url == "" {
  23. url = nats.DefaultURL
  24. }
  25. log.Println("url", url)
  26. DB = &mgo.MongodbSim{
  27. MongodbAddr: conf.Config.Mongodb.Addr,
  28. DbName: conf.Config.Mongodb.Dbname,
  29. Size: 2,
  30. }
  31. DB.InitPool()
  32. go ErrBulkSave()
  33. //go ErrPublish()
  34. //go SendAlarm()
  35. }
  36. func main() {
  37. nc, _ := nats.Connect(url)
  38. defer nc.Drain()
  39. step := conf.Config.Process
  40. subject := step.Subject
  41. //消息体
  42. for i := 0; i < 100; i++ {
  43. //消息体
  44. info := &util.MsgInfo{
  45. Id: fmt.Sprint(i),
  46. Data: map[string]interface{}{
  47. "title": "测试消息" + fmt.Sprint(i),
  48. "detail": "信息源码",
  49. },
  50. }
  51. ProMsgInfo <- info
  52. }
  53. for i := 0; i < ThreadsLimit; i++ {
  54. go func() {
  55. for {
  56. info := <-ProMsgInfo
  57. //按照步骤发布消息
  58. for _, st := range step.Steps {
  59. newinfo, err := util.SendRequest(nc, subject, st, info, 1*time.Second)
  60. if err != nil {
  61. //TODO 1、消息重发 2、告警
  62. log.Println("err", info.Id, err)
  63. ErrMsgInfo <- info
  64. break
  65. } else {
  66. info = newinfo
  67. //log.Println(info.Id, st, info.Etime-info.Stime)
  68. }
  69. }
  70. }
  71. }()
  72. }
  73. ck := make(chan int, 0)
  74. ck <- 0
  75. }
  76. // 错误消息重发
  77. func ErrPublish() {
  78. // TODO 读取msg_err重新发送
  79. }
  80. // 错误告警
  81. func SendAlarm() {
  82. // TODO
  83. }
  84. // 错误数据保存
  85. func ErrBulkSave() {
  86. elist := []map[string]interface{}{}
  87. for {
  88. select {
  89. case data, ok := <-ErrMsgInfo:
  90. if ok {
  91. tmp := map[string]interface{}{}
  92. tmp["id"] = data.Id
  93. tmp["currsetp"] = data.CurrSetp
  94. tmp["comeintime"] = time.Now().Unix()
  95. tmp["data"] = data.Data
  96. elist = append(elist, tmp)
  97. if len(elist) >= 20 {
  98. DB.SaveBulk("msg_err", elist...)
  99. elist = []map[string]interface{}{}
  100. }
  101. }
  102. case <-time.After(5 * time.Second): // 设置超时时间秒
  103. if len(elist) > 0 {
  104. DB.SaveBulk("msg_err", elist...)
  105. elist = []map[string]interface{}{}
  106. }
  107. default:
  108. }
  109. }
  110. }