tcpclient.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package mfw
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. )
  9. //
  10. const (
  11. statusOnline = iota
  12. statusOffline = iota
  13. statusReconnecting = iota
  14. ErrMaxRetries Error = 0x01
  15. )
  16. //
  17. type Error int
  18. //
  19. type TCPClient struct {
  20. *net.TCPConn
  21. MyId string
  22. lock sync.RWMutex
  23. status int32
  24. maxRetries int
  25. retryInterval time.Duration
  26. OnConnectSuccess func() //重连成功后的回调
  27. OnRequestConnect func() //请求重连
  28. }
  29. //
  30. func (e Error) Error() string {
  31. switch e {
  32. case ErrMaxRetries:
  33. return "超过最大尝试次数"
  34. default:
  35. return "未知错误"
  36. }
  37. }
  38. //
  39. func Dial(network, addr string) (*TCPClient, error) {
  40. raddr, err := net.ResolveTCPAddr(network, addr)
  41. if err != nil {
  42. return nil, err
  43. }
  44. return DialTCP(network, nil, raddr)
  45. }
  46. //
  47. func DialTCP(network string, laddr, raddr *net.TCPAddr) (*TCPClient, error) {
  48. conn, err := net.DialTCP(network, laddr, raddr)
  49. if err != nil {
  50. return nil, err
  51. }
  52. return &TCPClient{
  53. TCPConn: conn,
  54. lock: sync.RWMutex{},
  55. status: 0,
  56. maxRetries: 10,
  57. retryInterval: 10 * time.Millisecond,
  58. }, nil
  59. }
  60. //建议用这个
  61. func DialTCPTimeout(network, raddr string, timeout time.Duration) (*TCPClient, error) {
  62. conn, err := net.DialTimeout(network, raddr, timeout)
  63. if err != nil {
  64. return nil, err
  65. }
  66. return &TCPClient{
  67. TCPConn: conn.(*net.TCPConn),
  68. lock: sync.RWMutex{},
  69. status: 0,
  70. maxRetries: 10,
  71. retryInterval: 10 * time.Millisecond,
  72. }, nil
  73. }
  74. //设置最大尝试次数
  75. func (c *TCPClient) SetMaxRetries(maxRetries int) {
  76. c.lock.Lock()
  77. defer c.lock.Unlock()
  78. c.maxRetries = maxRetries
  79. }
  80. //
  81. func (c *TCPClient) GetMaxRetries() int {
  82. c.lock.RLock()
  83. defer c.lock.RUnlock()
  84. return c.maxRetries
  85. }
  86. //重连间隔时间
  87. func (c *TCPClient) SetRetryInterval(retryInterval time.Duration) {
  88. c.lock.Lock()
  89. defer c.lock.Unlock()
  90. c.retryInterval = retryInterval
  91. }
  92. //
  93. func (c *TCPClient) GetRetryInterval() time.Duration {
  94. c.lock.RLock()
  95. defer c.lock.RUnlock()
  96. return c.retryInterval
  97. }
  98. //
  99. func (c *TCPClient) reconnect() error {
  100. if !atomic.CompareAndSwapInt32(&c.status, statusOffline, statusReconnecting) {
  101. return nil
  102. }
  103. //
  104. if c.OnRequestConnect != nil {
  105. c.OnRequestConnect()
  106. }
  107. raddr := c.TCPConn.RemoteAddr()
  108. conn, err := net.DialTCP(raddr.Network(), nil, raddr.(*net.TCPAddr))
  109. if err != nil {
  110. defer atomic.StoreInt32(&c.status, statusOffline)
  111. switch err.(type) {
  112. case *net.OpError:
  113. return err
  114. default:
  115. return err
  116. }
  117. }
  118. c.TCPConn.Close()
  119. c.TCPConn = conn
  120. atomic.StoreInt32(&c.status, statusOnline)
  121. if c.OnConnectSuccess != nil {
  122. c.OnConnectSuccess() //执行后续操作
  123. }
  124. return nil
  125. }
  126. //读操作
  127. func (c *TCPClient) Read(b []byte) (int, error) {
  128. c.lock.RLock()
  129. defer c.lock.RUnlock()
  130. for i := 0; i < c.maxRetries; i++ {
  131. if atomic.LoadInt32(&c.status) == statusOnline {
  132. n, err := c.TCPConn.Read(b)
  133. if err == nil {
  134. return n, err
  135. }
  136. switch err.(type) {
  137. case *net.OpError:
  138. atomic.StoreInt32(&c.status, statusOffline)
  139. default:
  140. if err.Error() == "EOF" {
  141. atomic.StoreInt32(&c.status, statusOffline)
  142. } else {
  143. return n, err
  144. }
  145. }
  146. } else if atomic.LoadInt32(&c.status) == statusOffline {
  147. if err := c.reconnect(); err != nil {
  148. return -1, err
  149. }
  150. }
  151. if i < (c.maxRetries - 1) {
  152. time.Sleep(c.retryInterval)
  153. }
  154. }
  155. return -1, ErrMaxRetries
  156. }
  157. //
  158. func (c *TCPClient) ReadFrom(r io.Reader) (int64, error) {
  159. c.lock.RLock()
  160. defer c.lock.RUnlock()
  161. for i := 0; i < c.maxRetries; i++ {
  162. if atomic.LoadInt32(&c.status) == statusOnline {
  163. n, err := c.TCPConn.ReadFrom(r)
  164. if err == nil {
  165. return n, err
  166. }
  167. switch err.(type) {
  168. case *net.OpError:
  169. atomic.StoreInt32(&c.status, statusOffline)
  170. default:
  171. if err.Error() == "EOF" {
  172. atomic.StoreInt32(&c.status, statusOffline)
  173. } else {
  174. return n, err
  175. }
  176. }
  177. } else if atomic.LoadInt32(&c.status) == statusOffline {
  178. if err := c.reconnect(); err != nil {
  179. return -1, err
  180. }
  181. }
  182. if i < (c.maxRetries - 1) {
  183. time.Sleep(c.retryInterval)
  184. }
  185. }
  186. return -1, ErrMaxRetries
  187. }
  188. //
  189. func (c *TCPClient) Write(b []byte) (int, error) {
  190. c.lock.RLock()
  191. defer c.lock.RUnlock()
  192. for i := 0; i < c.maxRetries; i++ {
  193. if atomic.LoadInt32(&c.status) == statusOnline {
  194. for c.MyId == "" {
  195. time.Sleep(1 * time.Second)
  196. }
  197. if (c.MyId != "" && string(b[4:12]) != c.MyId) || i == 1 { //重连过了,老数据要切换来源(仅修一次)
  198. tmp := append(b[:4])
  199. tmp = append(tmp, []byte(c.MyId)...)
  200. tmp = append(tmp, b[12:]...)
  201. b = append(tmp)
  202. }
  203. n, err := c.TCPConn.Write(b)
  204. if err == nil {
  205. return n, err
  206. }
  207. switch err.(type) {
  208. case *net.OpError:
  209. atomic.StoreInt32(&c.status, statusOffline)
  210. default:
  211. return n, err
  212. }
  213. } else if atomic.LoadInt32(&c.status) == statusOffline {
  214. if err := c.reconnect(); err != nil {
  215. return -1, err
  216. }
  217. }
  218. if i < (c.maxRetries - 1) {
  219. time.Sleep(c.retryInterval)
  220. }
  221. }
  222. return -1, ErrMaxRetries
  223. }