consumer.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package gonsq
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "time"
  6. "github.com/nsqio/go-nsq"
  7. )
  8. type Consumer struct {
  9. Ch chan interface{}
  10. C *nsq.Consumer
  11. Topic string
  12. Channel string
  13. IsJsonEncode bool
  14. Conf *Cconfig
  15. }
  16. type Cconfig struct {
  17. IsJsonEncode bool //是否进行json序列化,解码也进行序列化,默认不进行json序列化
  18. ConnectType int //连接类型 0连nsqd 1连nsqlookup
  19. Interval int //设置服务发现的轮询时间,例如新的nsq出现,默认10秒
  20. Addr, Topic, Channel string //连接地址(支持逗号分割多个),主题,通道
  21. Concurrent int //并发数,默认为1
  22. }
  23. //处理消息
  24. func (c *Consumer) HandleMessage(msg *nsq.Message) error {
  25. if c.IsJsonEncode {
  26. if len(msg.Body) > 1 {
  27. var err error
  28. switch msg.Body[0] {
  29. case 0x00:
  30. var obj interface{}
  31. err = json.Unmarshal(msg.Body[1:], &obj)
  32. if err == nil && obj != nil {
  33. c.Ch <- obj
  34. }
  35. case 0x01: //[]byte数组
  36. var obj []byte
  37. err = json.Unmarshal(msg.Body[1:], &obj)
  38. if err == nil && obj != nil {
  39. c.Ch <- obj
  40. }
  41. }
  42. return err
  43. }
  44. } else {
  45. c.Ch <- msg.Body
  46. }
  47. return nil
  48. }
  49. func NewConsumer(cc *Cconfig) (*Consumer, error) {
  50. cfg := nsq.NewConfig()
  51. if cc.Interval == 0 {
  52. cc.Interval = 10
  53. }
  54. cfg.LookupdPollInterval = time.Duration(cc.Interval) * time.Second //设置服务发现的轮询时间,例如新的nsq出现
  55. c, err := nsq.NewConsumer(cc.Topic, cc.Channel, cfg) // 新建一个消费者
  56. if err != nil {
  57. return nil, err
  58. }
  59. if cc.Concurrent == 0 {
  60. cc.Concurrent = 1
  61. }
  62. consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode, cc}
  63. c.AddConcurrentHandlers(consumer, cc.Concurrent) // 添加消费者接口
  64. addrs := strings.Split(cc.Addr, ",")
  65. var err1 error
  66. if cc.ConnectType == 0 {
  67. err1 = c.ConnectToNSQDs(addrs)
  68. } else if cc.ConnectType == 1 {
  69. err1 = c.ConnectToNSQLookupds(addrs)
  70. }
  71. return consumer, err1
  72. }
  73. //处理消息
  74. func (c *Consumer) Close(msg *nsq.Message) error {
  75. if c.Conf.ConnectType == 1 {
  76. return c.C.DisconnectFromNSQLookupd(c.Conf.Addr)
  77. }
  78. return c.C.DisconnectFromNSQD(c.Conf.Addr)
  79. }