package gonsq import ( "encoding/json" "strings" "time" "github.com/nsqio/go-nsq" ) type Consumer struct { Ch chan interface{} C *nsq.Consumer Topic string Channel string IsJsonEncode bool } type Cconfig struct { IsJsonEncode bool //是否进行json序列化,解码也进行序列化,默认不进行json序列化 ConnectType int //连接类型 0连nsqd 1连nsqlookup Interval int //设置服务发现的轮询时间,例如新的nsq出现,默认10秒 Addr, Topic, Channel string //连接地址(支持逗号分割多个),主题,通道 Concurrent int //并发数,默认为1 } //处理消息 func (c *Consumer) HandleMessage(msg *nsq.Message) error { if c.IsJsonEncode { if len(msg.Body) > 1 { var err error switch msg.Body[0] { case 0x00: var obj interface{} err = json.Unmarshal(msg.Body[1:], &obj) if err == nil && obj != nil { c.Ch <- obj } case 0x01: //[]byte数组 var obj []byte err = json.Unmarshal(msg.Body[1:], &obj) if err == nil && obj != nil { c.Ch <- obj } } return err } } else { c.Ch <- msg.Body } return nil } func NewConsumer(cc *Cconfig) (*Consumer, error) { cfg := nsq.NewConfig() if cc.Interval == 0 { cc.Interval = 10 } cfg.LookupdPollInterval = time.Duration(cc.Interval) * time.Second //设置服务发现的轮询时间,例如新的nsq出现 c, err := nsq.NewConsumer(cc.Topic, cc.Channel, cfg) // 新建一个消费者 if err != nil { return nil, err } if cc.Concurrent == 0 { cc.Concurrent = 1 } consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode} c.AddConcurrentHandlers(consumer, cc.Concurrent) // 添加消费者接口 addrs := strings.Split(cc.Addr, ",") var err1 error if cc.ConnectType == 0 { err1 = c.ConnectToNSQDs(addrs) } else if cc.ConnectType == 1 { err1 = c.ConnectToNSQLookupds(addrs) } return consumer, err1 }