package main /** 服务管理,存储服务 TODO go-memdb这个内存数据库没有排序???? */ import ( "errors" "fmt" "github.com/hashicorp/go-memdb" "log" "math/rand" "time" ) const ( RANDOM = iota //随机 LOAD //按服务器压力分配 SEQ //顺序执行 ,根据服务支持的执行单元数 ) type ServiceMeta struct { Name string Addr string Workers int32 BalanceType int32 Load float64 Ip string Port int32 Id string Used bool LastUseTime int64 //最后一次使用 LastTtlTime int64 //心跳时间 } // func (sm *ServiceMeta) String() string { return fmt.Sprintf("%s-%s-%d-%d-%s", sm.Name, sm.Addr, sm.Workers, sm.BalanceType, sm.Id) } var mdb *memdb.MemDB func InitDb() { // Create the DB schema schema := &memdb.DBSchema{ Tables: map[string]*memdb.TableSchema{ "servicemeta": &memdb.TableSchema{ Name: "servicemeta", Indexes: map[string]*memdb.IndexSchema{ "id": &memdb.IndexSchema{ Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "Id"}, }, "ip": &memdb.IndexSchema{ Name: "ip", Unique: false, Indexer: &memdb.StringFieldIndex{Field: "Ip"}, }, "addr": &memdb.IndexSchema{ Name: "addr", Unique: false, Indexer: &memdb.StringFieldIndex{Field: "Addr"}, }, "name": &memdb.IndexSchema{ Name: "name", Unique: false, Indexer: &memdb.StringFieldIndex{Field: "Name"}, }, "name_used": &memdb.IndexSchema{ //服务名称+资源使用索引 Name: "name_used", Unique: false, Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "Name"}, &memdb.BoolFieldIndex{Field: "Used"}, }}, }, "ip_port": &memdb.IndexSchema{ Name: "ip_port", Unique: false, Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "Ip"}, &memdb.IntFieldIndex{Field: "Port"}, }}, }, }, }, }, } // Create a new data base var err error mdb, err = memdb.NewMemDB(schema) if err != nil { panic(err) } } //添加服务 func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) { now := time.Now().Unix() txn := mdb.Txn(true) defer txn.Commit() for i := 0; i < int(workers); i++ { sm := &ServiceMeta{ Name: serviceName, Ip: ip, Port: port, Workers: workers, BalanceType: balance, LastTtlTime: now, Id: fmt.Sprintf("%s:%d_%d", ip, port, i+1), Addr: fmt.Sprintf("%s:%d", ip, port), } log.Println("data:::", *sm) err := txn.Insert("servicemeta", sm) if err != nil { log.Fatalln("insert service meta error", err.Error()) } log.Println("insert service meta", sm.String()) } } //更新服务有效期 func UpdateServiceMetaTtl(serviceAddr string) { now := time.Now().Unix() txn := mdb.Txn(true) defer txn.Commit() rs, err := txn.Get("servicemeta", "addr", serviceAddr) if err != nil { log.Fatalln("update service ttl error", err.Error()) return } if rs == nil { log.Println("serviceAddr", serviceAddr, "找不到") return } for obj := rs.Next(); obj != nil; obj = rs.Next() { sm := obj.(*ServiceMeta) sm.LastTtlTime = now _ = txn.Insert("servicemeta", sm) log.Println("update service meta ttl", sm.String()) } } //注销服务 func DestoryServiceMeta(ip string, port int32) { txn := mdb.Txn(true) defer txn.Commit() _, err := txn.DeleteAll("servicemeta", "ip_port", ip, port) if err != nil { log.Fatalln("destory service error", err.Error()) } log.Println("remove service meta", ip, port) } //申请调用,找未使用的那个资源 func ApplyWithNotUse(serviceName string) (string, string, error) { txn := mdb.Txn(true) defer txn.Commit() item, err := txn.First("servicemeta", "name_used", serviceName, false) if err != nil { log.Fatalln("applywithnotuse error ", err.Error()) return "", "", errors.New("没有可用服务") } sm := item.(*ServiceMeta) sm.Used = true sm.LastUseTime = time.Now().Unix() _ = txn.Insert("servicemeta", sm) return sm.Addr, sm.Id, nil } //申请调用,找负载最低的那个服务器 func ApplyWithLoad(serviceName string) (string, string, error) { txn := mdb.Txn(false) defer txn.Abort() rs, err := txn.Get("servicemeta", "name", serviceName) if err != nil { return "", "", errors.New("没有可用服务") } var addr, id string var load float64 = 1000 for obj := rs.Next(); obj != nil; obj = rs.Next() { meta := obj.(*ServiceMeta) if meta.Load < load { //找负载最小的那个 load, addr, id = meta.Load, meta.Addr, meta.Id } } return addr, id, nil } //申请调用,随机找一个服务器 func ApplyWithRandom(serviceName string) (string, string, error) { txn := mdb.Txn(false) defer txn.Abort() rs, err := txn.Get("servicemeta", "name", serviceName) if err != nil { return "", "", errors.New("没有可用服务") } addrs := map[string]bool{} for obj := rs.Next(); obj != nil; obj = rs.Next() { meta := obj.(*ServiceMeta) addrs[meta.Addr] = true } addrArray := make([]string, 0, 0) for k, _ := range addrs { addrArray = append(addrArray, k) } index := rand.Intn(len(addrArray)) return addrArray[index], "", nil } //释放资源 func Release(serviceId string) { txn := mdb.Txn(true) defer txn.Commit() item, err := txn.First("servicemeta", "id", serviceId) if err != nil { log.Fatalln("release error", err.Error()) return } sm := item.(*ServiceMeta) sm.Used = false sm.LastUseTime = time.Now().Unix() _ = txn.Insert("servicemeta", sm) } //更新所有服务负载 func UpdateServerLoad(ip string, load float64) { txn := mdb.Txn(true) defer txn.Commit() rs, err := txn.Get("servicemeta", "ip", ip) if err != nil { log.Fatalln(err.Error()) return } for obj := rs.Next(); obj != nil; obj = rs.Next() { meta := obj.(*ServiceMeta) meta.Load = load _ = txn.Insert("servicemeta", meta) } } // func RemoveTimeoutService(timeout int64) { txn := mdb.Txn(true) defer txn.Commit() rs, err := txn.Get("servicemeta", "id", "") if err != nil { log.Fatalln(err.Error()) return } filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout)) ids := make([]string, 0, 0) for obj := filter.Next(); obj != nil; obj = filter.Next() { meta := obj.(*ServiceMeta) ids = append(ids, meta.Id) } log.Println("清理的服务", ids) for _, id := range ids { _, _ = txn.DeleteAll("servicemeta", "id", id) } } //过滤器工厂 func timeoutFilterFactory(timeout int64) func(interface{}) bool { limit := timeout return func(raw interface{}) bool { obj, ok := raw.(*ServiceMeta) if !ok { return false } return time.Now().Unix()-obj.LastTtlTime < limit } } // //func main() { // InitDb() // txn := mdb.Txn(true) // for i := 0; i < 20; i++ { // sm := &ServiceMeta{Name: "ocr", Ip: "192.168.20.100", Port: 50050, Workers: 20, BalanceType: LOAD} // sm.Id = fmt.Sprintf("%s:%d_%d", sm.Ip, sm.Port, i+1) // sm.Addr = fmt.Sprintf("%s:%d", sm.Ip, sm.Port) // err := txn.Insert("servicemeta", sm) // if err != nil { // fmt.Println(err.Error()) // } // } // txn.Commit() // txn = mdb.Txn(false) // defer txn.Abort() // // rs, err := txn.Get("servicemeta", "name", "ocr") // if err != nil { // fmt.Println(err.Error()) // } // // for obj := rs.Next(); obj != nil; obj = rs.Next() { // meta := obj.(*ServiceMeta) // fmt.Println(meta.Name, meta.Addr, meta.Id) // } // //}