serviceStore.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package main
  2. /**
  3. 服务管理,存储服务
  4. TODO go-memdb这个内存数据库没有排序????
  5. */
  6. import (
  7. "errors"
  8. "fmt"
  9. "github.com/hashicorp/go-memdb"
  10. "log"
  11. "math/rand"
  12. "time"
  13. )
  // Load-balancing strategies stored in ServiceMeta.BalanceType.
  const (
  	// RANDOM picks an instance at random.
  	RANDOM = iota
  	// LOAD allocates according to server load.
  	LOAD
  	// SEQ hands out work sequentially, bounded by the number of execution
  	// units (workers) the service supports.
  	SEQ
  )
  // ServiceMeta is one row of the "servicemeta" table. AddServiceMeta creates
  // one row per worker of a registered service instance.
  type ServiceMeta struct {
  	// Identity of the service instance.
  	Name string // service name
  	Addr string // "ip:port" of the instance
  	Ip   string
  	Port int32
  	Id   string // unique row key, "ip:port_n" with n counting workers from 1

  	// Scheduling configuration and state.
  	Workers     int32   // number of workers the instance registered with
  	BalanceType int32   // balancing strategy (presumably RANDOM, LOAD or SEQ)
  	Load        float64 // last load value reported for the instance's server
  	Used        bool    // true while this row's slot is handed out to a caller

  	// Timestamps, in Unix seconds.
  	LastUseTime int64 // last time the slot was applied for or released
  	LastTtlTime int64 // last heartbeat time
  }

  // String renders the row as "Name-Addr-Workers-BalanceType-Id" for log output.
  func (m *ServiceMeta) String() string {
  	return fmt.Sprintf(
  		"%s-%s-%d-%d-%s",
  		m.Name, m.Addr, m.Workers, m.BalanceType, m.Id,
  	)
  }
  36. var mdb *memdb.MemDB
  37. func InitDb() {
  38. // Create the DB schema
  39. schema := &memdb.DBSchema{
  40. Tables: map[string]*memdb.TableSchema{
  41. "servicemeta": &memdb.TableSchema{
  42. Name: "servicemeta",
  43. Indexes: map[string]*memdb.IndexSchema{
  44. "id": &memdb.IndexSchema{
  45. Name: "id",
  46. Unique: true,
  47. Indexer: &memdb.StringFieldIndex{Field: "Id"},
  48. },
  49. "ip": &memdb.IndexSchema{
  50. Name: "ip",
  51. Unique: false,
  52. Indexer: &memdb.StringFieldIndex{Field: "Ip"},
  53. },
  54. "addr": &memdb.IndexSchema{
  55. Name: "addr",
  56. Unique: false,
  57. Indexer: &memdb.StringFieldIndex{Field: "Addr"},
  58. },
  59. "name": &memdb.IndexSchema{
  60. Name: "name",
  61. Unique: false,
  62. Indexer: &memdb.StringFieldIndex{Field: "Name"},
  63. },
  64. "name_used": &memdb.IndexSchema{ //服务名称+资源使用索引
  65. Name: "name_used",
  66. Unique: false,
  67. Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
  68. &memdb.StringFieldIndex{Field: "Name"},
  69. &memdb.BoolFieldIndex{Field: "Used"},
  70. }},
  71. },
  72. "ip_port": &memdb.IndexSchema{
  73. Name: "ip_port",
  74. Unique: false,
  75. Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
  76. &memdb.StringFieldIndex{Field: "Ip"},
  77. &memdb.IntFieldIndex{Field: "Port"},
  78. }},
  79. },
  80. },
  81. },
  82. },
  83. }
  84. // Create a new data base
  85. var err error
  86. mdb, err = memdb.NewMemDB(schema)
  87. if err != nil {
  88. panic(err)
  89. }
  90. }
  91. //添加服务
  92. func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) {
  93. now := time.Now().Unix()
  94. txn := mdb.Txn(true)
  95. defer txn.Commit()
  96. for i := 0; i < int(workers); i++ {
  97. sm := &ServiceMeta{
  98. Name: serviceName,
  99. Ip: ip,
  100. Port: port,
  101. Workers: workers,
  102. BalanceType: balance,
  103. LastTtlTime: now,
  104. Id: fmt.Sprintf("%s:%d_%d", ip, port, i+1),
  105. Addr: fmt.Sprintf("%s:%d", ip, port),
  106. }
  107. err := txn.Insert("servicemeta", sm)
  108. if err != nil {
  109. log.Fatalln("insert service meta error", err.Error())
  110. }
  111. log.Println("insert service meta", sm.String())
  112. }
  113. }
  114. //更新服务有效期
  115. func UpdateServiceMetaTtl(serviceAddr string) {
  116. now := time.Now().Unix()
  117. txn := mdb.Txn(true)
  118. defer txn.Commit()
  119. rs, err := txn.Get("servicemeta", "addr", serviceAddr)
  120. if err != nil {
  121. log.Fatalln("update service ttl error", err.Error())
  122. return
  123. }
  124. if rs == nil {
  125. log.Println("serviceAddr", serviceAddr, "找不到")
  126. return
  127. }
  128. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  129. sm := obj.(*ServiceMeta)
  130. sm.LastTtlTime = now
  131. _ = txn.Insert("servicemeta", sm)
  132. log.Println("update service meta ttl", sm.String())
  133. }
  134. }
  135. //注销服务
  136. func DestoryServiceMeta(ip string, port int32) {
  137. txn := mdb.Txn(true)
  138. defer txn.Commit()
  139. _, err := txn.DeleteAll("servicemeta", "ip_port", ip, port)
  140. if err != nil {
  141. log.Fatalln("destory service error", err.Error())
  142. }
  143. log.Println("remove service meta", ip, port)
  144. }
  145. //申请调用,找未使用的那个资源
  146. func ApplyWithNotUse(serviceName string) (string, string, error) {
  147. txn := mdb.Txn(true)
  148. defer txn.Commit()
  149. //TODO 这个框架的First,多列索引有问题,换种思路实现
  150. rs, err := txn.Get("servicemeta", "name", serviceName)
  151. if err != nil {
  152. log.Fatalln("applywithnotuse error ", err.Error())
  153. return "", "", errors.New("没有可用服务")
  154. }
  155. filter := memdb.NewFilterIterator(rs, seqFilterFactory())
  156. item := filter.Next()
  157. if item == nil {
  158. log.Println("applywithnotuse error 找不到可用服务")
  159. return "", "", errors.New("没有可用服务")
  160. }
  161. sm := item.(*ServiceMeta)
  162. sm.Used = true
  163. sm.LastUseTime = time.Now().Unix()
  164. _ = txn.Insert("servicemeta", sm)
  165. return sm.Addr, sm.Id, nil
  166. }
  167. //申请调用,找负载最低的那个服务器
  168. func ApplyWithLoad(serviceName string) (string, string, error) {
  169. txn := mdb.Txn(false)
  170. defer txn.Abort()
  171. rs, err := txn.Get("servicemeta", "name", serviceName)
  172. if err != nil {
  173. return "", "", errors.New("没有可用服务")
  174. }
  175. var addr, id string
  176. var load float64 = 1000
  177. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  178. meta := obj.(*ServiceMeta)
  179. if meta.Load < load { //找负载最小的那个
  180. load, addr, id = meta.Load, meta.Addr, meta.Id
  181. }
  182. }
  183. return addr, id, nil
  184. }
  185. //申请调用,随机找一个服务器
  186. func ApplyWithRandom(serviceName string) (string, string, error) {
  187. txn := mdb.Txn(false)
  188. defer txn.Abort()
  189. rs, err := txn.Get("servicemeta", "name", serviceName)
  190. if err != nil {
  191. return "", "", errors.New("没有可用服务")
  192. }
  193. addrs := make([][2]string, 0, 0)
  194. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  195. meta := obj.(*ServiceMeta)
  196. addrs = append(addrs, [2]string{meta.Addr, meta.Id})
  197. }
  198. index := rand.Intn(len(addrs))
  199. return addrs[index][0], addrs[index][1], nil
  200. }
  201. //释放资源
  202. func Release(serviceId string) {
  203. txn := mdb.Txn(true)
  204. defer txn.Commit()
  205. item, err := txn.First("servicemeta", "id", serviceId)
  206. if err != nil || item == nil {
  207. return
  208. }
  209. sm := item.(*ServiceMeta)
  210. sm.Used = false
  211. sm.LastUseTime = time.Now().Unix()
  212. _ = txn.Insert("servicemeta", sm)
  213. }
  214. //更新所有服务负载
  215. func UpdateServerLoad(ip string, load float64) {
  216. txn := mdb.Txn(true)
  217. defer txn.Commit()
  218. rs, err := txn.Get("servicemeta", "ip", ip)
  219. if err != nil {
  220. log.Fatalln(err.Error())
  221. return
  222. }
  223. for obj := rs.Next(); obj != nil; obj = rs.Next() {
  224. meta := obj.(*ServiceMeta)
  225. meta.Load = load
  226. _ = txn.Insert("servicemeta", meta)
  227. }
  228. }
  229. //
  230. func removeTimeoutService(timeout int64) {
  231. txn := mdb.Txn(true)
  232. defer txn.Commit()
  233. rs, err := txn.Get("servicemeta", "name", "")
  234. if err != nil {
  235. log.Fatalln(err.Error())
  236. return
  237. }
  238. filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout))
  239. ids := make([]string, 0, 0)
  240. for obj := filter.Next(); obj != nil; obj = filter.Next() {
  241. meta := obj.(*ServiceMeta)
  242. ids = append(ids, meta.Id)
  243. }
  244. log.Println("清理的服务", ids)
  245. for _, id := range ids {
  246. _, _ = txn.DeleteAll("servicemeta", "id", id)
  247. }
  248. }
  249. //过滤器工厂
  250. func timeoutFilterFactory(timeout int64) func(interface{}) bool {
  251. limit := timeout
  252. return func(raw interface{}) bool {
  253. obj, ok := raw.(*ServiceMeta)
  254. if !ok {
  255. return false
  256. }
  257. return !(time.Now().Unix()-obj.LastTtlTime > limit)
  258. }
  259. }
  260. //过滤器工厂
  261. func seqFilterFactory() func(interface{}) bool {
  262. return func(raw interface{}) bool {
  263. obj, ok := raw.(*ServiceMeta)
  264. if !ok {
  265. return false
  266. }
  267. return obj.Used
  268. }
  269. }
  270. //过期使用资源过滤器
  271. func usedTimeoutFilterFactory(timeout int64) func(interface{}) bool {
  272. limit := timeout
  273. return func(raw interface{}) bool {
  274. obj, ok := raw.(*ServiceMeta)
  275. if !ok {
  276. return false
  277. }
  278. return !(obj.Used && time.Now().Unix()-obj.LastUseTime > limit)
  279. }
  280. }
  281. //重置长期占用资源状态
  282. func resetResource4Timeout(timeout int64) {
  283. txn := mdb.Txn(true)
  284. defer txn.Commit()
  285. rs, err := txn.Get("servicemeta", "name", "")
  286. if err != nil {
  287. log.Fatalln(err.Error())
  288. return
  289. }
  290. filter := memdb.NewFilterIterator(rs, usedTimeoutFilterFactory(timeout))
  291. for obj := filter.Next(); obj != nil; obj = filter.Next() {
  292. meta := obj.(*ServiceMeta)
  293. meta.Used = false
  294. _ = txn.Insert("servicemeta", meta)
  295. }
  296. }
  297. //过期资源占用
  298. func ClearTimeoutUsedResource(ttl int64) {
  299. timeout := ttl * 2
  300. tm := time.NewTicker(time.Duration(ttl) * time.Second)
  301. for {
  302. select {
  303. case <-tm.C:
  304. //TODO 过滤过期服务
  305. resetResource4Timeout(timeout)
  306. }
  307. }
  308. }
  309. //过期服务清理
  310. func ClearTimeoutService(ttl int64) {
  311. timeout := ttl * 2
  312. tm := time.NewTicker(time.Duration(ttl) * time.Second)
  313. for {
  314. select {
  315. case <-tm.C:
  316. //TODO 过滤过期服务
  317. removeTimeoutService(timeout)
  318. }
  319. }
  320. }