package main /** 服务管理,存储服务 TODO go-memdb这个内存数据库没有排序???? */ import ( "errors" "fmt" "github.com/hashicorp/go-memdb" "log" "math/rand" "time" ) const ( RANDOM = iota //随机 LOAD //按服务器压力分配 SEQ //顺序执行 ,根据服务支持的执行单元数 ) type ServiceMeta struct { Name string Addr string Workers int32 BalanceType int32 Load float64 Ip string Port int32 Id string Used bool LastUseTime int64 //最后一次使用 LastTtlTime int64 //心跳时间 } // func (sm *ServiceMeta) String() string { return fmt.Sprintf("%s-%s-%d-%d-%s", sm.Name, sm.Addr, sm.Workers, sm.BalanceType, sm.Id) } var mdb *memdb.MemDB func InitDb() { // Create the DB schema schema := &memdb.DBSchema{ Tables: map[string]*memdb.TableSchema{ "servicemeta": &memdb.TableSchema{ Name: "servicemeta", Indexes: map[string]*memdb.IndexSchema{ "id": &memdb.IndexSchema{ Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "Id"}, }, "ip": &memdb.IndexSchema{ Name: "ip", Unique: false, Indexer: &memdb.StringFieldIndex{Field: "Ip"}, }, "addr": &memdb.IndexSchema{ Name: "addr", Unique: false, Indexer: &memdb.StringFieldIndex{Field: "Addr"}, }, "name": &memdb.IndexSchema{ Name: "name", Unique: false, Indexer: &memdb.StringFieldIndex{Field: "Name"}, }, "name_used": &memdb.IndexSchema{ //服务名称+资源使用索引 Name: "name_used", Unique: false, Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "Name"}, &memdb.BoolFieldIndex{Field: "Used"}, }}, }, "ip_port": &memdb.IndexSchema{ Name: "ip_port", Unique: false, Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "Ip"}, &memdb.IntFieldIndex{Field: "Port"}, }}, }, }, }, }, } // Create a new data base var err error mdb, err = memdb.NewMemDB(schema) if err != nil { panic(err) } } //添加服务 func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) { now := time.Now().Unix() txn := mdb.Txn(true) defer txn.Commit() for i := 0; i < int(workers); i++ { sm := &ServiceMeta{ Name: serviceName, Ip: ip, Port: port, Workers: workers, BalanceType: balance, LastTtlTime: now, Id: fmt.Sprintf("%s:%d_%d", ip, port, i+1), Addr: fmt.Sprintf("%s:%d", ip, port), } err := txn.Insert("servicemeta", sm) if err != nil { log.Fatalln("insert service meta error", err.Error()) } log.Println("insert service meta", sm.String()) } } //更新服务有效期 func UpdateServiceMetaTtl(serviceAddr string) { now := time.Now().Unix() txn := mdb.Txn(true) defer txn.Commit() rs, err := txn.Get("servicemeta", "addr", serviceAddr) if err != nil { log.Fatalln("update service ttl error", err.Error()) return } if rs == nil { log.Println("serviceAddr", serviceAddr, "找不到") return } for obj := rs.Next(); obj != nil; obj = rs.Next() { sm := obj.(*ServiceMeta) sm.LastTtlTime = now _ = txn.Insert("servicemeta", sm) log.Println("update service meta ttl", sm.String()) } } //注销服务 func DestoryServiceMeta(ip string, port int32) { txn := mdb.Txn(true) defer txn.Commit() _, err := txn.DeleteAll("servicemeta", "ip_port", ip, port) if err != nil { log.Fatalln("destory service error", err.Error()) } log.Println("remove service meta", ip, port) } //申请调用,找未使用的那个资源 func ApplyWithNotUse(serviceName string) (string, string, error) { txn := mdb.Txn(true) defer txn.Commit() //TODO 这个框架的First,多列索引有问题,换种思路实现 rs, err := txn.Get("servicemeta", "name", serviceName) if err != nil { log.Fatalln("applywithnotuse error ", err.Error()) return "", "", errors.New("没有可用服务") } filter := memdb.NewFilterIterator(rs, seqFilterFactory()) item := filter.Next() if item == nil { log.Println("applywithnotuse error 找不到可用服务") return "", "", errors.New("没有可用服务") } sm := item.(*ServiceMeta) sm.Used = true sm.LastUseTime = time.Now().Unix() _ = txn.Insert("servicemeta", sm) return sm.Addr, sm.Id, nil } //申请调用,找负载最低的那个服务器 func ApplyWithLoad(serviceName string) (string, string, error) { txn := mdb.Txn(false) defer txn.Abort() rs, err := txn.Get("servicemeta", "name", serviceName) if err != nil { return "", "", errors.New("没有可用服务") } var addr, id string var load float64 = 1000 for obj := rs.Next(); obj != nil; obj = rs.Next() { meta := obj.(*ServiceMeta) if meta.Load < load { //找负载最小的那个 load, addr, id = meta.Load, meta.Addr, meta.Id } } return addr, id, nil } //申请调用,随机找一个服务器 func ApplyWithRandom(serviceName string) (string, string, error) { txn := mdb.Txn(false) defer txn.Abort() rs, err := txn.Get("servicemeta", "name", serviceName) if err != nil { return "", "", errors.New("没有可用服务") } addrs := make([][2]string, 0, 0) for obj := rs.Next(); obj != nil; obj = rs.Next() { meta := obj.(*ServiceMeta) addrs = append(addrs, [2]string{meta.Addr, meta.Id}) } index := rand.Intn(len(addrs)) return addrs[index][0], addrs[index][1], nil } //释放资源 func Release(serviceId string) { txn := mdb.Txn(true) defer txn.Commit() item, err := txn.First("servicemeta", "id", serviceId) if err != nil || item == nil { return } sm := item.(*ServiceMeta) sm.Used = false sm.LastUseTime = time.Now().Unix() _ = txn.Insert("servicemeta", sm) } //更新所有服务负载 func UpdateServerLoad(ip string, load float64) { txn := mdb.Txn(true) defer txn.Commit() rs, err := txn.Get("servicemeta", "ip", ip) if err != nil { log.Fatalln(err.Error()) return } for obj := rs.Next(); obj != nil; obj = rs.Next() { meta := obj.(*ServiceMeta) meta.Load = load _ = txn.Insert("servicemeta", meta) } } // func removeTimeoutService(timeout int64) { txn := mdb.Txn(true) defer txn.Commit() rs, err := txn.Get("servicemeta", "name", "") if err != nil { log.Fatalln(err.Error()) return } filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout)) ids := make([]string, 0, 0) for obj := filter.Next(); obj != nil; obj = filter.Next() { meta := obj.(*ServiceMeta) ids = append(ids, meta.Id) } log.Println("清理的服务", ids) for _, id := range ids { _, _ = txn.DeleteAll("servicemeta", "id", id) } } //过滤器工厂 func timeoutFilterFactory(timeout int64) func(interface{}) bool { limit := timeout return func(raw interface{}) bool { obj, ok := raw.(*ServiceMeta) if !ok { return false } return !(time.Now().Unix()-obj.LastTtlTime > limit) } } //过滤器工厂 func seqFilterFactory() func(interface{}) bool { return func(raw interface{}) bool { obj, ok := raw.(*ServiceMeta) if !ok { return false } return obj.Used } } //过期使用资源过滤器 func usedTimeoutFilterFactory(timeout int64) func(interface{}) bool { limit := timeout return func(raw interface{}) bool { obj, ok := raw.(*ServiceMeta) if !ok { return false } return !(obj.Used && time.Now().Unix()-obj.LastUseTime > limit) } } //重置长期占用资源状态 func resetResource4Timeout(timeout int64) { txn := mdb.Txn(true) defer txn.Commit() rs, err := txn.Get("servicemeta", "name", "") if err != nil { log.Fatalln(err.Error()) return } filter := memdb.NewFilterIterator(rs, usedTimeoutFilterFactory(timeout)) for obj := filter.Next(); obj != nil; obj = filter.Next() { meta := obj.(*ServiceMeta) meta.Used = false _ = txn.Insert("servicemeta", meta) } } //过期资源占用 func ClearTimeoutUsedResource(ttl int64) { timeout := ttl * 2 tm := time.NewTicker(time.Duration(ttl) * time.Second) for { select { case <-tm.C: //TODO 过滤过期服务 resetResource4Timeout(timeout) } } } //过期服务清理 func ClearTimeoutService(ttl int64) { timeout := ttl * 2 tm := time.NewTicker(time.Duration(ttl) * time.Second) for { select { case <-tm.C: //TODO 过滤过期服务 removeTimeoutService(timeout) } } }