123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
/**
Package util is a service toolkit that provides a wrapper around services.
*/
- package util
- import (
- "app.yhyue.com/BP/servicerd/proto"
- "context"
- "fmt"
- "google.golang.org/grpc"
- "log"
- "net"
- "time"
- )
- type GrpcRegisteFn func(s *grpc.Server) error
- type BaseService struct {
- RdServer string //服务治理中心节点地址
- ServiceIp, ServiceName string //
- ServicePort, HeartbeatInterval, Workers int //心跳间隔时间
- meta *proto.ServiceMeta //服务描述
- }
- //
- func NewService(rdServer, serviceName, serviceIp string, servicePort, heartbeatInterval, workers int) *BaseService {
- return &BaseService{
- RdServer: rdServer,
- ServiceIp: serviceIp,
- ServicePort: servicePort,
- HeartbeatInterval: heartbeatInterval,
- ServiceName: serviceName,
- Workers: workers,
- meta: &proto.ServiceMeta{
- Name: serviceName,
- Ip: serviceIp,
- Port: int32(servicePort),
- Workers: int32(workers),
- Balance: 0,
- },
- }
- }
- //
- func (bs *BaseService) heartbeat() {
- conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
- if err != nil {
- return
- }
- defer conn.Close()
- client := proto.NewHeartBeatClient(conn)
- //调用服务端推送流
- resp, _ := client.PutStream(context.Background())
- tm := time.NewTicker(time.Duration(bs.HeartbeatInterval) * time.Second)
- for {
- select {
- case <-tm.C:
- reqstreamData := &proto.StreamReqData{ServiceName: bs.ServiceName, ServiceAddr: fmt.Sprintf("%s:%d", bs.ServiceIp, bs.ServicePort)}
- _ = resp.Send(reqstreamData)
- }
- }
- }
- //服务注册
- func (bs *BaseService) registe() {
- conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
- if err != nil {
- return
- }
- defer conn.Close()
- client := proto.NewServiceClient(conn)
- ret, err := client.Registe(context.TODO(), bs.meta)
- if err != nil {
- log.Println(err.Error())
- } else {
- log.Println(ret)
- }
- }
- //服务注销
- func (bs *BaseService) Destory() {
- conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
- if err != nil {
- return
- }
- defer conn.Close()
- client := proto.NewServiceClient(conn)
- ret, err := client.Destory(context.TODO(), bs.meta)
- if err != nil {
- log.Println(err.Error())
- } else {
- log.Println(ret)
- }
- }
- //
- func (bs *BaseService) startServer(fn GrpcRegisteFn) {
- //监听端口
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", bs.ServicePort))
- if err != nil {
- log.Fatalln(err.Error())
- return
- }
- //创建一个grpc 服务器
- s := grpc.NewServer()
- //注册事件
- err = fn(s)
- //
- if err != nil {
- log.Fatalln(err.Error())
- return
- }
- //处理链接
- _ = s.Serve(lis)
- }
- //服务启动
- func (bs *BaseService) Run(fn GrpcRegisteFn) {
- go bs.startServer(fn) //启动服务
- go bs.heartbeat() //心跳
- bs.registe()
- }
|