connmux.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Copyright 2014 Gary Burd
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. package redisx
  15. import (
  16. "errors"
  17. "sync"
  18. "github.com/gomodule/redigo/redis"
  19. )
  20. // ConnMux multiplexes one or more connections to a single underlying
  21. // connection. The ConnMux connections do not support concurrency, commands
  22. // that associate server side state with the connection or commands that put
  23. // the connection in a special mode.
  24. type ConnMux struct {
  25. c redis.Conn
  26. sendMu sync.Mutex
  27. sendID uint
  28. recvMu sync.Mutex
  29. recvID uint
  30. recvWait map[uint]chan struct{}
  31. }
  32. func NewConnMux(c redis.Conn) *ConnMux {
  33. return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})}
  34. }
  35. // Get gets a connection. The application must close the returned connection.
  36. func (p *ConnMux) Get() redis.Conn {
  37. c := &muxConn{p: p}
  38. c.ids = c.buf[:0]
  39. return c
  40. }
  41. // Close closes the underlying connection.
  42. func (p *ConnMux) Close() error {
  43. return p.c.Close()
  44. }
  45. type muxConn struct {
  46. p *ConnMux
  47. ids []uint
  48. buf [8]uint
  49. }
  50. func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error {
  51. if lookupCommandInfo(cmd).notMuxable {
  52. return errors.New("command not supported by mux pool")
  53. }
  54. p := c.p
  55. p.sendMu.Lock()
  56. id := p.sendID
  57. c.ids = append(c.ids, id)
  58. p.sendID++
  59. err := p.c.Send(cmd, args...)
  60. if flush {
  61. err = p.c.Flush()
  62. }
  63. p.sendMu.Unlock()
  64. return err
  65. }
  66. func (c *muxConn) Send(cmd string, args ...interface{}) error {
  67. return c.send(false, cmd, args...)
  68. }
  69. func (c *muxConn) Flush() error {
  70. p := c.p
  71. p.sendMu.Lock()
  72. err := p.c.Flush()
  73. p.sendMu.Unlock()
  74. return err
  75. }
  76. func (c *muxConn) Receive() (interface{}, error) {
  77. if len(c.ids) == 0 {
  78. return nil, errors.New("mux pool underflow")
  79. }
  80. id := c.ids[0]
  81. c.ids = c.ids[1:]
  82. if len(c.ids) == 0 {
  83. c.ids = c.buf[:0]
  84. }
  85. p := c.p
  86. p.recvMu.Lock()
  87. if p.recvID != id {
  88. ch := make(chan struct{})
  89. p.recvWait[id] = ch
  90. p.recvMu.Unlock()
  91. <-ch
  92. p.recvMu.Lock()
  93. if p.recvID != id {
  94. panic("out of sync")
  95. }
  96. }
  97. v, err := p.c.Receive()
  98. id++
  99. p.recvID = id
  100. ch, ok := p.recvWait[id]
  101. if ok {
  102. delete(p.recvWait, id)
  103. }
  104. p.recvMu.Unlock()
  105. if ok {
  106. ch <- struct{}{}
  107. }
  108. return v, err
  109. }
  110. func (c *muxConn) Close() error {
  111. var err error
  112. if len(c.ids) == 0 {
  113. return nil
  114. }
  115. c.Flush()
  116. for _ = range c.ids {
  117. _, err = c.Receive()
  118. }
  119. return err
  120. }
  121. func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) {
  122. if err := c.send(true, cmd, args...); err != nil {
  123. return nil, err
  124. }
  125. return c.Receive()
  126. }
  127. func (c *muxConn) Err() error {
  128. return c.p.c.Err()
  129. }