123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- package main
- /**
- 服务管理,存储服务
- TODO go-memdb这个内存数据库没有排序????
- */
- import (
- "errors"
- "fmt"
- "github.com/hashicorp/go-memdb"
- "log"
- "math/rand"
- "time"
- )
- const (
- RANDOM = iota //随机
- LOAD //按服务器压力分配
- SEQ //顺序执行 ,根据服务支持的执行单元数
- )
- type ServiceMeta struct {
- Name string
- Addr string
- Workers int32
- BalanceType int32
- Load float64
- Ip string
- Port int32
- Id string
- Used bool
- LastUseTime int64 //最后一次使用
- LastTtlTime int64 //心跳时间
- }
- //
- func (sm *ServiceMeta) String() string {
- return fmt.Sprintf("%s-%s-%d-%d-%s", sm.Name, sm.Addr, sm.Workers, sm.BalanceType, sm.Id)
- }
- var mdb *memdb.MemDB
- func InitDb() {
- // Create the DB schema
- schema := &memdb.DBSchema{
- Tables: map[string]*memdb.TableSchema{
- "servicemeta": &memdb.TableSchema{
- Name: "servicemeta",
- Indexes: map[string]*memdb.IndexSchema{
- "id": &memdb.IndexSchema{
- Name: "id",
- Unique: true,
- Indexer: &memdb.StringFieldIndex{Field: "Id"},
- },
- "ip": &memdb.IndexSchema{
- Name: "ip",
- Unique: false,
- Indexer: &memdb.StringFieldIndex{Field: "Ip"},
- },
- "addr": &memdb.IndexSchema{
- Name: "addr",
- Unique: false,
- Indexer: &memdb.StringFieldIndex{Field: "Addr"},
- },
- "name": &memdb.IndexSchema{
- Name: "name",
- Unique: false,
- Indexer: &memdb.StringFieldIndex{Field: "Name"},
- },
- "name_used": &memdb.IndexSchema{ //服务名称+资源使用索引
- Name: "name_used",
- Unique: false,
- Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
- &memdb.StringFieldIndex{Field: "Name"},
- &memdb.BoolFieldIndex{Field: "Used"},
- }},
- },
- "ip_port": &memdb.IndexSchema{
- Name: "ip_port",
- Unique: false,
- Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
- &memdb.StringFieldIndex{Field: "Ip"},
- &memdb.IntFieldIndex{Field: "Port"},
- }},
- },
- },
- },
- },
- }
- // Create a new data base
- var err error
- mdb, err = memdb.NewMemDB(schema)
- if err != nil {
- panic(err)
- }
- }
- //添加服务
- func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) {
- now := time.Now().Unix()
- txn := mdb.Txn(true)
- defer txn.Commit()
- for i := 0; i < int(workers); i++ {
- sm := &ServiceMeta{
- Name: serviceName,
- Ip: ip,
- Port: port,
- Workers: workers,
- BalanceType: balance,
- LastTtlTime: now,
- Id: fmt.Sprintf("%s:%d_%d", ip, port, i+1),
- Addr: fmt.Sprintf("%s:%d", ip, port),
- }
- err := txn.Insert("servicemeta", sm)
- if err != nil {
- log.Fatalln("insert service meta error", err.Error())
- }
- log.Println("insert service meta", sm.String())
- }
- }
- //更新服务有效期
- func UpdateServiceMetaTtl(serviceAddr string) {
- now := time.Now().Unix()
- txn := mdb.Txn(true)
- defer txn.Commit()
- rs, err := txn.Get("servicemeta", "addr", serviceAddr)
- if err != nil {
- log.Fatalln("update service ttl error", err.Error())
- return
- }
- if rs == nil {
- log.Println("serviceAddr", serviceAddr, "找不到")
- return
- }
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- sm := obj.(*ServiceMeta)
- sm.LastTtlTime = now
- _ = txn.Insert("servicemeta", sm)
- log.Println("update service meta ttl", sm.String())
- }
- }
- //注销服务
- func DestoryServiceMeta(ip string, port int32) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- _, err := txn.DeleteAll("servicemeta", "ip_port", ip, port)
- if err != nil {
- log.Fatalln("destory service error", err.Error())
- }
- log.Println("remove service meta", ip, port)
- }
- //申请调用,找未使用的那个资源
- func ApplyWithNotUse(serviceName string) (string, string, error) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- //TODO 这个框架的First,多列索引有问题,换种思路实现
- rs, err := txn.Get("servicemeta", "name", serviceName)
- if err != nil {
- log.Fatalln("applywithnotuse error ", err.Error())
- return "", "", errors.New("没有可用服务")
- }
- filter := memdb.NewFilterIterator(rs, seqFilterFactory())
- item := filter.Next()
- if item == nil {
- log.Println("applywithnotuse error 找不到可用服务")
- return "", "", errors.New("没有可用服务")
- }
- sm := item.(*ServiceMeta)
- sm.Used = true
- sm.LastUseTime = time.Now().Unix()
- _ = txn.Insert("servicemeta", sm)
- return sm.Addr, sm.Id, nil
- }
- //申请调用,找负载最低的那个服务器
- func ApplyWithLoad(serviceName string) (string, string, error) {
- txn := mdb.Txn(false)
- defer txn.Abort()
- rs, err := txn.Get("servicemeta", "name", serviceName)
- if err != nil {
- return "", "", errors.New("没有可用服务")
- }
- var addr, id string
- var load float64 = 1000
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- meta := obj.(*ServiceMeta)
- if meta.Load < load { //找负载最小的那个
- load, addr, id = meta.Load, meta.Addr, meta.Id
- }
- }
- return addr, id, nil
- }
- //申请调用,随机找一个服务器
- func ApplyWithRandom(serviceName string) (string, string, error) {
- txn := mdb.Txn(false)
- defer txn.Abort()
- rs, err := txn.Get("servicemeta", "name", serviceName)
- if err != nil {
- return "", "", errors.New("没有可用服务")
- }
- addrs := make([][2]string, 0, 0)
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- meta := obj.(*ServiceMeta)
- addrs = append(addrs, [2]string{meta.Addr, meta.Id})
- }
- index := rand.Intn(len(addrs))
- return addrs[index][0], addrs[index][1], nil
- }
- //释放资源
- func Release(serviceId string) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- item, err := txn.First("servicemeta", "id", serviceId)
- if err != nil || item == nil {
- return
- }
- sm := item.(*ServiceMeta)
- sm.Used = false
- sm.LastUseTime = time.Now().Unix()
- _ = txn.Insert("servicemeta", sm)
- }
- //更新所有服务负载
- func UpdateServerLoad(ip string, load float64) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- rs, err := txn.Get("servicemeta", "ip", ip)
- if err != nil {
- log.Fatalln(err.Error())
- return
- }
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- meta := obj.(*ServiceMeta)
- meta.Load = load
- _ = txn.Insert("servicemeta", meta)
- }
- }
- //
- func removeTimeoutService(timeout int64) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- rs, err := txn.Get("servicemeta", "name", "")
- if err != nil {
- log.Fatalln(err.Error())
- return
- }
- filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout))
- ids := make([]string, 0, 0)
- for obj := filter.Next(); obj != nil; obj = filter.Next() {
- meta := obj.(*ServiceMeta)
- ids = append(ids, meta.Id)
- }
- log.Println("清理的服务", ids)
- for _, id := range ids {
- _, _ = txn.DeleteAll("servicemeta", "id", id)
- }
- }
- //过滤器工厂
- func timeoutFilterFactory(timeout int64) func(interface{}) bool {
- limit := timeout
- return func(raw interface{}) bool {
- obj, ok := raw.(*ServiceMeta)
- if !ok {
- return false
- }
- return !(time.Now().Unix()-obj.LastTtlTime > limit)
- }
- }
- //过滤器工厂
- func seqFilterFactory() func(interface{}) bool {
- return func(raw interface{}) bool {
- obj, ok := raw.(*ServiceMeta)
- if !ok {
- return false
- }
- return obj.Used
- }
- }
- //过期使用资源过滤器
- func usedTimeoutFilterFactory(timeout int64) func(interface{}) bool {
- limit := timeout
- return func(raw interface{}) bool {
- obj, ok := raw.(*ServiceMeta)
- if !ok {
- return false
- }
- return !(obj.Used && time.Now().Unix()-obj.LastUseTime > limit)
- }
- }
- //重置长期占用资源状态
- func resetResource4Timeout(timeout int64) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- rs, err := txn.Get("servicemeta", "name", "")
- if err != nil {
- log.Fatalln(err.Error())
- return
- }
- filter := memdb.NewFilterIterator(rs, usedTimeoutFilterFactory(timeout))
- for obj := filter.Next(); obj != nil; obj = filter.Next() {
- meta := obj.(*ServiceMeta)
- meta.Used = false
- _ = txn.Insert("servicemeta", meta)
- }
- }
- //过期资源占用
- func ClearTimeoutUsedResource(ttl int64) {
- timeout := ttl * 2
- tm := time.NewTicker(time.Duration(ttl) * time.Second)
- for {
- select {
- case <-tm.C:
- //TODO 过滤过期服务
- resetResource4Timeout(timeout)
- }
- }
- }
- //过期服务清理
- func ClearTimeoutService(ttl int64) {
- timeout := ttl * 2
- tm := time.NewTicker(time.Duration(ttl) * time.Second)
- for {
- select {
- case <-tm.C:
- //TODO 过滤过期服务
- removeTimeoutService(timeout)
- }
- }
- }
|