renzheng преди 3 години
родител
ревизия
0d129f774b
променени са 8 файла, в които са добавени 251 реда и са изтрити 0 реда
  1. 45 0
      README.md
  2. 28 0
      docker/docker-compose.yml
  3. 5 0
      go.mod
  4. 4 0
      go.sum
  5. 77 0
      gonsq/consumer.go
  6. 45 0
      gonsq/gonsq_test.go
  7. 46 0
      gonsq/producer.go
  8. 1 0
      jynsq.go

+ 45 - 0
README.md

@@ -0,0 +1,45 @@
+# jynsq  
+> nsq工具类
+
+### 实现功能  
+
+- 生产者,发布消息,支持配置nsqd地址,主题,是否json序列化(默认不序列化)
+
+- 消费者,消费消息,支持配置nsqd或nsqlookupd,主题,通道,并发,是否json序列化
+
+### start
+
+```
+//使用go get
+go get bp.jydev.jianyu360.cn/BP/jynsq
+
+//使用 go mod 
+import "bp.jydev.jianyu360.cn/BP/jynsq/gonsq"
+
+
+//生产者
+p, err := NewProducer("192.168.3.207:4150", "tt", true)
+p.Publish("test")
+p.Publish(123)
+p.Publish(map[string]interface{}{"key":1})
+p.Publish([]byte("aaa"))   //如果只传递[]byte可不进行序列化
+
+//消费者
+c, err := NewConsumer(&Cconfig{
+	IsJsonEncode: true,   //与生产者配置对应,设为true会取第1个字节进行类型判断
+	Addr:         "192.168.3.207:4150", //默认连接nsqd
+	ConnectType:  0,//默认连接nsqd
+	Topic:        "tt", //主题
+	Channel:      "cc", //通道
+	Concurrent:   1,    //并发数
+})
+
+for {
+	select {
+	case obj := <-c.Ch:  //从通道读取即可
+		log.Println("ccc", obj)
+	}
+}
+ 
+
+```

+ 28 - 0
docker/docker-compose.yml

@@ -0,0 +1,28 @@
+version: '3'
+services:
+  nsqlookupd:
+    image: nsqio/nsq
+    command: /nsqlookupd
+    restart: always
+    ports:
+      - 4160:4160
+      - 4161:4161
+  nsqd:
+    image: nsqio/nsq
+    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --data-path=/data
+    restart: always
+    depends_on:
+      - nsqlookupd
+    volumes:
+      - ./data:/data
+    ports:
+      - 4150:4150
+      - 4151:4151
+  nsqadmin:
+    image: nsqio/nsq
+    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
+    restart: always
+    depends_on:
+      - nsqlookupd
+    ports:
+      - 4171:4171

+ 5 - 0
go.mod

@@ -0,0 +1,5 @@
+module bp.jydev.jianyu360.cn/BP/jynsq
+
+go 1.16
+
+require github.com/nsqio/go-nsq v1.1.0

+ 4 - 0
go.sum

@@ -0,0 +1,4 @@
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=

+ 77 - 0
gonsq/consumer.go

