Browse Source

封装自动重连客户端

Tao Zhang 6 năm trước cách đây
mục cha
commit
a252d1a65f

BIN
bin/service


+ 168 - 0
src/mfw/util/autorcclient.go

@@ -0,0 +1,168 @@
+/**
+ *支持自动连接的socket客户端,
+ *支持tls
+ */
+package util
+
+import (
+	"crypto/tls"
+	"errors"
+	"io"
+	"net"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+//
+const (
+	statusOnline = iota
+	statusOffline
+	statusReconnecting
+)
+
+//
+type AutoRCClient struct {
+	io.ReadWriteCloser
+	lock          sync.RWMutex
+	status        int32
+	maxRetries    int           //重连允许次数
+	retryInterval time.Duration //重连间隔时间
+	network       string        //网络类型
+	serveraddr    string        //地址
+	onconnectedfn func()        //连接成功后的通知函数
+	tlsconfig     *tls.Config
+	dialtimeout   time.Duration //
+}
+
+//
+func NewAutoRCClient(network, serveraddr string,
+	config *tls.Config, maxretries int,
+	retryinterval time.Duration,
+	dialtimeout time.Duration,
+	connectedcallback func()) *AutoRCClient {
+	//
+	return &AutoRCClient{
+		network:       network,
+		serveraddr:    serveraddr,
+		lock:          sync.RWMutex{},
+		maxRetries:    maxretries,
+		status:        statusOffline,
+		dialtimeout:   dialtimeout,
+		retryInterval: retryinterval,
+		onconnectedfn: connectedcallback,
+		tlsconfig:     config,
+	}
+}
+
+//连接
+func (t *AutoRCClient) Dial() error {
+	return t.DialWithTimeout(t.dialtimeout)
+}
+
+//
+func (t *AutoRCClient) DialWithTimeout(d time.Duration) error {
+	if conn, err := t.dial4RCClient(d); err == nil {
+		t.ReadWriteCloser = conn
+		t.status = statusOnline
+		return nil
+	} else {
+		return err
+	}
+}
+
+//
+func (t *AutoRCClient) dial4RCClient(d time.Duration) (io.ReadWriteCloser, error) {
+	var err error
+	var conn io.ReadWriteCloser
+	if t.tlsconfig == nil {
+		conn, err = net.DialTimeout(t.network, t.serveraddr, d)
+	} else {
+		conn, err = tls.DialWithDialer(&net.Dialer{
+			Timeout: d,
+		}, t.network, t.serveraddr, t.tlsconfig)
+	}
+	if err != nil {
+		return nil, err
+	}
+	return conn, nil
+}
+
+//
+func (c *AutoRCClient) reConnect() error {
+	if !atomic.CompareAndSwapInt32(&c.status, statusOffline, statusReconnecting) {
+		return nil
+	}
+	conn, err := c.dial4RCClient(c.dialtimeout)
+	if err != nil {
+		atomic.StoreInt32(&c.status, statusOffline)
+		return err
+	}
+	c.ReadWriteCloser.Close() //关闭以前的连接
+	c.ReadWriteCloser = conn
+	atomic.StoreInt32(&c.status, statusOnline)
+	if c.onconnectedfn != nil {
+		c.onconnectedfn() //执行后续操作
+	}
+	return nil
+}
+
+//读操作
+func (c *AutoRCClient) Read(b []byte) (int, error) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	for i := 0; i < c.maxRetries; i++ {
+		if atomic.LoadInt32(&c.status) == statusOnline {
+			n, err := c.ReadWriteCloser.Read(b)
+			if err == nil {
+				return n, err
+			}
+			switch err.(type) {
+			case *net.OpError:
+				atomic.StoreInt32(&c.status, statusOffline)
+			default:
+				if err.Error() == "EOF" {
+					atomic.StoreInt32(&c.status, statusOffline)
+				} else {
+					return n, err
+				}
+			}
+		} else if atomic.LoadInt32(&c.status) == statusOffline {
+			if err := c.reConnect(); err != nil {
+				return -1, err
+			}
+		}
+		if i < (c.maxRetries - 1) {
+			time.Sleep(c.retryInterval)
+		}
+	}
+	return -1, errors.New("超过了最大尝试连接次数")
+}
+
+//
+func (c *AutoRCClient) Write(b []byte) (int, error) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	for i := 0; i < c.maxRetries; i++ {
+		if atomic.LoadInt32(&c.status) == statusOnline {
+			n, err := c.ReadWriteCloser.Write(b)
+			if err == nil {
+				return n, err
+			}
+			switch err.(type) {
+			case *net.OpError:
+				atomic.StoreInt32(&c.status, statusOffline)
+			default:
+				return n, err
+			}
+		} else if atomic.LoadInt32(&c.status) == statusOffline {
+			if err := c.reConnect(); err != nil {
+				return -1, err
+			}
+		}
+		if i < (c.maxRetries - 1) {
+			time.Sleep(c.retryInterval)
+		}
+	}
+	return -1, errors.New("超过了最大尝试连接次数")
+}

