queueimpl.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package main
  2. import (
  3. "app.yhyue.com/BP/queued/proto"
  4. "log"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. CHAN_MAX_SIZE = 500
  11. PUBLISH_CHANNEL_ALL = iota //发送给本频道下所有人
  12. PUBLISH_CHANNEL_RANDOM //发送给本频道下随机1个人
  13. PUBLISH_CHANNEL_SEQ //发送给本频道下顺序1个人
  14. )
  15. //数据存储结构
  16. var cache *sync.Map
  17. //消费者
  18. type Consumer struct {
  19. Id string
  20. ReciveOutput chan *proto.PubReq
  21. Cancel chan bool
  22. IsUse bool
  23. }
  24. //频道
  25. type Channel struct {
  26. Id string
  27. Consumeres *sync.Map //id:Consumer
  28. }
  29. //
  30. type QueueImpl struct {
  31. }
  32. //初始化
  33. func init() {
  34. cache = new(sync.Map)
  35. }
  36. //
  37. func getChannel(key string) *Channel {
  38. if v, ok := cache.Load(key); ok {
  39. c := v.(*Channel)
  40. return c
  41. } else {
  42. c := &Channel{
  43. Id: key,
  44. Consumeres: new(sync.Map),
  45. }
  46. cache.Store(key, c)
  47. return c
  48. }
  49. }
  50. //发布任务
  51. func (q QueueImpl) Publish(server proto.QueueService_PublishServer) error {
  52. //TODO 生成用户ID
  53. //消息处理
  54. for {
  55. msg, err := server.Recv()
  56. if err != nil {
  57. break
  58. }
  59. err = deliverMsg(msg)
  60. if err != nil {
  61. break
  62. }
  63. }
  64. _ = server.SendAndClose(&proto.PubResp{
  65. Code: 500,
  66. Msg: "终止",
  67. })
  68. return nil
  69. }
  70. func (q QueueImpl) Receive(req *proto.RecvReq, server proto.QueueService_ReceiveServer) error {
  71. channel := getChannel(req.ChannelId)
  72. var consumer *Consumer
  73. if v, ok := channel.Consumeres.Load(req.Sender); ok {
  74. consumer, _ = v.(*Consumer)
  75. } else {
  76. consumer = &Consumer{
  77. Id: req.Sender,
  78. ReciveOutput: make(chan *proto.PubReq, CHAN_MAX_SIZE),
  79. }
  80. channel.Consumeres.Store(req.Sender, consumer)
  81. }
  82. var msg *proto.PubReq
  83. var err error
  84. Lab:
  85. for {
  86. select {
  87. case msg = <-consumer.ReciveOutput:
  88. err = server.Send(msg)
  89. if err != nil {
  90. break Lab
  91. }
  92. }
  93. }
  94. channel.Consumeres.Delete(consumer.Id)
  95. //消息重回队列,直到发送成功为止
  96. if msg != nil && err != nil {
  97. log.Println(" 发送失败,尝试重新发送")
  98. for {
  99. if err = deliverMsg(msg); err == nil {
  100. break
  101. }
  102. time.Sleep(1 * time.Second)
  103. }
  104. }
  105. return nil
  106. }
  107. //消息投递,发送给消息消费者
  108. func deliverMsg(msg *proto.PubReq) error {
  109. sendSuccess := false
  110. channel := getChannel(msg.ChannelId)
  111. if msg.PublishType == PUBLISH_CHANNEL_ALL {
  112. channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
  113. sendSuccess = true
  114. c := v.(*Consumer)
  115. c.ReciveOutput <- msg
  116. return true
  117. })
  118. } else if msg.PublishType == PUBLISH_CHANNEL_RANDOM {
  119. //随机
  120. keys := make([]string, 0, 0)
  121. channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
  122. key, _ := k.(string)
  123. keys = append(keys, key)
  124. return true
  125. })
  126. //机选1个消费者
  127. key := keys[rand.Intn(len(keys))]
  128. if v, ok := channel.Consumeres.Load(key); ok {
  129. c := v.(*Consumer)
  130. c.ReciveOutput <- msg
  131. sendSuccess = true
  132. }
  133. } else if msg.PublishType == PUBLISH_CHANNEL_SEQ {
  134. var consumer *Consumer
  135. channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
  136. c := v.(*Consumer)
  137. if !c.IsUse {
  138. consumer = c
  139. return false
  140. }
  141. return true
  142. })
  143. first := true
  144. if consumer == nil {
  145. channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
  146. c := v.(*Consumer)
  147. if first {
  148. c.IsUse = true
  149. consumer = c
  150. first = false
  151. } else {
  152. c.IsUse = false
  153. }
  154. return true
  155. })
  156. }
  157. if consumer != nil {
  158. consumer.ReciveOutput <- msg
  159. sendSuccess = true
  160. }
  161. }
  162. if !sendSuccess {
  163. log.Printf("msg id %s 发送失败,找不到合适的消费者\n", msg.Id)
  164. }
  165. return nil
  166. }