1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- package a2s
- import (
- "context"
- "errors"
- "fmt"
- "log"
- "net"
- "sync/atomic"
- "time"
- "github.com/cespare/xxhash/v2"
- "google.golang.org/grpc"
- "google.golang.org/grpc/reflection"
- )
- type (
- //GRPC 服务端
- Server struct{}
- )
- var (
- reqAmout = make([]int32, 9999, 9999)
- )
- // Call 远程调用
- func (s *Server) Call(ctx context.Context,
- in *Request) (*Response, error) {
- topicIndex := int(xxhash.Sum64([]byte(in.GetTopic()))) % 9999
- if topicIndex < 0 {
- topicIndex *= -1
- }
- atomic.AddInt32(&(reqAmout[topicIndex]), 1)
- defer func() {
- fmt.Print(".")
- atomic.AddInt32(&(reqAmout[topicIndex]), -1)
- }()
- fmt.Print(">")
- msgId, ch, err := SendMessage2Nats(in.GetTopic(), in.GetTimeout(), in.GetData())
- if err != nil {
- return &Response{}, err
- }
- //等候
- select {
- case <-ctx.Done():
- w.Del(msgId)
- return &Response{Code: 203, Msg: "客户端主动关闭连接"}, errors.New("client close the connection")
- case <-time.After(time.Duration(in.Timeout) * time.Second):
- //超时
- w.Del(msgId)
- return &Response{Code: 500, Msg: "远程方法调用超时"}, errors.New("call remote method timeout")
- case ret := <-ch:
- w.Del(msgId)
- return &Response{Code: 200, Msg: "ok", Data: ret}, nil
- }
- return &Response{Code: 503, Msg: "未知错误"}, errors.New("unknow error")
- }
- // ViewState 查看状态
- func (s *Server) ViewState(ctx context.Context, req *StateReq) (*StateResp, error) {
- topicIndex := int(xxhash.Sum64([]byte(req.GetTopic()))) % 9999
- if topicIndex < 0 {
- topicIndex *= -1
- }
- return &StateResp{CurrentRequest: reqAmout[topicIndex]}, nil
- }
- // startServer 开启RPC服务
- func StartServer(listen string) {
- // 监听本地端口
- lis, err := net.Listen("tcp", listen)
- if err != nil {
- log.Printf("监听端口失败: %s\n", err)
- return
- } else {
- log.Printf("监听端口:%s\n", listen)
- }
- // 创建gRPC服务器
- s := grpc.NewServer()
- // 注册服务
- RegisterCallerServer(s, new(Server))
- reflection.Register(s)
- err = s.Serve(lis)
- if err != nil {
- log.Printf("开启服务失败: %s\n", err)
- return
- }
- }
|