123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package mfw
- import (
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- )
- //
- const (
- statusOnline = iota
- statusOffline = iota
- statusReconnecting = iota
- ErrMaxRetries Error = 0x01
- )
- //
- type Error int
- //
- type TCPClient struct {
- *net.TCPConn
- MyId string
- lock sync.RWMutex
- status int32
- maxRetries int
- retryInterval time.Duration
- OnConnectSuccess func() //重连成功后的回调
- OnRequestConnect func() //请求重连
- }
- //
- func (e Error) Error() string {
- switch e {
- case ErrMaxRetries:
- return "超过最大尝试次数"
- default:
- return "未知错误"
- }
- }
- //
- func Dial(network, addr string) (*TCPClient, error) {
- raddr, err := net.ResolveTCPAddr(network, addr)
- if err != nil {
- return nil, err
- }
- return DialTCP(network, nil, raddr)
- }
- //
- func DialTCP(network string, laddr, raddr *net.TCPAddr) (*TCPClient, error) {
- conn, err := net.DialTCP(network, laddr, raddr)
- if err != nil {
- return nil, err
- }
- return &TCPClient{
- TCPConn: conn,
- lock: sync.RWMutex{},
- status: 0,
- maxRetries: 10,
- retryInterval: 10 * time.Millisecond,
- }, nil
- }
- //建议用这个
- func DialTCPTimeout(network, raddr string, timeout time.Duration) (*TCPClient, error) {
- conn, err := net.DialTimeout(network, raddr, timeout)
- if err != nil {
- return nil, err
- }
- return &TCPClient{
- TCPConn: conn.(*net.TCPConn),
- lock: sync.RWMutex{},
- status: 0,
- maxRetries: 10,
- retryInterval: 10 * time.Millisecond,
- }, nil
- }
- //设置最大尝试次数
- func (c *TCPClient) SetMaxRetries(maxRetries int) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.maxRetries = maxRetries
- }
- //
- func (c *TCPClient) GetMaxRetries() int {
- c.lock.RLock()
- defer c.lock.RUnlock()
- return c.maxRetries
- }
- //重连间隔时间
- func (c *TCPClient) SetRetryInterval(retryInterval time.Duration) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.retryInterval = retryInterval
- }
- //
- func (c *TCPClient) GetRetryInterval() time.Duration {
- c.lock.RLock()
- defer c.lock.RUnlock()
- return c.retryInterval
- }
- //
- func (c *TCPClient) reconnect() error {
- if !atomic.CompareAndSwapInt32(&c.status, statusOffline, statusReconnecting) {
- return nil
- }
- //
- if c.OnRequestConnect != nil {
- c.OnRequestConnect()
- }
- raddr := c.TCPConn.RemoteAddr()
- conn, err := net.DialTCP(raddr.Network(), nil, raddr.(*net.TCPAddr))
- if err != nil {
- defer atomic.StoreInt32(&c.status, statusOffline)
- switch err.(type) {
- case *net.OpError:
- return err
- default:
- return err
- }
- }
- c.TCPConn.Close()
- c.TCPConn = conn
- atomic.StoreInt32(&c.status, statusOnline)
- if c.OnConnectSuccess != nil {
- c.OnConnectSuccess() //执行后续操作
- }
- return nil
- }
- //读操作
- func (c *TCPClient) Read(b []byte) (int, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- for i := 0; i < c.maxRetries; i++ {
- if atomic.LoadInt32(&c.status) == statusOnline {
- n, err := c.TCPConn.Read(b)
- if err == nil {
- return n, err
- }
- switch err.(type) {
- case *net.OpError:
- atomic.StoreInt32(&c.status, statusOffline)
- default:
- if err.Error() == "EOF" {
- atomic.StoreInt32(&c.status, statusOffline)
- } else {
- return n, err
- }
- }
- } else if atomic.LoadInt32(&c.status) == statusOffline {
- if err := c.reconnect(); err != nil {
- return -1, err
- }
- }
- if i < (c.maxRetries - 1) {
- time.Sleep(c.retryInterval)
- }
- }
- return -1, ErrMaxRetries
- }
- //
- func (c *TCPClient) ReadFrom(r io.Reader) (int64, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- for i := 0; i < c.maxRetries; i++ {
- if atomic.LoadInt32(&c.status) == statusOnline {
- n, err := c.TCPConn.ReadFrom(r)
- if err == nil {
- return n, err
- }
- switch err.(type) {
- case *net.OpError:
- atomic.StoreInt32(&c.status, statusOffline)
- default:
- if err.Error() == "EOF" {
- atomic.StoreInt32(&c.status, statusOffline)
- } else {
- return n, err
- }
- }
- } else if atomic.LoadInt32(&c.status) == statusOffline {
- if err := c.reconnect(); err != nil {
- return -1, err
- }
- }
- if i < (c.maxRetries - 1) {
- time.Sleep(c.retryInterval)
- }
- }
- return -1, ErrMaxRetries
- }
- //
- func (c *TCPClient) Write(b []byte) (int, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- for i := 0; i < c.maxRetries; i++ {
- if atomic.LoadInt32(&c.status) == statusOnline {
- for c.MyId == "" {
- time.Sleep(1 * time.Second)
- }
- if (c.MyId != "" && string(b[4:12]) != c.MyId) || i == 1 { //重连过了,老数据要切换来源(仅修一次)
- tmp := append(b[:4])
- tmp = append(tmp, []byte(c.MyId)...)
- tmp = append(tmp, b[12:]...)
- b = append(tmp)
- }
- n, err := c.TCPConn.Write(b)
- if err == nil {
- return n, err
- }
- switch err.(type) {
- case *net.OpError:
- atomic.StoreInt32(&c.status, statusOffline)
- default:
- return n, err
- }
- } else if atomic.LoadInt32(&c.status) == statusOffline {
- if err := c.reconnect(); err != nil {
- return -1, err
- }
- }
- if i < (c.maxRetries - 1) {
- time.Sleep(c.retryInterval)
- }
- }
- return -1, ErrMaxRetries
- }
|