serviceStore.go 8.3 KB


  1. package main
  2. /**
  3. 服务管理,存储服务
  4. TODO go-memdb这个内存数据库没有排序????
  5. */
  6. import (
  7. "errors"
  8. "fmt"
  9. "github.com/hashicorp/go-memdb"
  10. "log"
  11. "math/rand"
  12. "time"
  13. )
  14. const (
  15. RANDOM = iota //随机
  16. LOAD //按服务器压力分配
  17. SEQ //顺序执行 ,根据服务支持的执行单元数
  18. )
  19. type ServiceMeta struct {
  20. Name string
  21. Addr string
  22. Workers int32
  23. BalanceType int32
  24. Load float64
  25. Ip string
  26. Port int32
  27. Id string
  28. Used bool
  29. LastUseTime int64 //最后一次使用
  30. LastTtlTime int64 //心跳时间
  31. }
  32. //
  33. func (sm *ServiceMeta) String() string {
  34. return fmt.Sprintf("%s-%s-%d-%d-%s", sm.Name, sm.Addr, sm.Workers, sm.BalanceType, sm.Id)
  35. }
  36. var mdb *memdb.MemDB
  37. func InitDb() {
  38. // Create the DB schema
  39. schema := &memdb.DBSchema{
  40. Tables: map[string]*memdb.TableSchema{
  41. "servicemeta": &memdb.TableSchema{
  42. Name: "servicemeta",
  43. Indexes: map[string]*memdb.IndexSchema{
  44. "id": &memdb.IndexSchema{
  45. Name: "id",
  46. Unique: true,
  47. Indexer: &memdb.StringFieldIndex{Field: "Id"},
  48. },
  49. "ip": &memdb.IndexSchema{
  50. Name: "ip",
  51. Unique: false,
  52. Indexer: &memdb.StringFieldIndex{Field: "Ip"},
  53. },
  54. "addr": &memdb.IndexSchema{
  55. Name: "addr",
  56. Unique: false,
  57. Indexer: &memdb.StringFieldIndex{Field: "Addr"},
  58. },
  59. "name": &memdb.IndexSchema{
  60. Name: "name",
  61. Unique: false,
  62. Indexer: &memdb.StringFieldIndex{Field: "Name"},
  63. },
  64. "name_used": &memdb.IndexSchema{ //服务名称+资源使用索引
  65. Name: "name_used",
  66. Unique: false,
  67. Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
  68. &memdb.StringFieldIndex{Field: "Name"},
  69. &memdb.BoolFieldIndex{Field: "Used"},
  70. }},
  71. },
  72. "ip_port": &memdb.IndexSchema{
  73. Name: "ip_port",
  74. Unique: false,
  75. Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
  76. &memdb.StringFieldIndex{Field: "Ip"},
  77. &memdb.IntFieldIndex{Field: "Port"},
  78. }},
  79. },
  80. },
  81. },
  82. },
  83. }
  84. // Create a new data base
  85. var err error
  86. mdb, err = memdb.NewMemDB(schema)
  87. if err != nil {
  88. panic(err)
  89. }
  90. }
  91. //添加服务
  92. func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) {
  93. now := time.Now().Unix()
  94. txn := mdb.Txn(true)
  95. defer txn.Commit()
  96. for i := 0; i < int(workers); i++ {
  97. sm := &ServiceMeta{
  98. Name: serviceName,
  99. Ip: ip,
  100. Port: port,
  101. Workers: workers,
  102. BalanceType: balance,
  103. LastTtlTime: now,
  104. Id: fmt.Sprintf("%s:%d_%d", ip, port, i+1),
  105. Addr: fmt.Sprintf("%s:%d", ip, port),
  106. }
  107. log.Println("data:::", *sm)
  108. err := txn.Insert("servicemeta", sm)
  109. if err != nil {
  110. log.Fatalln("insert service meta error", err.Error())
  111. }
  112. log.Println("insert service meta", sm.String())
  113. }
  114. }
  115. //更新服务有效期
  116. func UpdateServiceMetaTtl(serviceAddr string) {
  117. now := time.Now().Unix()
  118. txn := mdb.Txn(true)
  119. defer txn.Commit()
  120. rs, err := txn.Get("servicemeta", "addr", serviceAddr)
  121. if err != nil {
  122. log.Fatalln("update service ttl error", err.Error())
  123. return
  124. }
  125. if rs == nil {
  126. log.Println("serviceAddr", serviceAddr, "找不到")
  127. return
  128. }
  129. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  130. sm := obj.(*ServiceMeta)
  131. sm.LastTtlTime = now
  132. _ = txn.Insert("servicemeta", sm)
  133. log.Println("update service meta ttl", sm.String())
  134. }
  135. }
  136. //注销服务
  137. func DestoryServiceMeta(ip string, port int32) {
  138. txn := mdb.Txn(true)
  139. defer txn.Commit()
  140. _, err := txn.DeleteAll("servicemeta", "ip_port", ip, port)
  141. if err != nil {
  142. log.Fatalln("destory service error", err.Error())
  143. }
  144. log.Println("remove service meta", ip, port)
  145. }
  146. //申请调用,找未使用的那个资源
  147. func ApplyWithNotUse(serviceName string) (string, string, error) {
  148. txn := mdb.Txn(true)
  149. defer txn.Commit()
  150. //TODO 这个框架的First,多列索引有问题,换种思路实现
  151. rs, err := txn.Get("servicemeta", "name", serviceName)
  152. if err != nil {
  153. log.Fatalln("applywithnotuse error ", err.Error())
  154. return "", "", errors.New("没有可用服务")
  155. }
  156. filter := memdb.NewFilterIterator(rs, seqFilterFactory())
  157. item := filter.Next()
  158. if item == nil {
  159. log.Println("applywithnotuse error 找不到可用服务")
  160. return "", "", errors.New("没有可用服务")
  161. }
  162. sm := item.(*ServiceMeta)
  163. sm.Used = true
  164. sm.LastUseTime = time.Now().Unix()
  165. _ = txn.Insert("servicemeta", sm)
  166. return sm.Addr, sm.Id, nil
  167. }
  168. //申请调用,找负载最低的那个服务器
  169. func ApplyWithLoad(serviceName string) (string, string, error) {
  170. txn := mdb.Txn(false)
  171. defer txn.Abort()
  172. rs, err := txn.Get("servicemeta", "name", serviceName)
  173. if err != nil {
  174. return "", "", errors.New("没有可用服务")
  175. }
  176. var addr, id string
  177. var load float64 = 1000
  178. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  179. meta := obj.(*ServiceMeta)
  180. if meta.Load < load { //找负载最小的那个
  181. load, addr, id = meta.Load, meta.Addr, meta.Id
  182. }
  183. }
  184. return addr, id, nil
  185. }
  186. //申请调用,随机找一个服务器
  187. func ApplyWithRandom(serviceName string) (string, string, error) {
  188. txn := mdb.Txn(false)
  189. defer txn.Abort()
  190. rs, err := txn.Get("servicemeta", "name", serviceName)
  191. if err != nil {
  192. return "", "", errors.New("没有可用服务")
  193. }
  194. addrs := make([][2]string, 0, 0)
  195. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  196. meta := obj.(*ServiceMeta)
  197. addrs = append(addrs, [2]string{meta.Addr, meta.Id})
  198. }
  199. index := rand.Intn(len(addrs))
  200. return addrs[index][0], addrs[index][1], nil
  201. }
  202. //释放资源
  203. func Release(serviceId string) {
  204. txn := mdb.Txn(true)
  205. defer txn.Commit()
  206. item, err := txn.First("servicemeta", "id", serviceId)
  207. if err != nil || item == nil {
  208. return
  209. }
  210. sm := item.(*ServiceMeta)
  211. sm.Used = false
  212. sm.LastUseTime = time.Now().Unix()
  213. _ = txn.Insert("servicemeta", sm)
  214. }
  215. //更新所有服务负载
  216. func UpdateServerLoad(ip string, load float64) {
  217. txn := mdb.Txn(true)
  218. defer txn.Commit()
  219. rs, err := txn.Get("servicemeta", "ip", ip)
  220. if err != nil {
  221. log.Fatalln(err.Error())
  222. return
  223. }
  224. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  225. meta := obj.(*ServiceMeta)
  226. meta.Load = load
  227. _ = txn.Insert("servicemeta", meta)
  228. }
  229. }
  230. //
  231. func removeTimeoutService(timeout int64) {
  232. txn := mdb.Txn(true)
  233. defer txn.Commit()
  234. rs, err := txn.Get("servicemeta", "name", "")
  235. if err != nil {
  236. log.Fatalln(err.Error())
  237. return
  238. }
  239. filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout))
  240. ids := make([]string, 0, 0)
  241. for obj := filter.Next(); obj != nil; obj = filter.Next() {
  242. meta := obj.(*ServiceMeta)
  243. ids = append(ids, meta.Id)
  244. }
  245. log.Println("清理的服务", ids)
  246. for _, id := range ids {
  247. _, _ = txn.DeleteAll("servicemeta", "id", id)
  248. }
  249. }
  250. //过滤器工厂
  251. func timeoutFilterFactory(timeout int64) func(interface{}) bool {
  252. limit := timeout
  253. return func(raw interface{}) bool {
  254. obj, ok := raw.(*ServiceMeta)
  255. if !ok {
  256. return false
  257. }
  258. return !(time.Now().Unix()-obj.LastTtlTime > limit)
  259. }
  260. }
  261. //过滤器工厂
  262. func seqFilterFactory() func(interface{}) bool {
  263. return func(raw interface{}) bool {
  264. obj, ok := raw.(*ServiceMeta)
  265. if !ok {
  266. return false
  267. }
  268. return obj.Used
  269. }
  270. }
  271. //过期使用资源过滤器
  272. func usedTimeoutFilterFactory(timeout int64) func(interface{}) bool {
  273. limit := timeout
  274. return func(raw interface{}) bool {
  275. obj, ok := raw.(*ServiceMeta)
  276. if !ok {
  277. return false
  278. }
  279. return !(obj.Used && time.Now().Unix()-obj.LastUseTime > limit)
  280. }
  281. }
  282. //重置长期占用资源状态
  283. func resetResource4Timeout(timeout int64) {
  284. txn := mdb.Txn(true)
  285. defer txn.Commit()
  286. rs, err := txn.Get("servicemeta", "name", "")
  287. if err != nil {
  288. log.Fatalln(err.Error())
  289. return
  290. }
  291. filter := memdb.NewFilterIterator(rs, usedTimeoutFilterFactory(timeout))
  292. for obj := filter.Next(); obj != nil; obj = filter.Next() {
  293. meta := obj.(*ServiceMeta)
  294. meta.Used = false
  295. _ = txn.Insert("servicemeta", meta)
  296. }
  297. }
  298. //过期资源占用
  299. func ClearTimeoutUsedResource(ttl int64) {
  300. timeout := ttl * 2
  301. tm := time.NewTicker(time.Duration(ttl) * time.Second)
  302. for {
  303. select {
  304. case <-tm.C:
  305. //TODO 过滤过期服务
  306. resetResource4Timeout(timeout)
  307. }
  308. }
  309. }
  310. //过期服务清理
  311. func ClearTimeoutService(ttl int64) {
  312. timeout := ttl * 2
  313. tm := time.NewTicker(time.Duration(ttl) * time.Second)
  314. for {
  315. select {
  316. case <-tm.C:
  317. //TODO 过滤过期服务
  318. removeTimeoutService(timeout)
  319. }
  320. }
  321. }