package a2s import ( "context" "errors" "fmt" "log" "net" "sync/atomic" "time" "github.com/cespare/xxhash/v2" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) type ( //GRPC 服务端 Server struct{} ) var ( reqAmout = make([]int32, 9999, 9999) ) // Call 远程调用 func (s *Server) Call(ctx context.Context, in *Request) (*Response, error) { topicIndex := int(xxhash.Sum64([]byte(in.GetTopic()))) % 9999 if topicIndex < 0 { topicIndex *= -1 } atomic.AddInt32(&(reqAmout[topicIndex]), 1) defer func() { fmt.Print(".") atomic.AddInt32(&(reqAmout[topicIndex]), -1) }() fmt.Print(">") msgId, ch, err := SendMessage2Nats(in.GetTopic(), in.GetTimeout(), in.GetData()) if err != nil { return &Response{}, err } //等候 select { case <-ctx.Done(): w.Del(msgId) return &Response{Code: 203, Msg: "客户端主动关闭连接"}, errors.New("client close the connection") case <-time.After(time.Duration(in.Timeout) * time.Second): //超时 w.Del(msgId) return &Response{Code: 500, Msg: "远程方法调用超时"}, errors.New("call remote method timeout") case ret := <-ch: w.Del(msgId) return &Response{Code: 200, Msg: "ok", Data: ret}, nil } return &Response{Code: 503, Msg: "未知错误"}, errors.New("unknow error") } // ViewState 查看状态 func (s *Server) ViewState(ctx context.Context, req *StateReq) (*StateResp, error) { topicIndex := int(xxhash.Sum64([]byte(req.GetTopic()))) % 9999 if topicIndex < 0 { topicIndex *= -1 } return &StateResp{CurrentRequest: reqAmout[topicIndex]}, nil } // startServer 开启RPC服务 func StartServer(listen string) { // 监听本地端口 lis, err := net.Listen("tcp", listen) if err != nil { log.Printf("监听端口失败: %s\n", err) return } else { log.Printf("监听端口:%s\n", listen) } // 创建gRPC服务器 s := grpc.NewServer() // 注册服务 RegisterCallerServer(s, new(Server)) reflection.Register(s) err = s.Serve(lis) if err != nil { log.Printf("开启服务失败: %s\n", err) return } }