123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- package gonsq
- import (
- "encoding/json"
- "errors"
- "github.com/nsqio/go-nsq"
- )
- type Producer struct {
- //Ch chan interface{}
- P *nsq.Producer
- Topic string
- IsJsonEncode bool //是否进行json序列化,如果否则必须以[]byte传递,如果是则必须用对应的消费者对象[也设置了序列化]处理
- }
- func NewProducer(addr, toppic string, IsJsonEncode bool) (*Producer, error) {
- config := nsq.NewConfig()
- producer, err := nsq.NewProducer(addr, config)
- if err != nil {
- return nil, err
- } else {
- return &Producer{producer, toppic, IsJsonEncode}, nil
- }
- }
- func (p *Producer) Publish(msg interface{}) error {
- if p.IsJsonEncode {
- //var infoType byte
- //switch msg.(type) {
- //case []byte: //原本就是byte数组
- // infoType = 0x01
- //default:
- // infoType = 0x00
- //}
- data, err := json.Marshal(msg)
- if err != nil {
- return err
- } else if len(data) > 0 { //头部插入类型,用于解码[]byte
- //data = append([]byte{infoType}, data...)
- return p.P.Publish(p.Topic, data)
- } else {
- return errors.New("producer msg err")
- }
- } else { //必须传入[]byte
- if bs, ok := msg.([]byte); ok {
- return p.P.Publish(p.Topic, bs)
- }
- return errors.New("producer msg err: no []byte")
- }
- }
|