123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- package main
- /**
- 服务管理,存储服务
- TODO go-memdb这个内存数据库没有排序????
- */
- import (
- "errors"
- "fmt"
- "github.com/hashicorp/go-memdb"
- "log"
- "math/rand"
- "time"
- )
- const (
- RANDOM = iota //随机
- LOAD //按服务器压力分配
- SEQ //顺序执行 ,根据服务支持的执行单元数
- )
- type ServiceMeta struct {
- Name string
- Addr string
- Workers int32
- BalanceType int32
- Load float64
- Ip string
- Port int32
- Id string
- Used bool
- LastUseTime int64 //最后一次使用
- LastTtlTime int64 //心跳时间
- }
- //
- func (sm *ServiceMeta) String() string {
- return fmt.Sprintf("%s-%s-%d-%d-%s", sm.Name, sm.Addr, sm.Workers, sm.BalanceType, sm.Id)
- }
- var mdb *memdb.MemDB
- func InitDb() {
- // Create the DB schema
- schema := &memdb.DBSchema{
- Tables: map[string]*memdb.TableSchema{
- "servicemeta": &memdb.TableSchema{
- Name: "servicemeta",
- Indexes: map[string]*memdb.IndexSchema{
- "id": &memdb.IndexSchema{
- Name: "id",
- Unique: true,
- Indexer: &memdb.StringFieldIndex{Field: "Id"},
- },
- "ip": &memdb.IndexSchema{
- Name: "ip",
- Unique: false,
- Indexer: &memdb.StringFieldIndex{Field: "Ip"},
- },
- "addr": &memdb.IndexSchema{
- Name: "addr",
- Unique: false,
- Indexer: &memdb.StringFieldIndex{Field: "Addr"},
- },
- "name": &memdb.IndexSchema{
- Name: "name",
- Unique: false,
- Indexer: &memdb.StringFieldIndex{Field: "Name"},
- },
- "name_used": &memdb.IndexSchema{ //服务名称+资源使用索引
- Name: "name_used",
- Unique: false,
- Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
- &memdb.StringFieldIndex{Field: "Name"},
- &memdb.BoolFieldIndex{Field: "Used"},
- }},
- },
- "ip_port": &memdb.IndexSchema{
- Name: "ip_port",
- Unique: false,
- Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
- &memdb.StringFieldIndex{Field: "Ip"},
- &memdb.IntFieldIndex{Field: "Port"},
- }},
- },
- },
- },
- },
- }
- // Create a new data base
- var err error
- mdb, err = memdb.NewMemDB(schema)
- if err != nil {
- panic(err)
- }
- }
- //添加服务
- func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) {
- now := time.Now().Unix()
- txn := mdb.Txn(true)
- defer txn.Commit()
- for i := 0; i < int(workers); i++ {
- sm := &ServiceMeta{
- Name: serviceName,
- Ip: ip,
- Port: port,
- Workers: workers,
- BalanceType: balance,
- LastTtlTime: now,
- Id: fmt.Sprintf("%s:%d_%d", ip, port, i+1),
- Addr: fmt.Sprintf("%s:%d", ip, port),
- }
- log.Println("data:::", *sm)
- err := txn.Insert("servicemeta", sm)
- if err != nil {
- log.Fatalln("insert service meta error", err.Error())
- }
- log.Println("insert service meta", sm.String())
- }
- }
- //更新服务有效期
- func UpdateServiceMetaTtl(serviceAddr string) {
- now := time.Now().Unix()
- txn := mdb.Txn(true)
- defer txn.Commit()
- rs, err := txn.Get("servicemeta", "addr", serviceAddr)
- if err != nil {
- log.Fatalln("update service ttl error", err.Error())
- return
- }
- if rs == nil {
- log.Println("serviceAddr", serviceAddr, "找不到")
- return
- }
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- sm := obj.(*ServiceMeta)
- sm.LastTtlTime = now
- _ = txn.Insert("servicemeta", sm)
- log.Println("update service meta ttl", sm.String())
- }
- }
- //注销服务
- func DestoryServiceMeta(ip string, port int32) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- _, err := txn.DeleteAll("servicemeta", "ip_port", ip, port)
- if err != nil {
- log.Fatalln("destory service error", err.Error())
- }
- log.Println("remove service meta", ip, port)
- }
- //申请调用,找未使用的那个资源
- func ApplyWithNotUse(serviceName string) (string, string, error) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- item, err := txn.First("servicemeta", "name_used", serviceName, false)
- if err != nil {
- log.Fatalln("applywithnotuse error ", err.Error())
- return "", "", errors.New("没有可用服务")
- }
- sm := item.(*ServiceMeta)
- sm.Used = true
- sm.LastUseTime = time.Now().Unix()
- _ = txn.Insert("servicemeta", sm)
- return sm.Addr, sm.Id, nil
- }
- //申请调用,找负载最低的那个服务器
- func ApplyWithLoad(serviceName string) (string, string, error) {
- txn := mdb.Txn(false)
- defer txn.Abort()
- rs, err := txn.Get("servicemeta", "name", serviceName)
- if err != nil {
- return "", "", errors.New("没有可用服务")
- }
- var addr, id string
- var load float64 = 1000
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- meta := obj.(*ServiceMeta)
- if meta.Load < load { //找负载最小的那个
- load, addr, id = meta.Load, meta.Addr, meta.Id
- }
- }
- return addr, id, nil
- }
- //申请调用,随机找一个服务器
- func ApplyWithRandom(serviceName string) (string, string, error) {
- txn := mdb.Txn(false)
- defer txn.Abort()
- rs, err := txn.Get("servicemeta", "name", serviceName)
- if err != nil {
- return "", "", errors.New("没有可用服务")
- }
- addrs := map[string]bool{}
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- meta := obj.(*ServiceMeta)
- addrs[meta.Addr] = true
- }
- addrArray := make([]string, 0, 0)
- for k, _ := range addrs {
- addrArray = append(addrArray, k)
- }
- index := rand.Intn(len(addrArray))
- return addrArray[index], "", nil
- }
- //释放资源
- func Release(serviceId string) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- item, err := txn.First("servicemeta", "id", serviceId)
- if err != nil {
- log.Fatalln("release error", err.Error())
- return
- }
- sm := item.(*ServiceMeta)
- sm.Used = false
- sm.LastUseTime = time.Now().Unix()
- _ = txn.Insert("servicemeta", sm)
- }
- //更新所有服务负载
- func UpdateServerLoad(ip string, load float64) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- rs, err := txn.Get("servicemeta", "ip", ip)
- if err != nil {
- log.Fatalln(err.Error())
- return
- }
- for obj := rs.Next(); obj != nil; obj = rs.Next() {
- meta := obj.(*ServiceMeta)
- meta.Load = load
- _ = txn.Insert("servicemeta", meta)
- }
- }
- //
- func RemoveTimeoutService(timeout int64) {
- txn := mdb.Txn(true)
- defer txn.Commit()
- rs, err := txn.Get("servicemeta", "id", "")
- if err != nil {
- log.Fatalln(err.Error())
- return
- }
- filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout))
- ids := make([]string, 0, 0)
- for obj := filter.Next(); obj != nil; obj = filter.Next() {
- meta := obj.(*ServiceMeta)
- ids = append(ids, meta.Id)
- }
- log.Println("清理的服务", ids)
- for _, id := range ids {
- _, _ = txn.DeleteAll("servicemeta", "id", id)
- }
- }
- //过滤器工厂
- func timeoutFilterFactory(timeout int64) func(interface{}) bool {
- limit := timeout
- return func(raw interface{}) bool {
- obj, ok := raw.(*ServiceMeta)
- if !ok {
- return false
- }
- return time.Now().Unix()-obj.LastTtlTime < limit
- }
- }
- //
- //func main() {
- // InitDb()
- // txn := mdb.Txn(true)
- // for i := 0; i < 20; i++ {
- // sm := &ServiceMeta{Name: "ocr", Ip: "192.168.20.100", Port: 50050, Workers: 20, BalanceType: LOAD}
- // sm.Id = fmt.Sprintf("%s:%d_%d", sm.Ip, sm.Port, i+1)
- // sm.Addr = fmt.Sprintf("%s:%d", sm.Ip, sm.Port)
- // err := txn.Insert("servicemeta", sm)
- // if err != nil {
- // fmt.Println(err.Error())
- // }
- // }
- // txn.Commit()
- // txn = mdb.Txn(false)
- // defer txn.Abort()
- //
- // rs, err := txn.Get("servicemeta", "name", "ocr")
- // if err != nil {
- // fmt.Println(err.Error())
- // }
- //
- // for obj := rs.Next(); obj != nil; obj = rs.Next() {
- // meta := obj.(*ServiceMeta)
- // fmt.Println(meta.Name, meta.Addr, meta.Id)
- // }
- //
- //}
|