publisher.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package util
  2. import (
  3. "app.yhyue.com/BP/queued/proto"
  4. "context"
  5. "google.golang.org/grpc"
  6. "log"
  7. "time"
  8. )
  9. /*
  10. 任务发布封装
  11. */
  12. type Publisher struct {
  13. Id string
  14. QueueServer string
  15. Channel string
  16. C <-chan *proto.PubReq
  17. }
  18. //
  19. func NewPublisher(server string, channel string, ch <-chan *proto.PubReq) *Publisher {
  20. uuid, _ := makeUuid()
  21. return &Publisher{
  22. Id: uuid,
  23. C: ch,
  24. QueueServer: server,
  25. Channel: channel,
  26. }
  27. }
  28. //
  29. func (pb *Publisher) Run() {
  30. for {
  31. conn, err := grpc.Dial(pb.QueueServer, grpc.WithInsecure())
  32. if err != nil {
  33. time.Sleep(1 * time.Second)
  34. log.Fatalf("did not connect: %v", err)
  35. }
  36. c := proto.NewQueueServiceClient(conn)
  37. stream, _ := c.Publish(context.Background())
  38. //
  39. Lab:
  40. for {
  41. select {
  42. case msg := <-pb.C:
  43. uuid, _ := makeUuid()
  44. msg.Sender = pb.Id
  45. msg.Id = uuid
  46. msg.ChannelId = pb.Channel
  47. err := stream.Send(msg)
  48. if err != nil {
  49. break Lab
  50. }
  51. }
  52. }
  53. _ = conn.Close()
  54. time.Sleep(3 * time.Second)
  55. }
  56. }