serviceutil.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. /**
  2. Package util is the service toolkit; it provides a wrapper around services.
  3. */
  4. package util
  5. import (
  6. "app.yhyue.com/BP/servicerd/proto"
  7. "context"
  8. "fmt"
  9. "google.golang.org/grpc"
  10. "log"
  11. "net"
  12. "time"
  13. )
  14. type GrpcRegisteFn func(s *grpc.Server) error
  15. type BaseService struct {
  16. RdServer string //服务治理中心节点地址
  17. ServiceIp, ServiceName string //
  18. ServicePort, HeartbeatInterval, Workers int //心跳间隔时间
  19. meta *proto.ServiceMeta //服务描述
  20. }
  21. //
  22. func NewService(rdServer, serviceName, serviceIp string, servicePort, heartbeatInterval, workers int) *BaseService {
  23. return &BaseService{
  24. RdServer: rdServer,
  25. ServiceIp: serviceIp,
  26. ServicePort: servicePort,
  27. HeartbeatInterval: heartbeatInterval,
  28. ServiceName: serviceName,
  29. Workers: workers,
  30. meta: &proto.ServiceMeta{
  31. Name: serviceName,
  32. Ip: serviceIp,
  33. Port: int32(servicePort),
  34. Workers: int32(workers),
  35. Balance: 0,
  36. },
  37. }
  38. }
  39. //
  40. func (bs *BaseService) heartbeat() {
  41. conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
  42. if err != nil {
  43. return
  44. }
  45. defer conn.Close()
  46. client := proto.NewHeartBeatClient(conn)
  47. //调用服务端推送流
  48. resp, _ := client.PutStream(context.Background())
  49. tm := time.NewTicker(time.Duration(bs.HeartbeatInterval) * time.Second)
  50. for {
  51. select {
  52. case <-tm.C:
  53. reqstreamData := &proto.StreamReqData{ServiceName: bs.ServiceName, ServiceAddr: fmt.Sprintf("%s:%d", bs.ServiceIp, bs.ServicePort)}
  54. _ = resp.Send(reqstreamData)
  55. }
  56. }
  57. }
  58. //服务注册
  59. func (bs *BaseService) registe() {
  60. conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
  61. if err != nil {
  62. return
  63. }
  64. defer conn.Close()
  65. client := proto.NewServiceClient(conn)
  66. ret, err := client.Registe(context.TODO(), bs.meta)
  67. if err != nil {
  68. log.Println(err.Error())
  69. } else {
  70. log.Println(ret)
  71. }
  72. }
  73. //服务注销
  74. func (bs *BaseService) Destory() {
  75. conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
  76. if err != nil {
  77. return
  78. }
  79. defer conn.Close()
  80. client := proto.NewServiceClient(conn)
  81. ret, err := client.Destory(context.TODO(), bs.meta)
  82. if err != nil {
  83. log.Println(err.Error())
  84. } else {
  85. log.Println(ret)
  86. }
  87. }
  88. //
  89. func (bs *BaseService) startServer(fn GrpcRegisteFn) {
  90. //监听端口
  91. lis, err := net.Listen("tcp", fmt.Sprintf(":%d", bs.ServicePort))
  92. if err != nil {
  93. log.Fatalln(err.Error())
  94. return
  95. }
  96. //创建一个grpc 服务器
  97. s := grpc.NewServer()
  98. //注册事件
  99. err = fn(s)
  100. //
  101. if err != nil {
  102. log.Fatalln(err.Error())
  103. return
  104. }
  105. //处理链接
  106. _ = s.Serve(lis)
  107. }
  108. //服务启动
  109. func (bs *BaseService) Run(fn GrpcRegisteFn) {
  110. go bs.startServer(fn) //启动服务
  111. go bs.heartbeat() //心跳
  112. bs.registe()
  113. }