serviceStore.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package main
/*
Service management: an in-memory store of registered services.
TODO: the go-memdb in-memory database does not appear to support sorting?
*/
  6. import (
  7. "errors"
  8. "fmt"
  9. "github.com/hashicorp/go-memdb"
  10. "log"
  11. "math/rand"
  12. "time"
  13. )
  14. const (
  15. RANDOM = iota //随机
  16. LOAD //按服务器压力分配
  17. SEQ //顺序执行 ,根据服务支持的执行单元数
  18. )
  19. type ServiceMeta struct {
  20. Name string
  21. Addr string
  22. Workers int32
  23. BalanceType int32
  24. Load float64
  25. Ip string
  26. Port int32
  27. Id string
  28. Used bool
  29. LastUseTime int64 //最后一次使用
  30. LastTtlTime int64 //心跳时间
  31. }
  32. //
  33. func (sm *ServiceMeta) String() string {
  34. return fmt.Sprintf("%s-%s-%d-%d-%s", sm.Name, sm.Addr, sm.Workers, sm.BalanceType, sm.Id)
  35. }
  36. var mdb *memdb.MemDB
  37. func InitDb() {
  38. // Create the DB schema
  39. schema := &memdb.DBSchema{
  40. Tables: map[string]*memdb.TableSchema{
  41. "servicemeta": &memdb.TableSchema{
  42. Name: "servicemeta",
  43. Indexes: map[string]*memdb.IndexSchema{
  44. "id": &memdb.IndexSchema{
  45. Name: "id",
  46. Unique: true,
  47. Indexer: &memdb.StringFieldIndex{Field: "Id"},
  48. },
  49. "ip": &memdb.IndexSchema{
  50. Name: "ip",
  51. Unique: false,
  52. Indexer: &memdb.StringFieldIndex{Field: "Ip"},
  53. },
  54. "addr": &memdb.IndexSchema{
  55. Name: "addr",
  56. Unique: false,
  57. Indexer: &memdb.StringFieldIndex{Field: "Addr"},
  58. },
  59. "name": &memdb.IndexSchema{
  60. Name: "name",
  61. Unique: false,
  62. Indexer: &memdb.StringFieldIndex{Field: "Name"},
  63. },
  64. "name_used": &memdb.IndexSchema{ //服务名称+资源使用索引
  65. Name: "name_used",
  66. Unique: false,
  67. Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
  68. &memdb.StringFieldIndex{Field: "Name"},
  69. &memdb.BoolFieldIndex{Field: "Used"},
  70. }},
  71. },
  72. "ip_port": &memdb.IndexSchema{
  73. Name: "ip_port",
  74. Unique: false,
  75. Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
  76. &memdb.StringFieldIndex{Field: "Ip"},
  77. &memdb.IntFieldIndex{Field: "Port"},
  78. }},
  79. },
  80. },
  81. },
  82. },
  83. }
  84. // Create a new data base
  85. var err error
  86. mdb, err = memdb.NewMemDB(schema)
  87. if err != nil {
  88. panic(err)
  89. }
  90. }
  91. //添加服务
  92. func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) {
  93. now := time.Now().Unix()
  94. txn := mdb.Txn(true)
  95. defer txn.Commit()
  96. for i := 0; i < int(workers); i++ {
  97. sm := &ServiceMeta{
  98. Name: serviceName,
  99. Ip: ip,
  100. Port: port,
  101. Workers: workers,
  102. BalanceType: balance,
  103. LastTtlTime: now,
  104. Id: fmt.Sprintf("%s:%d_%d", ip, port, i+1),
  105. Addr: fmt.Sprintf("%s:%d", ip, port),
  106. }
  107. log.Println("data:::", *sm)
  108. err := txn.Insert("servicemeta", sm)
  109. if err != nil {
  110. log.Fatalln("insert service meta error", err.Error())
  111. }
  112. log.Println("insert service meta", sm.String())
  113. }
  114. }
  115. //更新服务有效期
  116. func UpdateServiceMetaTtl(serviceAddr string) {
  117. now := time.Now().Unix()
  118. txn := mdb.Txn(true)
  119. defer txn.Commit()
  120. rs, err := txn.Get("servicemeta", "addr", serviceAddr)
  121. if err != nil {
  122. log.Fatalln("update service ttl error", err.Error())
  123. return
  124. }
  125. if rs == nil {
  126. log.Println("serviceAddr", serviceAddr, "找不到")
  127. return
  128. }
  129. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  130. sm := obj.(*ServiceMeta)
  131. sm.LastTtlTime = now
  132. _ = txn.Insert("servicemeta", sm)
  133. log.Println("update service meta ttl", sm.String())
  134. }
  135. }
  136. //注销服务
  137. func DestoryServiceMeta(ip string, port int32) {
  138. txn := mdb.Txn(true)
  139. defer txn.Commit()
  140. _, err := txn.DeleteAll("servicemeta", "ip_port", ip, port)
  141. if err != nil {
  142. log.Fatalln("destory service error", err.Error())
  143. }
  144. log.Println("remove service meta", ip, port)
  145. }
  146. //申请调用,找未使用的那个资源
  147. func ApplyWithNotUse(serviceName string) (string, string, error) {
  148. txn := mdb.Txn(true)
  149. defer txn.Commit()
  150. item, err := txn.First("servicemeta", "name_used", serviceName, false)
  151. if err != nil {
  152. log.Fatalln("applywithnotuse error ", err.Error())
  153. return "", "", errors.New("没有可用服务")
  154. }
  155. sm := item.(*ServiceMeta)
  156. sm.Used = true
  157. sm.LastUseTime = time.Now().Unix()
  158. _ = txn.Insert("servicemeta", sm)
  159. return sm.Addr, sm.Id, nil
  160. }
  161. //申请调用,找负载最低的那个服务器
  162. func ApplyWithLoad(serviceName string) (string, string, error) {
  163. txn := mdb.Txn(false)
  164. defer txn.Abort()
  165. rs, err := txn.Get("servicemeta", "name", serviceName)
  166. if err != nil {
  167. return "", "", errors.New("没有可用服务")
  168. }
  169. var addr, id string
  170. var load float64 = 1000
  171. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  172. meta := obj.(*ServiceMeta)
  173. if meta.Load < load { //找负载最小的那个
  174. load, addr, id = meta.Load, meta.Addr, meta.Id
  175. }
  176. }
  177. return addr, id, nil
  178. }
  179. //申请调用,随机找一个服务器
  180. func ApplyWithRandom(serviceName string) (string, string, error) {
  181. txn := mdb.Txn(false)
  182. defer txn.Abort()
  183. rs, err := txn.Get("servicemeta", "name", serviceName)
  184. if err != nil {
  185. return "", "", errors.New("没有可用服务")
  186. }
  187. addrs := map[string]bool{}
  188. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  189. meta := obj.(*ServiceMeta)
  190. addrs[meta.Addr] = true
  191. }
  192. addrArray := make([]string, 0, 0)
  193. for k, _ := range addrs {
  194. addrArray = append(addrArray, k)
  195. }
  196. index := rand.Intn(len(addrArray))
  197. return addrArray[index], "", nil
  198. }
  199. //释放资源
  200. func Release(serviceId string) {
  201. txn := mdb.Txn(true)
  202. defer txn.Commit()
  203. item, err := txn.First("servicemeta", "id", serviceId)
  204. if err != nil {
  205. log.Fatalln("release error", err.Error())
  206. return
  207. }
  208. sm := item.(*ServiceMeta)
  209. sm.Used = false
  210. sm.LastUseTime = time.Now().Unix()
  211. _ = txn.Insert("servicemeta", sm)
  212. }
  213. //更新所有服务负载
  214. func UpdateServerLoad(ip string, load float64) {
  215. txn := mdb.Txn(true)
  216. defer txn.Commit()
  217. rs, err := txn.Get("servicemeta", "ip", ip)
  218. if err != nil {
  219. log.Fatalln(err.Error())
  220. return
  221. }
  222. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  223. meta := obj.(*ServiceMeta)
  224. meta.Load = load
  225. _ = txn.Insert("servicemeta", meta)
  226. }
  227. }
  228. //
  229. func RemoveTimeoutService(timeout int64) {
  230. txn := mdb.Txn(true)
  231. defer txn.Commit()
  232. rs, err := txn.Get("servicemeta", "id", "")
  233. if err != nil {
  234. log.Fatalln(err.Error())
  235. return
  236. }
  237. filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout))
  238. ids := make([]string, 0, 0)
  239. for obj := filter.Next(); obj != nil; obj = filter.Next() {
  240. meta := obj.(*ServiceMeta)
  241. ids = append(ids, meta.Id)
  242. }
  243. log.Println("清理的服务", ids)
  244. for _, id := range ids {
  245. _, _ = txn.DeleteAll("servicemeta", "id", id)
  246. }
  247. }
  248. //过滤器工厂
  249. func timeoutFilterFactory(timeout int64) func(interface{}) bool {
  250. limit := timeout
  251. return func(raw interface{}) bool {
  252. obj, ok := raw.(*ServiceMeta)
  253. if !ok {
  254. return false
  255. }
  256. return time.Now().Unix()-obj.LastTtlTime < limit
  257. }
  258. }
  259. //
  260. //func main() {
  261. // InitDb()
  262. // txn := mdb.Txn(true)
  263. // for i := 0; i < 20; i++ {
  264. // sm := &ServiceMeta{Name: "ocr", Ip: "192.168.20.100", Port: 50050, Workers: 20, BalanceType: LOAD}
  265. // sm.Id = fmt.Sprintf("%s:%d_%d", sm.Ip, sm.Port, i+1)
  266. // sm.Addr = fmt.Sprintf("%s:%d", sm.Ip, sm.Port)
  267. // err := txn.Insert("servicemeta", sm)
  268. // if err != nil {
  269. // fmt.Println(err.Error())
  270. // }
  271. // }
  272. // txn.Commit()
  273. // txn = mdb.Txn(false)
  274. // defer txn.Abort()
  275. //
  276. // rs, err := txn.Get("servicemeta", "name", "ocr")
  277. // if err != nil {
  278. // fmt.Println(err.Error())
  279. // }
  280. //
  281. // for obj := rs.Next(); obj != nil; obj = rs.Next() {
  282. // meta := obj.(*ServiceMeta)
  283. // fmt.Println(meta.Name, meta.Addr, meta.Id)
  284. // }
  285. //
  286. //}