123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- package main
- import (
- "app.yhyue.com/BP/queued/proto"
- "log"
- "math/rand"
- "sync"
- "time"
- )
// CHAN_MAX_SIZE is the buffer capacity given to each consumer's delivery
// channel when the consumer is created.
const CHAN_MAX_SIZE = 500

// Publish modes compared against PubReq.PublishType when delivering a message.
// The values start at 1, not 0.
const (
	PUBLISH_CHANNEL_ALL    = iota + 1 // 1: send to every consumer of the channel
	PUBLISH_CHANNEL_RANDOM            // 2: send to one randomly chosen consumer of the channel
	PUBLISH_CHANNEL_SEQ               // 3: send to one consumer of the channel, chosen in sequence
)
// cache is the package-level data store. It maps a channel id (string) to
// its *Channel; it is assigned in init and accessed through getChannel.
var cache *sync.Map
// Consumer is one subscriber registered on a Channel.
type Consumer struct {
	Id           string             // sender id the consumer is registered under
	ReciveOutput chan *proto.PubReq // buffered queue of messages waiting to be sent to this consumer
	Cancel       chan bool          // NOTE(review): not referenced by any code visible in this file
	IsUse        bool               // flag read and written by PUBLISH_CHANNEL_SEQ delivery when picking a consumer
}
// Channel is a named topic together with the consumers subscribed to it.
type Channel struct {
	Id         string    // channel id; also the key under which the channel is cached
	Consumeres *sync.Map // consumer id (string) -> *Consumer
}
// QueueImpl is the receiver type of the Publish and Receive stream handlers.
// It carries no state of its own.
type QueueImpl struct {
}
- //初始化
- func init() {
- cache = new(sync.Map)
- }
- //
- func getChannel(key string) *Channel {
- if v, ok := cache.Load(key); ok {
- c := v.(*Channel)
- return c
- } else {
- c := &Channel{
- Id: key,
- Consumeres: new(sync.Map),
- }
- cache.Store(key, c)
- return c
- }
- }
- //发布任务
- func (q QueueImpl) Publish(server proto.QueueService_PublishServer) error {
- //TODO 生成用户ID
- //消息处理
- for {
- msg, err := server.Recv()
- if err != nil {
- break
- }
- err = deliverMsg(msg)
- if err != nil {
- break
- }
- }
- _ = server.SendAndClose(&proto.PubResp{
- Code: 500,
- Msg: "终止",
- })
- return nil
- }
- func (q QueueImpl) Receive(req *proto.RecvReq, server proto.QueueService_ReceiveServer) error {
- channel := getChannel(req.ChannelId)
- var consumer *Consumer
- if v, ok := channel.Consumeres.Load(req.Sender); ok {
- consumer, _ = v.(*Consumer)
- } else {
- consumer = &Consumer{
- Id: req.Sender,
- ReciveOutput: make(chan *proto.PubReq, CHAN_MAX_SIZE),
- }
- channel.Consumeres.Store(req.Sender, consumer)
- }
- var msg *proto.PubReq
- var err error
- Lab:
- for {
- select {
- case msg = <-consumer.ReciveOutput:
- err = server.Send(msg)
- if err != nil {
- break Lab
- }
- }
- }
- channel.Consumeres.Delete(consumer.Id)
- //消息重回队列,直到发送成功为止
- if msg != nil && err != nil {
- log.Println(" 发送失败,尝试重新发送")
- for {
- if err = deliverMsg(msg); err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- }
- }
- return nil
- }
- //消息投递,发送给消息消费者
- func deliverMsg(msg *proto.PubReq) error {
- sendSuccess := false
- channel := getChannel(msg.ChannelId)
- if msg.PublishType == PUBLISH_CHANNEL_ALL {
- channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
- sendSuccess = true
- c := v.(*Consumer)
- c.ReciveOutput <- msg
- return true
- })
- } else if msg.PublishType == PUBLISH_CHANNEL_RANDOM {
- //随机
- keys := make([]string, 0, 0)
- channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
- key, _ := k.(string)
- keys = append(keys, key)
- return true
- })
- //机选1个消费者
- key := keys[rand.Intn(len(keys))]
- if v, ok := channel.Consumeres.Load(key); ok {
- c := v.(*Consumer)
- c.ReciveOutput <- msg
- sendSuccess = true
- }
- } else if msg.PublishType == PUBLISH_CHANNEL_SEQ {
- var consumer *Consumer
- channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
- c := v.(*Consumer)
- if !c.IsUse {
- consumer = c
- return false
- }
- return true
- })
- first := true
- if consumer == nil {
- channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
- c := v.(*Consumer)
- if first {
- c.IsUse = true
- consumer = c
- first = false
- } else {
- c.IsUse = false
- }
- return true
- })
- }
- if consumer != nil {
- consumer.ReciveOutput <- msg
- sendSuccess = true
- }
- }
- if !sendSuccess {
- log.Printf("msg id %s 发送失败,找不到合适的消费者\n", msg.Id)
- }
- return nil
- }
|