package a2s import ( "fmt" "log" "math/rand" "strings" "sync" "time" "github.com/golang/protobuf/proto" "github.com/nats-io/nats.go" ) const ( ID_LENGTH = 32 ) type ( Bytes []byte //观察结果 WatchResponse struct { lock *sync.RWMutex cache map[string]chan Bytes } //坚听返回队列 WatchNatsQueue struct { lock *sync.RWMutex cache map[string]bool } ) var ( noice = "abcdefghijklmnopqrstuvwxyz0123456789" nc *nats.Conn w = &WatchResponse{new(sync.RWMutex), make(map[string]chan Bytes)} wnq = &WatchNatsQueue{new(sync.RWMutex), make(map[string]bool)} ) // id func id() string { sb := new(strings.Builder) for i := 0; i < ID_LENGTH; i++ { sb.WriteByte(noice[rand.Intn(36)]) } return sb.String() } // Add 方法调用时,添加观察 func (wr *WatchResponse) Add(msgId string, ch chan Bytes) { wr.lock.Lock() defer wr.lock.Unlock() wr.cache[msgId] = ch } // Del 超时/成功时,删除观察 func (wr *WatchResponse) Del(msgId string) { wr.lock.Lock() defer wr.lock.Unlock() if v, ok := wr.cache[msgId]; ok { close(v) delete(wr.cache, msgId) } } // Put 写入channel,select观察会有响应 func (wr *WatchResponse) Put(msgId string, data Bytes) { wr.lock.Lock() defer wr.lock.Unlock() if v, ok := wr.cache[msgId]; ok { v <- data } } // Watch func (wnq *WatchNatsQueue) Watch(topic string) { wnq.lock.Lock() defer wnq.lock.Unlock() if _, ok := wnq.cache[topic]; ok { return } else { wnq.cache[topic] = true rawTopic := fmt.Sprintf("%s_resp", topic) nc.QueueSubscribe(rawTopic, rawTopic, func(msg *nats.Msg) { obj := new(NatsResponse) err := proto.Unmarshal(msg.Data, obj) if err == nil { w.Put(obj.GetMsgId(), obj.GetData()) } }) } } // ConnectNats func ConnectNats(addr string) error { var err error natsAddr := fmt.Sprintf("nats://%s", addr) log.Println(natsAddr) nc, err = nats.Connect(natsAddr) return err } // SendMessage2Nats func SendMessage2Nats(topic string, timeout int64, data []byte) (string, <-chan Bytes, error) { rawTopic := fmt.Sprintf("%s_req", topic) // msgId := id() rawData := &NatsRequest{ MsgId: msgId, Timestamp: time.Now().Unix(), Timeout: timeout, Data: data, } // bs, err := proto.Marshal(rawData) if err != nil { return "", nil, err } //发布消息到指定队列 err = nc.Publish(rawTopic, bs) if err != nil { return "", nil, err } ch := make(chan Bytes, 1) // wnq.Watch(topic) // w.Add(msgId, ch) return msgId, ch, nil }