瀏覽代碼

添加服务占用回收策略

Tao Zhang 5 年之前
父節點
當前提交
e2c6caba9f
共有 3 個文件被更改,包括 48 次插入32 次删除
  1. 5 3
      server/main.go
  2. 1 1
      server/serviceService.go
  3. 42 28
      server/serviceStore.go

+ 5 - 3
server/main.go

@@ -9,8 +9,9 @@ import (
 )
 
 var (
-	addr       = flag.String("addr", ":10021", "监听地址")
-	serviceTtl = flag.Int64("ttl", 30, "服务失效时间(秒)")
+	addr                = flag.String("addr", ":10021", "监听地址")
+	serviceTtl          = flag.Int64("ttl", 30, "服务失效时间(秒)")
+	resourceUsedTimeout = flag.Int64("ruto", 120, "服务资源占用失效时间,过期自动收回(秒)")
 )
 
 func init() {
@@ -23,6 +24,7 @@ func main() {
 	InitDb()
 	//失效服务检查
 	go ClearTimeoutService(*serviceTtl)
+	go ClearTimeoutUsedResource(*resourceUsedTimeout)
 	//监听端口
 	lis, err := net.Listen("tcp", *addr)
 	if err != nil {
@@ -40,4 +42,4 @@ func main() {
 	proto.RegisterServiceServer(s, &Service{})
 	//处理链接
 	_ = s.Serve(lis)
-}
+}

+ 1 - 1
server/serviceService.go

@@ -36,7 +36,7 @@ func (s *Service) Apply(ctx context.Context, in *proto.ApplyReqData) (*proto.App
 		addr, id, err = ApplyWithRandom(in.Name)
 	}
 	if err != nil {
-		log.Fatalln(err.Error())
+		log.Println(err.Error())
 		return &proto.ApplyRepData{Addr: "", ResourceId: ""}, err
 	} else {
 		return &proto.ApplyRepData{Addr: addr, ResourceId: id}, nil

+ 42 - 28
server/serviceStore.go

@@ -286,31 +286,45 @@ func seqFilterFactory() func(interface{}) bool {
 	}
 }
 
-//
-//func main() {
-//	InitDb()
-//	txn := mdb.Txn(true)
-//	for i := 0; i < 20; i++ {
-//		sm := &ServiceMeta{Name: "ocr", Ip: "192.168.20.100", Port: 50050, Workers: 20, BalanceType: LOAD}
-//		sm.Id = fmt.Sprintf("%s:%d_%d", sm.Ip, sm.Port, i+1)
-//		sm.Addr = fmt.Sprintf("%s:%d", sm.Ip, sm.Port)
-//		err := txn.Insert("servicemeta", sm)
-//		if err != nil {
-//			fmt.Println(err.Error())
-//		}
-//	}
-//	txn.Commit()
-//	txn = mdb.Txn(false)
-//	defer txn.Abort()
-//
-//	rs, err := txn.Get("servicemeta", "name", "ocr")
-//	if err != nil {
-//		fmt.Println(err.Error())
-//	}
-//
-//	for obj := rs.Next(); obj != nil; obj = rs.Next() {
-//		meta := obj.(*ServiceMeta)
-//		fmt.Println(meta.Name, meta.Addr, meta.Id)
-//	}
-//
-//}
+//过期使用资源过滤器
+func usedTimeoutFilterFactory(timeout int64) func(interface{}) bool {
+	limit := timeout
+	return func(raw interface{}) bool {
+		obj, ok := raw.(*ServiceMeta)
+		if !ok {
+			return false
+		}
+		return !(obj.Used && time.Now().Unix()-obj.LastUseTime > limit)
+	}
+}
+
+//重置长期占用资源状态
+func resetResource4Timeout(timeout int64) {
+	txn := mdb.Txn(true)
+	defer txn.Commit()
+	rs, err := txn.Get("servicemeta", "name", "")
+	if err != nil {
+		log.Fatalln(err.Error())
+		return
+	}
+	filter := memdb.NewFilterIterator(rs, usedTimeoutFilterFactory(timeout))
+	for obj := filter.Next(); obj != nil; obj = filter.Next() {
+		meta := obj.(*ServiceMeta)
+		meta.Used = false
+		_ = txn.Insert("servicemeta", meta)
+	}
+
+}
+
+//过期资源占用
+func ClearTimeoutUsedResource(ttl int64) {
+	timeout := ttl * 2
+	tm := time.NewTicker(time.Duration(ttl) * time.Second)
+	for {
+		select {
+		case <-tm.C:
+			//TODO 过滤过期服务
+			resetResource4Timeout(timeout)
+		}
+	}
+}