server.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package a2s
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "net"
  8. "sync/atomic"
  9. "time"
  10. "github.com/cespare/xxhash/v2"
  11. "google.golang.org/grpc"
  12. "google.golang.org/grpc/reflection"
  13. )
  // Server is the gRPC server-side implementation. It carries no fields of its
  // own; per-topic request counters are kept in package-level state.
  type Server struct{}
  // reqAmout holds one in-flight request counter per topic bucket. It has a
  // fixed size of 9999 entries, all starting at zero, and is indexed by a topic
  // hash reduced modulo 9999.
  var reqAmout = make([]int32, 9999)
  21. // Call 远程调用
  22. func (s *Server) Call(ctx context.Context,
  23. in *Request) (*Response, error) {
  24. topicIndex := int(xxhash.Sum64([]byte(in.GetTopic()))) % 9999
  25. if topicIndex < 0 {
  26. topicIndex *= -1
  27. }
  28. atomic.AddInt32(&(reqAmout[topicIndex]), 1)
  29. defer func() {
  30. fmt.Print(".")
  31. atomic.AddInt32(&(reqAmout[topicIndex]), -1)
  32. }()
  33. fmt.Print(">")
  34. msgId, ch, err := SendMessage2Nats(in.GetTopic(), in.GetTimeout(), in.GetData())
  35. if err != nil {
  36. return &Response{}, err
  37. }
  38. //等候
  39. select {
  40. case <-ctx.Done():
  41. w.Del(msgId)
  42. return &Response{Code: 203, Msg: "客户端主动关闭连接"}, errors.New("client close the connection")
  43. case <-time.After(time.Duration(in.Timeout) * time.Second):
  44. //超时
  45. w.Del(msgId)
  46. return &Response{Code: 500, Msg: "远程方法调用超时"}, errors.New("call remote method timeout")
  47. case ret := <-ch:
  48. w.Del(msgId)
  49. return &Response{Code: 200, Msg: "ok", Data: ret}, nil
  50. }
  51. return &Response{Code: 503, Msg: "未知错误"}, errors.New("unknow error")
  52. }
  53. // ViewState 查看状态
  54. func (s *Server) ViewState(ctx context.Context, req *StateReq) (*StateResp, error) {
  55. topicIndex := int(xxhash.Sum64([]byte(req.GetTopic()))) % 9999
  56. if topicIndex < 0 {
  57. topicIndex *= -1
  58. }
  59. return &StateResp{CurrentRequest: reqAmout[topicIndex]}, nil
  60. }
  61. // startServer 开启RPC服务
  62. func StartServer(listen string) {
  63. // 监听本地端口
  64. lis, err := net.Listen("tcp", listen)
  65. if err != nil {
  66. log.Printf("监听端口失败: %s\n", err)
  67. return
  68. } else {
  69. log.Printf("监听端口:%s\n", listen)
  70. }
  71. // 创建gRPC服务器
  72. s := grpc.NewServer()
  73. // 注册服务
  74. RegisterCallerServer(s, new(Server))
  75. reflection.Register(s)
  76. err = s.Serve(lis)
  77. if err != nil {
  78. log.Printf("开启服务失败: %s\n", err)
  79. return
  80. }
  81. }