consumer.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package gonsq
import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/nsqio/go-nsq"
)
  8. type Consumer struct {
  9. Ch chan interface{}
  10. C *nsq.Consumer
  11. Topic string
  12. Channel string
  13. IsJsonEncode bool
  14. }
  15. type Cconfig struct {
  16. IsJsonEncode bool //是否进行json序列化,解码也进行序列化,默认不进行json序列化
  17. ConnectType int //连接类型 0连nsqd 1连nsqlookup
  18. Interval int //设置服务发现的轮询时间,例如新的nsq出现,默认10秒
  19. Addr, Topic, Channel string //连接地址(支持逗号分割多个),主题,通道
  20. Concurrent int //并发数,默认为1
  21. }
  22. //处理消息
  23. func (c *Consumer) HandleMessage(msg *nsq.Message) error {
  24. if c.IsJsonEncode {
  25. if len(msg.Body) > 1 {
  26. var err error
  27. switch msg.Body[0] {
  28. case 0x00:
  29. var obj interface{}
  30. err = json.Unmarshal(msg.Body[1:], &obj)
  31. if err == nil && obj != nil {
  32. c.Ch <- obj
  33. }
  34. case 0x01: //[]byte数组
  35. var obj []byte
  36. err = json.Unmarshal(msg.Body[1:], &obj)
  37. if err == nil && obj != nil {
  38. c.Ch <- obj
  39. }
  40. }
  41. return err
  42. }
  43. } else {
  44. c.Ch <- msg.Body
  45. }
  46. return nil
  47. }
  48. func NewConsumer(cc *Cconfig) (*Consumer, error) {
  49. cfg := nsq.NewConfig()
  50. if cc.Interval == 0 {
  51. cc.Interval = 10
  52. }
  53. cfg.LookupdPollInterval = time.Duration(cc.Interval) * time.Second //设置服务发现的轮询时间,例如新的nsq出现
  54. c, err := nsq.NewConsumer(cc.Topic, cc.Channel, cfg) // 新建一个消费者
  55. if err != nil {
  56. return nil, err
  57. }
  58. if cc.Concurrent == 0 {
  59. cc.Concurrent = 1
  60. }
  61. consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode}
  62. c.AddConcurrentHandlers(consumer, cc.Concurrent) // 添加消费者接口
  63. addrs := strings.Split(cc.Addr, ",")
  64. var err1 error
  65. if cc.ConnectType == 0 {
  66. err1 = c.ConnectToNSQDs(addrs)
  67. } else if cc.ConnectType == 1 {
  68. err1 = c.ConnectToNSQLookupds(addrs)
  69. }
  70. return consumer, err1
  71. }