package main

import (
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	nebula "github.com/vesoft-inc/nebula-go/v3"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// MgoQy is the package-level MongoDB handle configured by InitMgo for the
	// "mixdata" database (original note: 163.mixdata). It is nil until InitMgo
	// has been called.
	MgoQy *mongodb.MongodbSim
)

// InitMgo builds the MgoQy handle with a fixed address list, pool size and
// credentials, then initializes its connection pool.
//
// NOTE(review): the address and credentials are hard-coded in source. They
// should presumably come from configuration or the environment; kept as-is
// here so behavior does not change.
func InitMgo() {
	// 181: Pingan (凭安) database — disabled configuration kept for reference.
	//Mgo181 = &mongodb.MongodbSim{
	//	MongodbAddr: "172.17.4.181:27001",
	//	//MongodbAddr: "127.0.0.1:27001",
	//	DbName:      "mixdata",
	//	Size:        10,
	//	UserName:    "",
	//	Password:    "",
	//	//Direct:      true,
	//}
	//Mgo181.InitPool()

	MgoQy = &mongodb.MongodbSim{
		MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   "mixdata",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct:      true,
	}
	MgoQy.InitPool()
}

// NebulaClient wraps one NebulaGraph connection pool and one session obtained
// from it. The mutex serializes every access to pool and session, so a
// NebulaClient must be used through a pointer and never copied.
type NebulaClient struct {
	hosts    []nebula.HostAddress    // graph service addresses passed to the pool
	username string                  // account used when requesting a session
	password string                  // password used when requesting a session
	pool     *nebula.ConnectionPool  // current pool; nil when disconnected
	session  *nebula.Session         // current session; nil when disconnected
	mu       sync.Mutex              // guards pool and session
}

// NewNebulaClient creates a NebulaGraph client for the given hosts and
// credentials and connects immediately. It returns the connection error,
// and no client, when the initial connect fails.
func NewNebulaClient(hosts []nebula.HostAddress, username, password string) (*NebulaClient, error) {
	client := &NebulaClient{
		hosts:    hosts,
		username: username,
		password: password,
	}
	if err := client.connect(); err != nil {
		return nil, err
	}
	return client, nil
}

// connect creates a fresh connection pool and session and stores both on the
// client. On failure the client's pool and session fields are left untouched,
// and a pool that was created but could not yield a session is closed again.
//
// connect does not lock c.mu; callers that share the client must hold it.
func (c *NebulaClient) connect() error {
	config := nebula.GetDefaultConf()
	config.TimeOut = time.Minute * 10 // timeout (original note: connection timeout)
	config.IdleTime = time.Minute * 5 // idle lifetime (original note: idle session keep-alive)
	config.MaxConnPoolSize = 10       // maximum pool size
	config.UseHTTP2 = false           // HTTP/2 disabled (enable if needed)
	config.HandshakeKey = ""          // default handshake key

	pool, err := nebula.NewConnectionPool(c.hosts, config, nebula.DefaultLogger{})
	if err != nil {
		return fmt.Errorf("初始化连接池失败: %w", err)
	}

	session, err := pool.GetSession(c.username, c.password)
	if err != nil {
		// Do not leak the pool when no session could be obtained from it.
		pool.Close()
		return fmt.Errorf("获取 session 失败: %w", err)
	}

	c.pool = pool
	c.session = session
	return nil
}

// discardLocked closes the current pool, if any, and clears both the pool and
// session fields so that a later call sees the client as disconnected.
// The caller must hold c.mu.
func (c *NebulaClient) discardLocked() {
	if c.pool != nil {
		c.pool.Close()
	}
	c.pool = nil
	c.session = nil
}

// ExecuteWithReconnect runs query on the current session.
//
// If the client has no session it connects first. If the query fails and the
// failure message contains "Session", the pool is closed, a new pool and
// session are created, and the query is retried exactly once. Any other
// failure is returned without a retry.
//
// It returns the result set on success, or an error describing which stage
// failed (connect, first execution, reconnect, or the retried execution).
func (c *NebulaClient) ExecuteWithReconnect(query string) (*nebula.ResultSet, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// No session yet (never connected, closed, or a previous reconnect failed).
	if c.session == nil {
		log.Println("session 为 nil,尝试连接...")
		if err := c.connect(); err != nil {
			return nil, err
		}
	}

	resp, err := c.session.Execute(query)
	if err == nil && resp != nil && resp.IsSucceed() {
		return resp, nil
	}

	// Build the failure message; a server-side message, when a result set is
	// present, takes precedence over the transport error text.
	errMsg := ""
	if err != nil {
		errMsg = err.Error()
	}
	if resp != nil {
		errMsg = resp.GetErrorMsg()
	}

	if !strings.Contains(errMsg, "Session") {
		return nil, fmt.Errorf("查询失败: %s", errMsg)
	}

	log.Println("Session 异常,尝试重连...")
	// Clear the stale pool/session before reconnecting: if connect fails, the
	// next call must take the "no session" path instead of reusing a session
	// whose pool has already been closed.
	c.discardLocked()
	if err := c.connect(); err != nil {
		return nil, fmt.Errorf("重连失败: %w", err)
	}

	// Retry once on the new session.
	resp, err = c.session.Execute(query)
	if err != nil {
		return nil, fmt.Errorf("重连后执行失败: %w", err)
	}
	if resp == nil {
		// Guard separately: calling GetErrorMsg on a nil result would panic.
		return nil, fmt.Errorf("重连后查询失败: 返回结果为空")
	}
	if !resp.IsSucceed() {
		return nil, fmt.Errorf("重连后查询失败: %s", resp.GetErrorMsg())
	}
	return resp, nil
}

// Close releases the session and closes the pool, then clears both fields.
// It takes c.mu so it cannot race with ExecuteWithReconnect, and it is safe
// to call more than once.
func (c *NebulaClient) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.session != nil {
		c.session.Release()
		c.session = nil
	}
	if c.pool != nil {
		c.pool.Close()
		c.pool = nil
	}
}