package gonsq

import (
	"encoding/json"
	"errors"

	"github.com/nsqio/go-nsq"
)

// Producer publishes messages to a single topic through an underlying
// go-nsq producer.
type Producer struct {
	//Ch chan interface{}

	// P is the underlying go-nsq producer that performs every publish.
	P *nsq.Producer
	// Topic is the topic that Publish sends every message to.
	Topic string
	// IsJsonEncode selects whether Publish JSON-encodes the message. When it
	// is false the message must be passed as a []byte; when it is true the
	// message must be handled by a matching consumer object that also has
	// serialization enabled.
	IsJsonEncode bool
}

// NewProducer builds a Producer that publishes to the topic toppic.
//
// addr is handed to nsq.NewProducer together with a default nsq.NewConfig().
// IsJsonEncode is stored on the returned Producer and controls how Publish
// treats its argument. If nsq.NewProducer fails, its error is returned
// unchanged and the Producer is nil.
func NewProducer(addr, toppic string, IsJsonEncode bool) (*Producer, error) {
	producer, err := nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		return nil, err
	}
	return &Producer{P: producer, Topic: toppic, IsJsonEncode: IsJsonEncode}, nil
}

// Publish sends msg to the producer's topic.
//
// When IsJsonEncode is true, msg is encoded with json.Marshal and the encoded
// bytes are published; a marshalling error is returned unchanged. When
// IsJsonEncode is false, msg must be a []byte and is published as-is; any
// other type yields an error. Errors from the underlying publish call are
// returned unchanged. A nil receiver or a Producer whose P is nil returns an
// error instead of panicking.
func (p *Producer) Publish(msg interface{}) error {
	if p == nil {
		return errors.New("producer not initialized")
	}

	if !p.IsJsonEncode {
		// Raw mode: the caller must pass a []byte.
		bs, ok := msg.([]byte)
		if !ok {
			return errors.New("producer msg err: no []byte")
		}
		return p.send(bs)
	}

	// Disabled experiment kept from the original: tag the payload with a
	// leading type byte so the consumer can tell a raw []byte from other
	// values when decoding.
	//var infoType byte
	//switch msg.(type) {
	//case []byte: // already a byte slice
	//	infoType = 0x01
	//default:
	//	infoType = 0x00
	//}
	data, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	if len(data) == 0 {
		// Defensive guard retained from the original; never publish an
		// empty body.
		return errors.New("producer msg err")
	}
	// Insert the type byte at the head, used when decoding []byte:
	//data = append([]byte{infoType}, data...)
	return p.send(data)
}

// send publishes body to p.Topic, reporting an error when the underlying
// go-nsq producer has not been set.
func (p *Producer) send(body []byte) error {
	if p.P == nil {
		return errors.New("producer not initialized")
	}
	return p.P.Publish(p.Topic, body)
}