1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- package gonsq
- import (
- "encoding/json"
- "strings"
- "time"
- "github.com/nsqio/go-nsq"
- )
- type Consumer struct {
- Ch chan interface{}
- C *nsq.Consumer
- Topic string
- Channel string
- IsJsonEncode bool
- Conf *Cconfig
- }
- type Cconfig struct {
- IsJsonEncode bool //是否进行json序列化,解码也进行序列化,默认不进行json序列化
- ConnectType int //连接类型 0连nsqd 1连nsqlookup
- Interval int //设置服务发现的轮询时间,例如新的nsq出现,默认10秒
- Addr, Topic, Channel string //连接地址(支持逗号分割多个),主题,通道
- Concurrent int //并发数,默认为1
- }
- //处理消息
- func (c *Consumer) HandleMessage(msg *nsq.Message) error {
- if c.IsJsonEncode {
- if len(msg.Body) > 1 {
- var err error
- switch msg.Body[0] {
- case 0x00:
- var obj interface{}
- err = json.Unmarshal(msg.Body[1:], &obj)
- if err == nil && obj != nil {
- c.Ch <- obj
- }
- case 0x01: //[]byte数组
- var obj []byte
- err = json.Unmarshal(msg.Body[1:], &obj)
- if err == nil && obj != nil {
- c.Ch <- obj
- }
- }
- return err
- }
- } else {
- c.Ch <- msg.Body
- }
- return nil
- }
- func NewConsumer(cc *Cconfig) (*Consumer, error) {
- cfg := nsq.NewConfig()
- if cc.Interval == 0 {
- cc.Interval = 10
- }
- cfg.LookupdPollInterval = time.Duration(cc.Interval) * time.Second //设置服务发现的轮询时间,例如新的nsq出现
- c, err := nsq.NewConsumer(cc.Topic, cc.Channel, cfg) // 新建一个消费者
- if err != nil {
- return nil, err
- }
- if cc.Concurrent == 0 {
- cc.Concurrent = 1
- }
- consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode, cc}
- c.AddConcurrentHandlers(consumer, cc.Concurrent) // 添加消费者接口
- addrs := strings.Split(cc.Addr, ",")
- var err1 error
- if cc.ConnectType == 0 {
- err1 = c.ConnectToNSQDs(addrs)
- } else if cc.ConnectType == 1 {
- err1 = c.ConnectToNSQLookupds(addrs)
- }
- return consumer, err1
- }
- //处理消息
- func (c *Consumer) Close(msg *nsq.Message) error {
- if c.Conf.ConnectType == 1 {
- return c.C.DisconnectFromNSQLookupd(c.Conf.Addr)
- }
- return c.C.DisconnectFromNSQD(c.Conf.Addr)
- }
|