/** 服务工具包,提供对服务的封装 */ package util import ( "app.yhyue.com/BP/servicerd/proto" "context" "fmt" "google.golang.org/grpc" "log" "net" "time" ) type GrpcRegisteFn func(s *grpc.Server) error type BaseService struct { RdServer string //服务治理中心节点地址 ServiceIp, ServiceName string // ServicePort, HeartbeatInterval, Workers int //心跳间隔时间 meta *proto.ServiceMeta //服务描述 } // func NewService(rdServer, serviceName, serviceIp string, servicePort, heartbeatInterval, workers int) *BaseService { return &BaseService{ RdServer: rdServer, ServiceIp: serviceIp, ServicePort: servicePort, HeartbeatInterval: heartbeatInterval, ServiceName: serviceName, Workers: workers, meta: &proto.ServiceMeta{ Name: serviceName, Ip: serviceIp, Port: int32(servicePort), Workers: int32(workers), Balance: 0, }, } } // func (bs *BaseService) heartbeat() { conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure()) if err != nil { return } defer conn.Close() client := proto.NewHeartBeatClient(conn) //调用服务端推送流 resp, _ := client.PutStream(context.Background()) tm := time.NewTicker(time.Duration(bs.HeartbeatInterval) * time.Second) for { select { case <-tm.C: reqstreamData := &proto.StreamReqData{ServiceName: bs.ServiceName, ServiceAddr: fmt.Sprintf("%s:%d", bs.ServiceIp, bs.ServicePort)} _ = resp.Send(reqstreamData) } } } //服务注册 func (bs *BaseService) registe() { conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure()) if err != nil { return } defer conn.Close() client := proto.NewServiceClient(conn) ret, err := client.Registe(context.TODO(), bs.meta) if err != nil { log.Println(err.Error()) } else { log.Println(ret) } } //服务注销 func (bs *BaseService) Destory() { conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure()) if err != nil { return } defer conn.Close() client := proto.NewServiceClient(conn) ret, err := client.Destory(context.TODO(), bs.meta) if err != nil { log.Println(err.Error()) } else { log.Println(ret) } } // func (bs *BaseService) startServer(fn GrpcRegisteFn) { //监听端口 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", bs.ServicePort)) if err != nil { log.Fatalln(err.Error()) return } //创建一个grpc 服务器 s := grpc.NewServer() //注册事件 err = fn(s) // if err != nil { log.Fatalln(err.Error()) return } //处理链接 _ = s.Serve(lis) } //服务启动 func (bs *BaseService) Run(fn GrpcRegisteFn) { go bs.startServer(fn) //启动服务 go bs.heartbeat() //心跳 bs.registe() }