123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package a2s
- import (
- "fmt"
- "log"
- "math/rand"
- "strings"
- "sync"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/nats-io/nats.go"
- )
- const (
- ID_LENGTH = 32
- )
- type (
- Bytes []byte
- //观察结果
- WatchResponse struct {
- lock *sync.RWMutex
- cache map[string]chan Bytes
- }
- //坚听返回队列
- WatchNatsQueue struct {
- lock *sync.RWMutex
- cache map[string]bool
- }
- )
- var (
- noice = "abcdefghijklmnopqrstuvwxyz0123456789"
- nc *nats.Conn
- w = &WatchResponse{new(sync.RWMutex), make(map[string]chan Bytes)}
- wnq = &WatchNatsQueue{new(sync.RWMutex), make(map[string]bool)}
- )
- // id
- func id() string {
- sb := new(strings.Builder)
- for i := 0; i < ID_LENGTH; i++ {
- sb.WriteByte(noice[rand.Intn(36)])
- }
- return sb.String()
- }
- // Add 方法调用时,添加观察
- func (wr *WatchResponse) Add(msgId string, ch chan Bytes) {
- wr.lock.Lock()
- defer wr.lock.Unlock()
- wr.cache[msgId] = ch
- }
- // Del 超时/成功时,删除观察
- func (wr *WatchResponse) Del(msgId string) {
- wr.lock.Lock()
- defer wr.lock.Unlock()
- if v, ok := wr.cache[msgId]; ok {
- close(v)
- delete(wr.cache, msgId)
- }
- }
- // Put 写入channel,select观察会有响应
- func (wr *WatchResponse) Put(msgId string, data Bytes) {
- wr.lock.Lock()
- defer wr.lock.Unlock()
- if v, ok := wr.cache[msgId]; ok {
- v <- data
- }
- }
- // Watch
- func (wnq *WatchNatsQueue) Watch(topic string) {
- wnq.lock.Lock()
- defer wnq.lock.Unlock()
- if _, ok := wnq.cache[topic]; ok {
- return
- } else {
- wnq.cache[topic] = true
- rawTopic := fmt.Sprintf("%s_resp", topic)
- nc.QueueSubscribe(rawTopic, rawTopic, func(msg *nats.Msg) {
- obj := new(NatsResponse)
- err := proto.Unmarshal(msg.Data, obj)
- if err == nil {
- w.Put(obj.GetMsgId(), obj.GetData())
- }
- })
- }
- }
- // ConnectNats
- func ConnectNats(addr string) error {
- var err error
- natsAddr := fmt.Sprintf("nats://%s", addr)
- log.Println(natsAddr)
- nc, err = nats.Connect(natsAddr)
- return err
- }
- // SendMessage2Nats
- func SendMessage2Nats(topic string,
- timeout int64,
- data []byte) (string, <-chan Bytes, error) {
- rawTopic := fmt.Sprintf("%s_req", topic)
- //
- msgId := id()
- rawData := &NatsRequest{
- MsgId: msgId,
- Timestamp: time.Now().Unix(),
- Timeout: timeout,
- Data: data,
- }
- //
- bs, err := proto.Marshal(rawData)
- if err != nil {
- return "", nil, err
- }
- //发布消息到指定队列
- err = nc.Publish(rawTopic, bs)
- if err != nil {
- return "", nil, err
- }
- ch := make(chan Bytes, 1)
- //
- wnq.Watch(topic)
- //
- w.Add(msgId, ch)
- return msgId, ch, nil
- }
|