@@ -0,0 +1,77 @@
+package gonsq
+
+import (
+	"encoding/json"
+	"strings"
+	"time"
+
+	"github.com/nsqio/go-nsq"
+)
+
+type Consumer struct {
+	Ch           chan interface{}
+	C            *nsq.Consumer
+	Topic        string
+	Channel      string
+	IsJsonEncode bool
+}
+
+type Cconfig struct {
+	IsJsonEncode         bool   //是否进行json序列化,解码也进行序列化,默认不进行json序列化
+	ConnectType          int    //连接类型 0连nsqd 1连nsqlookup
+	Interval             int    //设置服务发现的轮询时间,例如新的nsq出现,默认10秒
+	Addr, Topic, Channel string //连接地址(支持逗号分割多个),主题,通道
+	Concurrent           int    //并发数,默认为1
+}
+
+//处理消息
+func (c *Consumer) HandleMessage(msg *nsq.Message) error {
+	if c.IsJsonEncode {
+		if len(msg.Body) > 1 {
+			var err error
+			switch msg.Body[0] {
+			case 0x00:
+				var obj interface{}
+				err = json.Unmarshal(msg.Body[1:], &obj)
+				if err == nil && obj != nil {
+					c.Ch <- obj
+				}
+			case 0x01: //[]byte数组
+				var obj []byte
+				err = json.Unmarshal(msg.Body[1:], &obj)
+				if err == nil && obj != nil {
+					c.Ch <- obj
+				}
+			}
+			return err
+		}
+	} else {
+		c.Ch <- msg.Body
+	}
+	return nil
+}
+
+func NewConsumer(cc *Cconfig) (*Consumer, error) {
+	cfg := nsq.NewConfig()
+	if cc.Interval == 0 {
+		cc.Interval = 10
+	}
+	cfg.LookupdPollInterval = time.Duration(cc.Interval) * time.Second //设置服务发现的轮询时间,例如新的nsq出现
+	c, err := nsq.NewConsumer(cc.Topic, cc.Channel, cfg)               // 新建一个消费者
+	if err != nil {
+		return nil, err
+	}
+	if cc.Concurrent == 0 {
+		cc.Concurrent = 1
+	}
+	consumer := &Consumer{make(chan interface{}, cc.Concurrent), c, cc.Topic, cc.Channel, cc.IsJsonEncode}
+	c.AddConcurrentHandlers(consumer, cc.Concurrent) // 添加消费者接口
+	addrs := strings.Split(cc.Addr, ",")
+	var err1 error
+	if cc.ConnectType == 0 {
+		err1 = c.ConnectToNSQDs(addrs)
+	} else if cc.ConnectType == 1 {
+		err1 = c.ConnectToNSQLookupds(addrs)
+	}
+	return consumer, err1
+}

+ 45 - 0
gonsq/gonsq_test.go

@@ -0,0 +1,45 @@
+package gonsq
+
+import (
+	"encoding/json"
+	"log"
+	"testing"
+	"time"
+)
+
+func Test_nsq(t *testing.T) {
+
+	p, e2 := NewProducer("192.168.3.207:4150", "tt", false)
+	go func() {
+		log.Println(p, e2)
+		time.Sleep(3 * time.Second)
+		p.Publish([]byte("aaaa1111"))
+	}()
+
+	c, e1 := NewConsumer(&Cconfig{
+		IsJsonEncode: false,
+		Addr:         "192.168.3.207:4150",
+		Topic:        "tt",
+		Channel:      "cc",
+		Concurrent:   1,
+	})
+	log.Println(c, e1)
+	go func() {
+		for {
+			select {
+			case obj := <-c.Ch:
+				log.Println("ccc", obj)
+			}
+		}
+	}()
+
+	select {}
+
+}
+
+func Test_arr(t *testing.T) {
+	v, _ := json.Marshal([]byte("aaaa1111"))
+	var n []byte
+	json.Unmarshal(v, &n)
+	log.Println(v, n, []byte{0xFF})
+}

+ 46 - 0
gonsq/producer.go

@@ -0,0 +1,46 @@
+package gonsq
+
+import (
+	"encoding/json"
+	"log"
+
+	"github.com/nsqio/go-nsq"
+)
+
+type Producer struct {
+	//Ch    chan interface{}
+	P            *nsq.Producer
+	Topic        string
+	IsJsonEncode bool //是否进行json序列化,如果否则必须以[]byte传递,如果是则必须用对应的消费者对象[也设置了序列化]处理
+}
+
+func NewProducer(addr, toppic string, IsJsonEncode bool) (*Producer, error) {
+	config := nsq.NewConfig()
+	producer, err := nsq.NewProducer(addr, config)
+	if err != nil {
+		return nil, err
+	} else {
+		return &Producer{producer, toppic, IsJsonEncode}, nil
+	}
+}
+
+func (p *Producer) Publish(msg interface{}) error {
+	if p.IsJsonEncode {
+		var infoType byte
+		switch msg.(type) {
+		case []byte: //原本就是byte数组
+			infoType = 0x01
+		default:
+			infoType = 0x00
+		}
+		log.Println(infoType)
+		data, err := json.Marshal(msg)
+		if err == nil && len(data) > 0 { //头部插入类型,用于解码[]byte
+			data = append([]byte{infoType}, data...)
+		}
+		return p.P.Publish(p.Topic, data)
+	} else { //必须传入[]byte
+		return p.P.Publish(p.Topic, msg.([]byte))
+	}
+
+}

+ 1 - 0
jynsq.go

@@ -0,0 +1 @@
+package jynsq