+ 9 - 16
src/mfw/util/client.go

@@ -12,17 +12,9 @@ import (
 	"time"
 )
 
-//
-type CustomConn interface {
-	io.Reader
-	io.Writer
-	io.Closer
-	Dial() error
-}
-
 //封装,可以链接多个消息总线
 type Client struct {
-	conn                 CustomConn
+	conn                 *AutoRCClient
 	myid                 string //
 	businessEventHandler func(*Packet)
 	canHandleEvent       []int
@@ -85,23 +77,24 @@ func NewClient(conf *ClientConfig) (*Client, error) {
 			"myid":    client.myid,
 		})
 	}
+	var tlsconfig *tls.Config
 	//使用tls
 	if conf.UseTls {
 		cert, err := tls.LoadX509KeyPair(conf.TlsCertFile, conf.TlsKeyFile)
 		if err != nil {
 			Log.Fail.Fatalln(err)
 		}
-		config := &tls.Config{
+		tlsconfig = &tls.Config{
 			InsecureSkipVerify: true,
 			Certificates:       []tls.Certificate{cert},
 		}
-		client.conn = NewTlsClient("tcp4", conf.MsgServerAddr,
-			math.MaxInt32, 2*time.Second, config, connectedcallbackfn)
-
-	} else {
-		client.conn = NewTCPClient("tcp4", conf.MsgServerAddr,
-			math.MaxInt32, 2*time.Second, connectedcallbackfn)
 	}
+	//
+	client.conn = NewAutoRCClient("tcp4", conf.MsgServerAddr,
+		tlsconfig,
+		math.MaxInt32, 2*time.Second, 10*time.Second,
+		connectedcallbackfn)
+
 	for {
 		if err := client.conn.Dial(); err != nil {
 			Log.Debug.Println("尝试等待连接到消息总线")

+ 0 - 156
src/mfw/util/tcpclient.go

@@ -1,156 +0,0 @@
-package util
-
-import (
-	"net"
-	"sync"
-	"sync/atomic"
-	"time"
-)
-
-//
-const (
-	statusOnline             = iota
-	statusOffline            = iota
-	statusReconnecting       = iota
-	ErrMaxRetries      Error = 0x01
-)
-
-//
-type Error int
-
-//
-type TCPClient struct {
-	*net.TCPConn
-	lock          sync.RWMutex
-	status        int32
-	maxRetries    int
-	retryInterval time.Duration
-	network       string
-	serveraddr    string
-	onconnectedfn func()
-}
-
-//
-func NewTCPClient(network, serveraddr string, maxretries int, retryinterval time.Duration, connectedcallback func()) *TCPClient {
-	return &TCPClient{
-		network:       network,
-		serveraddr:    serveraddr,
-		lock:          sync.RWMutex{},
-		maxRetries:    maxretries,
-		status:        statusOffline,
-		retryInterval: retryinterval,
-		onconnectedfn: connectedcallback,
-	}
-}
-
-//
-func (e Error) Error() string {
-	switch e {
-	case ErrMaxRetries:
-		return "超过最大尝试次数"
-	default:
-		return "未知错误"
-	}
-}
-
-//
-func (t *TCPClient) Dial() error {
-	raddr, err := net.ResolveTCPAddr(t.network, t.serveraddr)
-	if err != nil {
-		return err
-	}
-	conn, err := net.DialTCP(t.network, nil, raddr)
-	if err != nil {
-		return err
-	}
-	t.TCPConn = conn
-	t.status = statusOnline
-	return nil
-}
-
-//
-func (c *TCPClient) reconnect() error {
-	if !atomic.CompareAndSwapInt32(&c.status, statusOffline, statusReconnecting) {
-		return nil
-	}
-
-	raddr := c.TCPConn.RemoteAddr()
-	conn, err := net.DialTCP(raddr.Network(), nil, raddr.(*net.TCPAddr))
-	if err != nil {
-		defer atomic.StoreInt32(&c.status, statusOffline)
-		switch err.(type) {
-		case *net.OpError:
-			return err
-		default:
-			return err
-		}
-	}
-
-	c.TCPConn.Close()
-	c.TCPConn = conn //tlscon.(*net.TCPConn)
-	atomic.StoreInt32(&c.status, statusOnline)
-	if c.onconnectedfn != nil {
-		c.onconnectedfn() //执行后续操作
-	}
-	return nil
-}
-
-//读操作
-func (c *TCPClient) Read(b []byte) (int, error) {
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-	for i := 0; i < c.maxRetries; i++ {
-		if atomic.LoadInt32(&c.status) == statusOnline {
-			n, err := c.TCPConn.Read(b)
-			if err == nil {
-				return n, err
-			}
-			switch err.(type) {
-			case *net.OpError:
-				atomic.StoreInt32(&c.status, statusOffline)
-			default:
-				if err.Error() == "EOF" {
-					atomic.StoreInt32(&c.status, statusOffline)
-				} else {
-					return n, err
-				}
-			}
-		} else if atomic.LoadInt32(&c.status) == statusOffline {
-			if err := c.reconnect(); err != nil {
-				return -1, err
-			}
-		}
-		if i < (c.maxRetries - 1) {
-			time.Sleep(c.retryInterval)
-		}
-	}
-	return -1, ErrMaxRetries
-}
-
-//
-func (c *TCPClient) Write(b []byte) (int, error) {
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-	for i := 0; i < c.maxRetries; i++ {
-		if atomic.LoadInt32(&c.status) == statusOnline {
-			n, err := c.TCPConn.Write(b)
-			if err == nil {
-				return n, err
-			}
-			switch err.(type) {
-			case *net.OpError:
-				atomic.StoreInt32(&c.status, statusOffline)
-			default:
-				return n, err
-			}
-		} else if atomic.LoadInt32(&c.status) == statusOffline {
-			if err := c.reconnect(); err != nil {
-				return -1, err
-			}
-		}
-		if i < (c.maxRetries - 1) {
-			time.Sleep(c.retryInterval)
-		}
-	}
-	return -1, ErrMaxRetries
-}

+ 0 - 135
src/mfw/util/tlsclient.go

@@ -1,135 +0,0 @@
-/**
-支持tls的自动重连tcp连接,作为对tcpclient的补充
-*/
-package util
-
-import (
-	"crypto/tls"
-	"net"
-	"sync"
-	"sync/atomic"
-	"time"
-)
-
-//
-type TLSClient struct {
-	*tls.Conn
-	lock          sync.RWMutex
-	status        int32
-	maxRetries    int
-	retryInterval time.Duration
-	onconnectedfn func()
-	tlsconfig     *tls.Config
-	network       string
-	serveraddr    string
-}
-
-func NewTlsClient(network, serveraddr string, maxretries int, retryinterval time.Duration, tlsconfig *tls.Config, onconnectedfn func()) *TLSClient {
-	return &TLSClient{
-		network:       network,
-		serveraddr:    serveraddr,
-		lock:          sync.RWMutex{},
-		maxRetries:    maxretries,
-		status:        statusOffline,
-		retryInterval: retryinterval,
-		tlsconfig:     tlsconfig,
-		onconnectedfn: onconnectedfn,
-	}
-}
-
-//连接
-func (t *TLSClient) Dial() error {
-	conn, err := tls.Dial(t.network, t.serveraddr, t.tlsconfig)
-	if err != nil {
-		return err
-	}
-	t.Conn = conn
-	t.status = statusOnline
-	return nil
-}
-
-//
-func (c *TLSClient) reconnect() error {
-	if !atomic.CompareAndSwapInt32(&c.status, statusOffline, statusReconnecting) {
-		return nil
-	}
-
-	conn, err := tls.Dial(c.network, c.serveraddr, c.tlsconfig)
-	if err != nil {
-		defer atomic.StoreInt32(&c.status, statusOffline)
-		switch err.(type) {
-		case *net.OpError:
-			return err
-		default:
-			return err
-		}
-	}
-
-	c.Conn.Close()
-	c.Conn = conn
-	atomic.StoreInt32(&c.status, statusOnline)
-	if c.onconnectedfn != nil {
-		c.onconnectedfn() //执行后续操作
-	}
-	return nil
-}
-
-//读操作
-func (c *TLSClient) Read(b []byte) (int, error) {
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-	for i := 0; i < c.maxRetries; i++ {
-		if atomic.LoadInt32(&c.status) == statusOnline {
-			n, err := c.Conn.Read(b)
-			if err == nil {
-				return n, err
-			}
-			switch err.(type) {
-			case *net.OpError:
-				atomic.StoreInt32(&c.status, statusOffline)
-			default:
-				if err.Error() == "EOF" {
-					atomic.StoreInt32(&c.status, statusOffline)
-				} else {
-					return n, err
-				}
-			}
-		} else if atomic.LoadInt32(&c.status) == statusOffline {
-			if err := c.reconnect(); err != nil {
-				return -1, err
-			}
-		}
-		if i < (c.maxRetries - 1) {
-			time.Sleep(c.retryInterval)
-		}
-	}
-	return -1, ErrMaxRetries
-}
-
-//
-func (c *TLSClient) Write(b []byte) (int, error) {
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-	for i := 0; i < c.maxRetries; i++ {
-		if atomic.LoadInt32(&c.status) == statusOnline {
-			n, err := c.Conn.Write(b)
-			if err == nil {
-				return n, err
-			}
-			switch err.(type) {
-			case *net.OpError:
-				atomic.StoreInt32(&c.status, statusOffline)
-			default:
-				return n, err
-			}
-		} else if atomic.LoadInt32(&c.status) == statusOffline {
-			if err := c.reconnect(); err != nil {
-				return -1, err
-			}
-		}
-		if i < (c.maxRetries - 1) {
-			time.Sleep(c.retryInterval)
-		}
-	}
-	return -1, ErrMaxRetries
-}

+ 1 - 1
src/发布服务样例/main.go

@@ -32,7 +32,7 @@ func main() {
 	tlscert := flag.String("cert", "", "cert文件")
 	tlskey := flag.String("key", "", "key文件")
 	flag.Parse()
-	util.Log.SetParam(util.LOG_LEVEL_INFO, true, "./logs", "pushservice")
+	util.Log.SetParam(util.LOG_LEVEL_INFO, false, "./logs", "pushservice")
 	var err error
 	client, err = util.NewClient(&util.ClientConfig{
 		MsgServerAddr:   *addr,