|
@@ -14,6 +14,7 @@ type Consumer struct {
|
|
|
Topic string
|
|
|
Channel string
|
|
|
IsJsonEncode bool
|
|
|
+ Conf *Cconfig
|
|
|
}
|
|
|
|
|
|
type Cconfig struct {
|
|
@@ -64,7 +65,7 @@ func NewConsumer(cc *Cconfig) (*Consumer, error) {
|
|
|
if cc.Concurrent == 0 {
|
|
|
cc.Concurrent = 1
|
|
|
}
|
|
|
- consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode}
|
|
|
+ consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode, cc}
|
|
|
c.AddConcurrentHandlers(consumer, cc.Concurrent) // 添加消费者接口
|
|
|
addrs := strings.Split(cc.Addr, ",")
|
|
|
var err1 error
|
|
@@ -75,3 +76,11 @@ func NewConsumer(cc *Cconfig) (*Consumer, error) {
|
|
|
}
|
|
|
return consumer, err1
|
|
|
}
|
|
|
+
|
|
|
+//处理消息
|
|
|
+func (c *Consumer) Close(msg *nsq.Message) error {
|
|
|
+ if c.Conf.ConnectType == 1 {
|
|
|
+ return c.C.DisconnectFromNSQLookupd(c.Conf.Addr)
|
|
|
+ }
|
|
|
+ return c.C.DisconnectFromNSQD(c.Conf.Addr)
|
|
|
+}
|