producer.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package gonsq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "github.com/nsqio/go-nsq"
  6. )
  7. type Producer struct {
  8. //Ch chan interface{}
  9. P *nsq.Producer
  10. Topic string
  11. IsJsonEncode bool //是否进行json序列化,如果否则必须以[]byte传递,如果是则必须用对应的消费者对象[也设置了序列化]处理
  12. }
  13. func NewProducer(addr, toppic string, IsJsonEncode bool) (*Producer, error) {
  14. config := nsq.NewConfig()
  15. producer, err := nsq.NewProducer(addr, config)
  16. if err != nil {
  17. return nil, err
  18. } else {
  19. return &Producer{producer, toppic, IsJsonEncode}, nil
  20. }
  21. }
  22. func (p *Producer) Publish(msg interface{}) error {
  23. if p.IsJsonEncode {
  24. //var infoType byte
  25. //switch msg.(type) {
  26. //case []byte: //原本就是byte数组
  27. // infoType = 0x01
  28. //default:
  29. // infoType = 0x00
  30. //}
  31. data, err := json.Marshal(msg)
  32. if err != nil {
  33. return err
  34. } else if len(data) > 0 { //头部插入类型,用于解码[]byte
  35. //data = append([]byte{infoType}, data...)
  36. return p.P.Publish(p.Topic, data)
  37. } else {
  38. return errors.New("producer msg err")
  39. }
  40. } else { //必须传入[]byte
  41. if bs, ok := msg.([]byte); ok {
  42. return p.P.Publish(p.Topic, bs)
  43. }
  44. return errors.New("producer msg err: no []byte")
  45. }
  46. }