nsq工具类

renzheng ebc43af906 feat:add comsumer mathod close %!s(int64=3) %!d(string=hai) anos
docker 0d129f774b init %!s(int64=3) %!d(string=hai) anos
gonsq ebc43af906 feat:add comsumer mathod close %!s(int64=3) %!d(string=hai) anos
.gitignore b990e4e391 init %!s(int64=3) %!d(string=hai) anos
README.md 661ea8438d init %!s(int64=3) %!d(string=hai) anos
go.mod 0d129f774b init %!s(int64=3) %!d(string=hai) anos
go.sum 0d129f774b init %!s(int64=3) %!d(string=hai) anos
jynsq.go 0d129f774b init %!s(int64=3) %!d(string=hai) anos

README.md

jynsq

nsq工具类

实现功能

  • 生产者,发布消息,支持配置nsqd地址,主题,是否json序列化(默认不序列化)

  • 消费者,消费消息,支持配置nsqd或nsqlookupd,主题,通道,并发,是否json序列化

start

//使用go get
go get bp.jydev.jianyu360.cn/BP/jynsq

//使用 go mod 
import "bp.jydev.jianyu360.cn/BP/jynsq/gonsq"

生产者

p, err := NewProducer("192.168.3.207:4150", "tt", true) //nsqd地址,主题,是否进行json序列化与消费都配置对应、否则[]byte截取出错
p.Publish("test")
p.Publish(123)
p.Publish(map[string]interface{}{"key":1})
p.Publish([]byte("aaa"))   //如果只传递[]byte可不进行序列化

消费者

c, err := NewConsumer(&Cconfig{
	IsJsonEncode: true,   //与生产者配置对应,设为true会取第1个字节进行类型判断
	Addr:         "192.168.3.207:4150", //默认连接nsqd
	ConnectType:  0,//默认连接nsqd
	Topic:        "tt", //主题
	Channel:      "cc", //通道
	Concurrent:   1,    //并发数
})
for {
	select {
	case obj := <-c.Ch:  //从通道读取即可
		log.Println("ccc", obj)
	}
}

docker启动nsq

  • 在docker目录有 docker-compose.yml 文件,直接下载启动 docker-compose up -d

一个nsqd 一个nsqlookupd 一个nsqadmin,如果需要增加nsqd直接单独启nsqd即可,使用docker时用消费者连nsqlookupd返回的是docker内部地址,连接有问题