瀏覽代碼

添加golang封装

Tao Zhang 5 年之前
父節點
當前提交
2c5472afe5
共有 8 個文件被更改,包括 230 次插入141 次删除
  1. 19 39
      demo/client/main.go
  2. 3 3
      demo/ocr_client/main.go
  3. 13 89
      demo/service/main.go
  4. 0 5
      proto_src/heartbeat.proto
  5. 0 5
      proto_src/serverload.proto
  6. 68 0
      util/clientutil.go
  7. 123 0
      util/serviceutil.go
  8. 4 0
      util/util.go

+ 19 - 39
demo/client/main.go

@@ -5,6 +5,7 @@ package main
 */
 import (
 	"app.yhyue.com/BP/servicerd/proto"
+	"app.yhyue.com/BP/servicerd/util"
 	"context"
 	"flag"
 	"google.golang.org/grpc"
@@ -14,61 +15,40 @@ import (
 
 //服务地址配置
 var (
-	rdserver    = flag.String("rd", "127.0.0.1:10021", "服务治理地址")
+	rdserver    = flag.String("rd", "192.168.3.240:10021", "服务治理地址")
 	balancetype = flag.Int("balance", 0, "负载策略 0=随机 1=服务器压力均衡 2=轮训占用")
 	threads     = flag.Int("threads", 20, "压力并发数")
 	requests    = flag.Int("reqs", 100, "单个线程服务请求次数")
 )
+var cu *util.ClientUtil
 
 func init() {
 	flag.Parse()
-
-}
-
-func run(thread int, wg *sync.WaitGroup) {
-	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
+	var err error
+	cu, err = util.NewClient(*rdserver)
 	if err != nil {
-		return
+		panic(err)
 	}
+}
 
+func run(thread int, wg *sync.WaitGroup) {
 	defer func(wg *sync.WaitGroup) {
-		conn.Close()
 		wg.Done()
 	}(wg)
-	var client proto.ServiceClient
-	client = proto.NewServiceClient(conn)
 
 	for i := 0; i < *requests; i++ {
-		repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "demo", Balance: int32(*balancetype)})
-		if err != nil {
-			log.Println("出错了")
-			log.Fatalln(err.Error())
-		}
-		log.Println("结果", thread, repl.Addr, repl.ResourceId)
-		//TODO 业务调用
-		conn, err := grpc.Dial(repl.Addr, grpc.WithInsecure())
-		if err != nil {
-			return
-		}
-		defer conn.Close()
-		demo_client := proto.NewDemoServiceClient(conn)
-		demo_repl, err := demo_client.Say(context.Background(), &proto.DemoReq{
-			Name: "张三",
+		_, _ = cu.Run("demo", 10, func(conn *grpc.ClientConn, args ...interface{}) (interface{}, error) {
+			demo_client := proto.NewDemoServiceClient(conn)
+			demo_repl, err := demo_client.Say(context.Background(), &proto.DemoReq{
+				Name: "张三",
+			})
+			if err != nil {
+				log.Println(err.Error())
+			} else {
+				log.Println("back::", thread, demo_repl.Data)
+			}
+			return nil, nil
 		})
-		if err != nil {
-			log.Println(err.Error())
-		} else {
-			log.Println("back::", thread, demo_repl.Data)
-		}
-		//time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
-		//只有使用SEQ负载模式,需要调用释放资源
-		release_repl, err := client.Release(context.Background(), &proto.StringReqData{Data: repl.ResourceId})
-		if err != nil {
-			log.Println("出错了")
-			log.Fatalln(err.Error())
-		} else {
-			log.Println(thread, release_repl.Data)
-		}
 	}
 }
 

+ 3 - 3
demo/ocr_client/main.go

@@ -65,7 +65,7 @@ func ocr() {
 }
 
 func main() {
-	for i := 0; i < 20; i++ {
-		ocr()
-	}
+	//for i := 0; i < 20; i++ {
+	ocr()
+	//}
 }

+ 13 - 89
demo/service/main.go

@@ -5,25 +5,24 @@ package main
 
 import (
 	"app.yhyue.com/BP/servicerd/proto"
+	"app.yhyue.com/BP/servicerd/util"
 	"context"
 	"flag"
-	"fmt"
 	"google.golang.org/grpc"
 	"log"
-	"net"
 	"os"
 	"os/signal"
-	"time"
 )
 
 //服务地址配置
 var (
 	ip          = flag.String("ip", "192.168.20.100", "本机ip")
 	port        = flag.Int("port", 20153, "服务端口")
-	rdserver    = flag.String("rd", "127.0.0.1:10021", "服务治理地址")
+	rdserver    = flag.String("rd", "192.168.3.240:10021", "服务治理地址")
 	serviceName = flag.String("name", "demo", "服务名称")
 )
 
+//实际业务实现
 type DemoService struct {
 }
 
@@ -32,98 +31,23 @@ func (s *DemoService) Say(ctx context.Context, in *proto.DemoReq) (*proto.DemoRe
 	return &proto.DemoRep{Data: in.Name}, nil
 }
 
-//
-func heartbeat() {
-	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
-	if err != nil {
-		return
-	}
-	defer conn.Close()
-	client := proto.NewHeartBeatClient(conn)
-	//调用服务端推送流
-	resp, _ := client.PutStream(context.Background())
-	tm := time.NewTicker(20 * time.Second)
-	for {
-		select {
-		case <-tm.C:
-			reqstreamData := &proto.StreamReqData{ServiceName: *serviceName, ServiceAddr: fmt.Sprintf("%s:%d", *ip, *port)}
-			_ = resp.Send(reqstreamData)
-		}
-	}
-}
-
-//服务注册
-func registe() {
-	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
-	if err != nil {
-		return
-	}
-	defer conn.Close()
-	client := proto.NewServiceClient(conn)
-	ret, err := client.Registe(context.TODO(), &proto.ServiceMeta{
-		Name:    *serviceName,
-		Ip:      *ip,
-		Port:    int32(*port),
-		Balance: 0,
-		Workers: 5,
-	})
-	if err != nil {
-		log.Println(err.Error())
-	} else {
-		log.Println(ret)
-	}
-
-}
-
-//服务注销
-func destory() {
-	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
-	if err != nil {
-		return
-	}
-	defer conn.Close()
-	client := proto.NewServiceClient(conn)
-	ret, err := client.Destory(context.TODO(), &proto.ServiceMeta{
-		Name:    *serviceName,
-		Ip:      *ip,
-		Port:    int32(*port),
-		Balance: 0,
-		Workers: 5,
-	})
-	if err != nil {
-		log.Println(err.Error())
-	} else {
-		log.Println(ret)
-	}
-}
-
-//服务启动
-func startServer() {
-	//监听端口
-	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
-	if err != nil {
-		log.Fatalln(err.Error())
-		return
-	}
-	//创建一个grpc 服务器
-	s := grpc.NewServer()
-	//注册事件
-	proto.RegisterDemoServiceServer(s, &DemoService{})
-	//处理链接
-	_ = s.Serve(lis)
-}
-
 func init() {
 	flag.Parse()
 }
 
 func main() {
-	go startServer()
-	go heartbeat() //心跳
-	registe()
+	bs := util.NewService(*rdserver, *serviceName,
+		*ip, *port,
+		20, 10)
+	bs.Run(func(s *grpc.Server) error { //服务注册,服务心跳,都在这个里边维护
+		//TODO 服务注册 ,当然,这里可以注册多个协议实现
+		proto.RegisterDemoServiceServer(s, new(DemoService))
+		return nil
+	})
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt, os.Kill)
 	<-c
+	//执行清理操作
 	log.Println("服务注销")
-	destory()
+	bs.Destory()
 }

+ 0 - 5
proto_src/heartbeat.proto

@@ -5,12 +5,7 @@ package proto;
 
 //心跳检测
 service HeartBeat {
-  /*
-  以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
-  */
-  //rpc GetStream (StreamReqData) returns (stream StreamResData){}
   rpc PutStream (stream StreamReqData) returns (StreamResData){}
-  //rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
 }
 
 

+ 0 - 5
proto_src/serverload.proto

@@ -5,12 +5,7 @@ package proto;
 
 //服务器负载数据上报
 service ServerLoad {
-  /*
-  以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
-  */
-  //rpc GetStream (StreamReqData) returns (stream StreamResData){}
   rpc PutStream (stream ServerLoadStreamReqData) returns (ServerLoadStreamResData){}
-  //rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
 }
 
 

+ 68 - 0
util/clientutil.go

@@ -0,0 +1,68 @@
+/**
+客户端封装
+*/
+package util
+
+import (
+	"app.yhyue.com/BP/servicerd/proto"
+	"context"
+	"errors"
+	"google.golang.org/grpc"
+	"log"
+	"time"
+)
+
+const (
+	//TODO 目前实现前2中
+	ERROR_ERTRY_TYPE_FIRST    = iota //1次,找到第一个服务提供者,出错即返回
+	ERROR_ERTRY_TYPE_FIRST_5         //5次尝试,找到第一个服务提供者
+	ERROR_ERTRY_TYPE_OVER_ALL        //尝试遍历所有服务,直至又一个提供正常返回
+)
+
+//调用
+type CallFn func(conn *grpc.ClientConn, args ...interface{}) (interface{}, error)
+
+type ClientUtil struct {
+	RdServer      string
+	serviceConn   *grpc.ClientConn
+	serviceClient proto.ServiceClient
+}
+
+func NewClient(rdServer string) (*ClientUtil, error) {
+	c := &ClientUtil{RdServer: rdServer}
+	conn, err := grpc.Dial(rdServer, grpc.WithInsecure())
+	if err != nil {
+		return nil, err
+	}
+	client := proto.NewServiceClient(conn)
+	c.serviceConn = conn
+	c.serviceClient = client
+	return c, nil
+}
+
+func (cu *ClientUtil) Run(serviceName string, errRetryTime int, fn CallFn, args ...interface{}) (interface{}, error) {
+	defer func() {
+		if err := recover(); err != nil {
+			log.Println("捕获异常:", err)
+		}
+	}()
+	for i := 0; i < errRetryTime; i++ {
+		repl, err := cu.serviceClient.Apply(context.Background(), &proto.ApplyReqData{Name: serviceName, Balance: 0})
+		if err != nil {
+			log.Println("client-util", "无可用服务", err.Error())
+			time.Sleep(1 * time.Second)
+			continue
+		}
+		//TODO 业务调用
+		conn, err := grpc.Dial(repl.Addr, grpc.WithInsecure())
+		if err != nil {
+			log.Println("client-util", "连接到服务节点失败:", repl.Addr, err.Error())
+			time.Sleep(1 * time.Second)
+			continue
+		}
+		ret, err := fn(conn, args)
+		conn.Close()
+		return ret, err
+	}
+	return nil, errors.New("远程服务调用失败")
+}

+ 123 - 0
util/serviceutil.go

@@ -0,0 +1,123 @@
+/**
+服务工具包,提供对服务的封装
+*/
+package util
+
+import (
+	"app.yhyue.com/BP/servicerd/proto"
+	"context"
+	"fmt"
+	"google.golang.org/grpc"
+	"log"
+	"net"
+	"time"
+)
+
+type GrpcRegisteFn func(s *grpc.Server) error
+
+type BaseService struct {
+	RdServer                                string             //服务治理中心节点地址
+	ServiceIp, ServiceName                  string             //
+	ServicePort, HeartbeatInterval, Workers int                //心跳间隔时间
+	meta                                    *proto.ServiceMeta //服务描述
+}
+
+//
+func NewService(rdServer, serviceName, serviceIp string, servicePort, heartbeatInterval, workers int) *BaseService {
+	return &BaseService{
+		RdServer:          rdServer,
+		ServiceIp:         serviceIp,
+		ServicePort:       servicePort,
+		HeartbeatInterval: heartbeatInterval,
+		ServiceName:       serviceName,
+		Workers:           workers,
+		meta: &proto.ServiceMeta{
+			Name:    serviceName,
+			Ip:      serviceIp,
+			Port:    int32(servicePort),
+			Workers: int32(workers),
+			Balance: 0,
+		},
+	}
+}
+
+//
+func (bs *BaseService) heartbeat() {
+	conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
+	if err != nil {
+		return
+	}
+	defer conn.Close()
+	client := proto.NewHeartBeatClient(conn)
+	//调用服务端推送流
+	resp, _ := client.PutStream(context.Background())
+	tm := time.NewTicker(time.Duration(bs.HeartbeatInterval) * time.Second)
+	for {
+		select {
+		case <-tm.C:
+			reqstreamData := &proto.StreamReqData{ServiceName: bs.ServiceName, ServiceAddr: fmt.Sprintf("%s:%d", bs.ServiceIp, bs.ServicePort)}
+			_ = resp.Send(reqstreamData)
+		}
+	}
+}
+
+//服务注册
+func (bs *BaseService) registe() {
+	conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
+	if err != nil {
+		return
+	}
+	defer conn.Close()
+	client := proto.NewServiceClient(conn)
+	ret, err := client.Registe(context.TODO(), bs.meta)
+	if err != nil {
+		log.Println(err.Error())
+	} else {
+		log.Println(ret)
+	}
+
+}
+
+//服务注销
+func (bs *BaseService) Destory() {
+	conn, err := grpc.Dial(bs.RdServer, grpc.WithInsecure())
+	if err != nil {
+		return
+	}
+	defer conn.Close()
+	client := proto.NewServiceClient(conn)
+	ret, err := client.Destory(context.TODO(), bs.meta)
+	if err != nil {
+		log.Println(err.Error())
+	} else {
+		log.Println(ret)
+	}
+}
+
+//
+func (bs *BaseService) startServer(fn GrpcRegisteFn) {
+	//监听端口
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", bs.ServicePort))
+	if err != nil {
+		log.Fatalln(err.Error())
+		return
+	}
+	//创建一个grpc 服务器
+	s := grpc.NewServer()
+	//注册事件
+	err = fn(s)
+	//
+	if err != nil {
+		log.Fatalln(err.Error())
+		return
+	}
+	//处理链接
+	_ = s.Serve(lis)
+}
+
+//服务启动
+func (bs *BaseService) Run(fn GrpcRegisteFn) {
+	go bs.startServer(fn) //启动服务
+	go bs.heartbeat()     //心跳
+	bs.registe()
+}

+ 4 - 0
util/util.go

@@ -0,0 +1,4 @@
+/**
+服务治理工具包
+*/
+package util