|
@@ -7,6 +7,7 @@ import (
|
|
"log"
|
|
"log"
|
|
"strings"
|
|
"strings"
|
|
"sync"
|
|
"sync"
|
|
|
|
+ "time"
|
|
)
|
|
)
|
|
|
|
|
|
func InitMgo() {
|
|
func InitMgo() {
|
|
@@ -33,24 +34,27 @@ type NebulaClient struct {
|
|
mu sync.Mutex
|
|
mu sync.Mutex
|
|
}
|
|
}
|
|
|
|
|
|
-// NewNebulaClient 初始化客户端-NebulaGraph
|
|
|
|
|
|
+// NewNebulaClient 初始化 NebulaGraph 客户端
|
|
func NewNebulaClient(hosts []nebula.HostAddress, username, password string) (*NebulaClient, error) {
|
|
func NewNebulaClient(hosts []nebula.HostAddress, username, password string) (*NebulaClient, error) {
|
|
client := &NebulaClient{
|
|
client := &NebulaClient{
|
|
hosts: hosts,
|
|
hosts: hosts,
|
|
username: username,
|
|
username: username,
|
|
password: password,
|
|
password: password,
|
|
}
|
|
}
|
|
- err := client.connect()
|
|
|
|
- if err != nil {
|
|
|
|
|
|
+ if err := client.connect(); err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
return client, nil
|
|
return client, nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// connect 建立连接
|
|
func (c *NebulaClient) connect() error {
|
|
func (c *NebulaClient) connect() error {
|
|
config := nebula.GetDefaultConf()
|
|
config := nebula.GetDefaultConf()
|
|
- config.UseHTTP2 = false
|
|
|
|
- config.HandshakeKey = ""
|
|
|
|
|
|
+ config.TimeOut = time.Minute * 10 // 连接超时
|
|
|
|
+ config.IdleTime = time.Minute * 5 // 空闲 session 存活时间
|
|
|
|
+ config.MaxConnPoolSize = 10 // 最大连接池大小
|
|
|
|
+ config.UseHTTP2 = false // 禁用 HTTP2(如需要)
|
|
|
|
+ config.HandshakeKey = "" // 默认密钥
|
|
|
|
|
|
pool, err := nebula.NewConnectionPool(c.hosts, config, nebula.DefaultLogger{})
|
|
pool, err := nebula.NewConnectionPool(c.hosts, config, nebula.DefaultLogger{})
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -68,53 +72,63 @@ func (c *NebulaClient) connect() error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
-// 自动重连逻辑:当返回错误里包含 session 不存在时触发重连
|
|
|
|
|
|
+// ExecuteWithReconnect 执行查询,检测 session 异常自动重连
|
|
func (c *NebulaClient) ExecuteWithReconnect(query string) (*nebula.ResultSet, error) {
|
|
func (c *NebulaClient) ExecuteWithReconnect(query string) (*nebula.ResultSet, error) {
|
|
c.mu.Lock()
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
- // 如果 session 为 nil,则重新连接
|
|
|
|
|
|
+ // session 为 nil 时尝试重新连接
|
|
if c.session == nil {
|
|
if c.session == nil {
|
|
- log.Println("session 为 nil,重连中...")
|
|
|
|
|
|
+ log.Println("session 为 nil,尝试连接...")
|
|
if err := c.connect(); err != nil {
|
|
if err := c.connect(); err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // 执行查询
|
|
resp, err := c.session.Execute(query)
|
|
resp, err := c.session.Execute(query)
|
|
- if err != nil || !resp.IsSucceed() {
|
|
|
|
- // 检查是否是 session 已失效
|
|
|
|
- if strings.Contains(resp.GetErrorMsg(), "Session") ||
|
|
|
|
- (err != nil && strings.Contains(err.Error(), "Session")) {
|
|
|
|
|
|
+ if err != nil || resp == nil || !resp.IsSucceed() {
|
|
|
|
+ errMsg := ""
|
|
|
|
+ if err != nil {
|
|
|
|
+ errMsg = err.Error()
|
|
|
|
+ }
|
|
|
|
+ if resp != nil {
|
|
|
|
+ errMsg = resp.GetErrorMsg()
|
|
|
|
+ }
|
|
|
|
|
|
- log.Println("session 可能失效,正在重连...")
|
|
|
|
|
|
+ if strings.Contains(errMsg, "Session") {
|
|
|
|
+ log.Println("Session 异常,尝试重连...")
|
|
if c.pool != nil {
|
|
if c.pool != nil {
|
|
c.pool.Close()
|
|
c.pool.Close()
|
|
}
|
|
}
|
|
if err := c.connect(); err != nil {
|
|
if err := c.connect(); err != nil {
|
|
return nil, fmt.Errorf("重连失败: %w", err)
|
|
return nil, fmt.Errorf("重连失败: %w", err)
|
|
}
|
|
}
|
|
- // 重试一次查询
|
|
|
|
|
|
+ // 重试一次
|
|
resp, err = c.session.Execute(query)
|
|
resp, err = c.session.Execute(query)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return nil, err
|
|
|
|
|
|
+ return nil, fmt.Errorf("重连后执行失败: %w", err)
|
|
}
|
|
}
|
|
- if !resp.IsSucceed() {
|
|
|
|
|
|
+ if resp == nil || !resp.IsSucceed() {
|
|
return nil, fmt.Errorf("重连后查询失败: %s", resp.GetErrorMsg())
|
|
return nil, fmt.Errorf("重连后查询失败: %s", resp.GetErrorMsg())
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- return nil, fmt.Errorf("查询失败: %s", resp.GetErrorMsg())
|
|
|
|
|
|
+ return resp, nil
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ return nil, fmt.Errorf("查询失败: %s", errMsg)
|
|
}
|
|
}
|
|
|
|
|
|
return resp, nil
|
|
return resp, nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// Close 释放资源
|
|
func (c *NebulaClient) Close() {
|
|
func (c *NebulaClient) Close() {
|
|
if c.session != nil {
|
|
if c.session != nil {
|
|
c.session.Release()
|
|
c.session.Release()
|
|
|
|
+ c.session = nil
|
|
}
|
|
}
|
|
if c.pool != nil {
|
|
if c.pool != nil {
|
|
c.pool.Close()
|
|
c.pool.Close()
|
|
|
|
+ c.pool = nil
|
|
}
|
|
}
|
|
}
|
|
}
|