Tao Zhang 5 년 전
부모
커밋
b374a0c1e0
4개의 변경된 파일46개의 추가작업 그리고 27개의 파일을 삭제
  1. 1 0
      .gitignore
  2. 20 16
      demo/client/main.go
  3. 1 0
      server/serviceService.go
  4. 24 11
      server/serviceStore.go

+ 1 - 0
.gitignore

@@ -1,3 +1,4 @@
 ##
 .idea
 bin
+dbtest

+ 20 - 16
demo/client/main.go

@@ -16,23 +16,27 @@ import (
 var (
 	rdserver = flag.String("rd", "127.0.0.1:10021", "服务治理地址")
 )
-var client proto.ServiceClient
 
 func init() {
 	flag.Parse()
+
+}
+
+func run(thread int, wg *sync.WaitGroup) {
 	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
 	if err != nil {
 		return
 	}
-	client = proto.NewServiceClient(conn)
-}
 
-func run(thread int, wg *sync.WaitGroup) {
 	defer func(wg *sync.WaitGroup) {
+		conn.Close()
 		wg.Done()
 	}(wg)
-	for i := 0; i < 200; i++ {
-		repl, err := client.Apply(context.TODO(), &proto.ApplyReqData{Name: "demo", Balance: 0})
+	var client proto.ServiceClient
+	client = proto.NewServiceClient(conn)
+
+	for i := 0; i < 20; i++ {
+		repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "demo", Balance: 2})
 		if err != nil {
 			log.Println("出错了")
 			log.Fatalln(err.Error())
@@ -45,7 +49,7 @@ func run(thread int, wg *sync.WaitGroup) {
 		}
 		defer conn.Close()
 		demo_client := proto.NewDemoServiceClient(conn)
-		demo_repl, err := demo_client.Say(context.TODO(), &proto.DemoReq{
+		demo_repl, err := demo_client.Say(context.Background(), &proto.DemoReq{
 			Name: "张三",
 		})
 		if err != nil {
@@ -53,21 +57,21 @@ func run(thread int, wg *sync.WaitGroup) {
 		} else {
 			log.Println("back::", thread, demo_repl.Data)
 		}
+		//time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
 		//只有使用SEQ负载模式,需要调用释放资源
-		/*
-			release_repl, err := client.Release(context.TODO(), &proto.StringReqData{Data: repl.ResourceId})
-			if err != nil {
-				log.Println("出错了")
-				log.Fatalln(err.Error())
-			} else {
-				log.Println(thread, release_repl.Data)
-			}*/
+		release_repl, err := client.Release(context.Background(), &proto.StringReqData{Data: repl.ResourceId})
+		if err != nil {
+			log.Println("出错了")
+			log.Fatalln(err.Error())
+		} else {
+			log.Println(thread, release_repl.Data)
+		}
 	}
 }
 
 func main() {
 	wg := new(sync.WaitGroup)
-	for i := 0; i < 5; i++ {
+	for i := 0; i < 3; i++ {
 		wg.Add(1)
 		go run(i, wg)
 	}

+ 1 - 0
server/serviceService.go

@@ -28,6 +28,7 @@ func (s *Service) Destory(ctx context.Context, meta *proto.ServiceMeta) (*proto.
 func (s *Service) Apply(ctx context.Context, in *proto.ApplyReqData) (*proto.ApplyRepData, error) {
 	var addr, id string
 	var err error
+	log.Println("------------")
 	if in.Balance == LOAD {
 		addr, id, err = ApplyWithLoad(in.Name)
 	} else if in.Balance == SEQ {

+ 24 - 11
server/serviceStore.go

@@ -157,11 +157,18 @@ func DestoryServiceMeta(ip string, port int32) {
 func ApplyWithNotUse(serviceName string) (string, string, error) {
 	txn := mdb.Txn(true)
 	defer txn.Commit()
-	item, err := txn.First("servicemeta", "name_used", serviceName, false)
+	//TODO 这个框架的First,多列索引有问题,换种思路实现
+	rs, err := txn.Get("servicemeta", "name", serviceName)
 	if err != nil {
 		log.Fatalln("applywithnotuse error ", err.Error())
 		return "", "", errors.New("没有可用服务")
 	}
+	filter := memdb.NewFilterIterator(rs, seqFilterFactory())
+	item := filter.Next()
+	if item == nil {
+		log.Fatalln("applywithnotuse error 找不到可用服务")
+		return "", "", errors.New("没有可用服务")
+	}
 	sm := item.(*ServiceMeta)
 	sm.Used = true
 	sm.LastUseTime = time.Now().Unix()
@@ -196,17 +203,13 @@ func ApplyWithRandom(serviceName string) (string, string, error) {
 	if err != nil {
 		return "", "", errors.New("没有可用服务")
 	}
-	addrs := map[string]bool{}
+	addrs := make([][2]string, 0, 0)
 	for obj := rs.Next(); obj != nil; obj = rs.Next() {
 		meta := obj.(*ServiceMeta)
-		addrs[meta.Addr] = true
-	}
-	addrArray := make([]string, 0, 0)
-	for k, _ := range addrs {
-		addrArray = append(addrArray, k)
+		addrs = append(addrs, [2]string{meta.Addr, meta.Id})
 	}
-	index := rand.Intn(len(addrArray))
-	return addrArray[index], "", nil
+	index := rand.Intn(len(addrs))
+	return addrs[index][0], addrs[index][1], nil
 }
 
 //释放资源
@@ -214,8 +217,7 @@ func Release(serviceId string) {
 	txn := mdb.Txn(true)
 	defer txn.Commit()
 	item, err := txn.First("servicemeta", "id", serviceId)
-	if err != nil {
-		log.Fatalln("release error", err.Error())
+	if err != nil || item == nil {
 		return
 	}
 	sm := item.(*ServiceMeta)
@@ -273,6 +275,17 @@ func timeoutFilterFactory(timeout int64) func(interface{}) bool {
 	}
 }
 
+//过滤器工厂
+func seqFilterFactory() func(interface{}) bool {
+	return func(raw interface{}) bool {
+		obj, ok := raw.(*ServiceMeta)
+		if !ok {
+			return false
+		}
+		return obj.Used
+	}
+}
+
 //
 //func main() {
 //	InitDb()