package main import ( "app.yhyue.com/BP/queued/proto" "log" "math/rand" "sync" "time" ) const ( CHAN_MAX_SIZE = 500 PUBLISH_CHANNEL_ALL = iota //发送给本频道下所有人 PUBLISH_CHANNEL_RANDOM //发送给本频道下随机1个人 PUBLISH_CHANNEL_SEQ //发送给本频道下顺序1个人 ) //数据存储结构 var cache *sync.Map //消费者 type Consumer struct { Id string ReciveOutput chan *proto.PubReq Cancel chan bool IsUse bool } //频道 type Channel struct { Id string Consumeres *sync.Map //id:Consumer } // type QueueImpl struct { } //初始化 func init() { cache = new(sync.Map) } // func getChannel(key string) *Channel { if v, ok := cache.Load(key); ok { c := v.(*Channel) return c } else { c := &Channel{ Id: key, Consumeres: new(sync.Map), } cache.Store(key, c) return c } } //发布任务 func (q QueueImpl) Publish(server proto.QueueService_PublishServer) error { //TODO 生成用户ID //消息处理 for { msg, err := server.Recv() if err != nil { break } err = deliverMsg(msg) if err != nil { break } } _ = server.SendAndClose(&proto.PubResp{ Code: 500, Msg: "终止", }) return nil } func (q QueueImpl) Receive(req *proto.RecvReq, server proto.QueueService_ReceiveServer) error { channel := getChannel(req.ChannelId) var consumer *Consumer if v, ok := channel.Consumeres.Load(req.Sender); ok { consumer, _ = v.(*Consumer) } else { consumer = &Consumer{ Id: req.Sender, ReciveOutput: make(chan *proto.PubReq, CHAN_MAX_SIZE), } channel.Consumeres.Store(req.Sender, consumer) } var msg *proto.PubReq var err error Lab: for { select { case msg = <-consumer.ReciveOutput: err = server.Send(msg) if err != nil { break Lab } } } channel.Consumeres.Delete(consumer.Id) //消息重回队列,直到发送成功为止 if msg != nil && err != nil { log.Println(" 发送失败,尝试重新发送") for { if err = deliverMsg(msg); err == nil { break } time.Sleep(1 * time.Second) } } return nil } //消息投递,发送给消息消费者 func deliverMsg(msg *proto.PubReq) error { sendSuccess := false channel := getChannel(msg.ChannelId) if msg.PublishType == PUBLISH_CHANNEL_ALL { channel.Consumeres.Range(func(k interface{}, v interface{}) bool { sendSuccess = true c := v.(*Consumer) c.ReciveOutput <- msg return true }) } else if msg.PublishType == PUBLISH_CHANNEL_RANDOM { //随机 keys := make([]string, 0, 0) channel.Consumeres.Range(func(k interface{}, v interface{}) bool { key, _ := k.(string) keys = append(keys, key) return true }) //机选1个消费者 key := keys[rand.Intn(len(keys))] if v, ok := channel.Consumeres.Load(key); ok { c := v.(*Consumer) c.ReciveOutput <- msg sendSuccess = true } } else if msg.PublishType == PUBLISH_CHANNEL_SEQ { var consumer *Consumer channel.Consumeres.Range(func(k interface{}, v interface{}) bool { c := v.(*Consumer) if !c.IsUse { consumer = c return false } return true }) first := true if consumer == nil { channel.Consumeres.Range(func(k interface{}, v interface{}) bool { c := v.(*Consumer) if first { c.IsUse = true consumer = c first = false } else { c.IsUse = false } return true }) } if consumer != nil { consumer.ReciveOutput <- msg sendSuccess = true } } if !sendSuccess { log.Printf("msg id %s 发送失败,找不到合适的消费者\n", msg.Id) } return nil }