package mfw import ( "io" "net" "sync" "sync/atomic" "time" ) // const ( statusOnline = iota statusOffline = iota statusReconnecting = iota ErrMaxRetries Error = 0x01 ) // type Error int // type TCPClient struct { *net.TCPConn MyId string lock sync.RWMutex status int32 maxRetries int retryInterval time.Duration OnConnectSuccess func() //重连成功后的回调 OnRequestConnect func() //请求重连 } // func (e Error) Error() string { switch e { case ErrMaxRetries: return "超过最大尝试次数" default: return "未知错误" } } // func Dial(network, addr string) (*TCPClient, error) { raddr, err := net.ResolveTCPAddr(network, addr) if err != nil { return nil, err } return DialTCP(network, nil, raddr) } // func DialTCP(network string, laddr, raddr *net.TCPAddr) (*TCPClient, error) { conn, err := net.DialTCP(network, laddr, raddr) if err != nil { return nil, err } return &TCPClient{ TCPConn: conn, lock: sync.RWMutex{}, status: 0, maxRetries: 10, retryInterval: 10 * time.Millisecond, }, nil } //建议用这个 func DialTCPTimeout(network, raddr string, timeout time.Duration) (*TCPClient, error) { conn, err := net.DialTimeout(network, raddr, timeout) if err != nil { return nil, err } return &TCPClient{ TCPConn: conn.(*net.TCPConn), lock: sync.RWMutex{}, status: 0, maxRetries: 10, retryInterval: 10 * time.Millisecond, }, nil } //设置最大尝试次数 func (c *TCPClient) SetMaxRetries(maxRetries int) { c.lock.Lock() defer c.lock.Unlock() c.maxRetries = maxRetries } // func (c *TCPClient) GetMaxRetries() int { c.lock.RLock() defer c.lock.RUnlock() return c.maxRetries } //重连间隔时间 func (c *TCPClient) SetRetryInterval(retryInterval time.Duration) { c.lock.Lock() defer c.lock.Unlock() c.retryInterval = retryInterval } // func (c *TCPClient) GetRetryInterval() time.Duration { c.lock.RLock() defer c.lock.RUnlock() return c.retryInterval } // func (c *TCPClient) reconnect() error { if !atomic.CompareAndSwapInt32(&c.status, statusOffline, statusReconnecting) { return nil } // if c.OnRequestConnect != nil { c.OnRequestConnect() } raddr := c.TCPConn.RemoteAddr() conn, err := net.DialTCP(raddr.Network(), nil, raddr.(*net.TCPAddr)) if err != nil { defer atomic.StoreInt32(&c.status, statusOffline) switch err.(type) { case *net.OpError: return err default: return err } } c.TCPConn.Close() c.TCPConn = conn atomic.StoreInt32(&c.status, statusOnline) if c.OnConnectSuccess != nil { c.OnConnectSuccess() //执行后续操作 } return nil } //读操作 func (c *TCPClient) Read(b []byte) (int, error) { c.lock.RLock() defer c.lock.RUnlock() for i := 0; i < c.maxRetries; i++ { if atomic.LoadInt32(&c.status) == statusOnline { n, err := c.TCPConn.Read(b) if err == nil { return n, err } switch err.(type) { case *net.OpError: atomic.StoreInt32(&c.status, statusOffline) default: if err.Error() == "EOF" { atomic.StoreInt32(&c.status, statusOffline) } else { return n, err } } } else if atomic.LoadInt32(&c.status) == statusOffline { if err := c.reconnect(); err != nil { return -1, err } } if i < (c.maxRetries - 1) { time.Sleep(c.retryInterval) } } return -1, ErrMaxRetries } // func (c *TCPClient) ReadFrom(r io.Reader) (int64, error) { c.lock.RLock() defer c.lock.RUnlock() for i := 0; i < c.maxRetries; i++ { if atomic.LoadInt32(&c.status) == statusOnline { n, err := c.TCPConn.ReadFrom(r) if err == nil { return n, err } switch err.(type) { case *net.OpError: atomic.StoreInt32(&c.status, statusOffline) default: if err.Error() == "EOF" { atomic.StoreInt32(&c.status, statusOffline) } else { return n, err } } } else if atomic.LoadInt32(&c.status) == statusOffline { if err := c.reconnect(); err != nil { return -1, err } } if i < (c.maxRetries - 1) { time.Sleep(c.retryInterval) } } return -1, ErrMaxRetries } // func (c *TCPClient) Write(b []byte) (int, error) { c.lock.RLock() defer c.lock.RUnlock() for i := 0; i < c.maxRetries; i++ { if atomic.LoadInt32(&c.status) == statusOnline { for c.MyId == "" { time.Sleep(1 * time.Second) } if (c.MyId != "" && string(b[4:12]) != c.MyId) || i == 1 { //重连过了,老数据要切换来源(仅修一次) tmp := append(b[:4]) tmp = append(tmp, []byte(c.MyId)...) tmp = append(tmp, b[12:]...) b = append(tmp) } n, err := c.TCPConn.Write(b) if err == nil { return n, err } switch err.(type) { case *net.OpError: atomic.StoreInt32(&c.status, statusOffline) default: return n, err } } else if atomic.LoadInt32(&c.status) == statusOffline { if err := c.reconnect(); err != nil { return -1, err } } if i < (c.maxRetries - 1) { time.Sleep(c.retryInterval) } } return -1, ErrMaxRetries }