renzheng 3 lat temu
rodzic
commit
16f9acd9e6
2 zmienionych plików z 56 dodań i 16 usunięć
  1. 45 10
      gonsq/gonsq_test.go
  2. 11 6
      gonsq/producer.go

+ 45 - 10
gonsq/gonsq_test.go

@@ -9,18 +9,11 @@ import (
 
 func Test_nsq(t *testing.T) {
 
-	p, e2 := NewProducer("192.168.3.207:4150", "tt", false)
-	go func() {
-		log.Println(p, e2)
-		time.Sleep(3 * time.Second)
-		p.Publish([]byte("aaaa1111"))
-	}()
-
 	c, e1 := NewConsumer(&Cconfig{
 		IsJsonEncode: false,
 		Addr:         "192.168.3.207:4150",
-		Topic:        "tt",
-		Channel:      "cc",
+		Topic:        "jyalert",
+		Channel:      "jyalert",
 		Concurrent:   1,
 	})
 	log.Println(c, e1)
@@ -33,7 +26,14 @@ func Test_nsq(t *testing.T) {
 		}
 	}()
 
-	select {}
+	p, e2 := NewProducer("192.168.3.207:4150", "jyalert", false)
+	go func() {
+		log.Println(p, e2)
+		time.Sleep(3 * time.Second)
+		p.Publish([]byte("aaaa1111"))
+	}()
+
+	time.Sleep(15 * time.Second)
 
 }
 
@@ -42,4 +42,39 @@ func Test_arr(t *testing.T) {
 	var n []byte
 	json.Unmarshal(v, &n)
 	log.Println(v, n, []byte{0xFF})
+
+	p, _ := NewProducer("192.168.3.207:4150", "jyalert", false)
+	type Msg struct {
+		Id         string                 `json:"id"` //用于标识这个告警分组,及告警方式、告警人
+		Text       string                 `json:"text"`
+		Title      string                 `json:"title"`      //非必填项
+		AppendInfo map[string]interface{} `json:"appendinfo"` //附加信息,非必填项
+	}
+	m := &Msg{"invoice_alert", "首页console错误请查看", "你有新的告警消息处理", map[string]interface{}{"ip": "127.0.0.1", "service": "sword"}}
+	bs, _ := json.Marshal(m)
+	p.Publish(bs)
+
+	var b Msg
+	json.Unmarshal(bs, &b)
+	log.Println(string(bs), b)
+	time.Sleep(1 * time.Second)
+
+}
+
+func Test_consumer(t *testing.T) {
+	c, e1 := NewConsumer(&Cconfig{
+		IsJsonEncode: false,
+		Addr:         "192.168.3.207:4150",
+		Topic:        "jyalert",
+		Channel:      "jyalert",
+		Concurrent:   1,
+	})
+	log.Println(c, e1)
+	select {
+	case obj := <-c.Ch:
+		log.Println("ccc", obj)
+	case <-time.After(4 * time.Second):
+		break
+	}
+
 }

+ 11 - 6
gonsq/producer.go

@@ -2,7 +2,7 @@ package gonsq
 
 import (
 	"encoding/json"
-	"log"
+	"errors"
 
 	"github.com/nsqio/go-nsq"
 )
@@ -33,14 +33,19 @@ func (p *Producer) Publish(msg interface{}) error {
 		default:
 			infoType = 0x00
 		}
-		log.Println(infoType)
 		data, err := json.Marshal(msg)
-		if err == nil && len(data) > 0 { //头部插入类型,用于解码[]byte
+		if err != nil {
+			return err
+		} else if len(data) > 0 { //头部插入类型,用于解码[]byte
 			data = append([]byte{infoType}, data...)
+			return p.P.Publish(p.Topic, data)
+		} else {
+			return errors.New("producer msg err")
 		}
-		return p.P.Publish(p.Topic, data)
 	} else { //必须传入[]byte
-		return p.P.Publish(p.Topic, msg.([]byte))
+		if bs, ok := msg.([]byte); ok {
+			return p.P.Publish(p.Topic, bs)
+		}
+		return errors.New("producer msg err: no []byte")
 	}
-
 }