123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package main
- import (
- "fmt"
- nebula "github.com/vesoft-inc/nebula-go/v3"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "strings"
- "sync"
- "time"
- )
- var (
- MgoQy *mongodb.MongodbSim //163.mixdata
- )
- func InitMgo() {
- //181 凭安库
- //Mgo181 = &mongodb.MongodbSim{
- // MongodbAddr: "172.17.4.181:27001",
- // //MongodbAddr: "127.0.0.1:27001",
- // DbName: "mixdata",
- // Size: 10,
- // UserName: "",
- // Password: "",
- // //Direct: true,
- //}
- //Mgo181.InitPool()
- //
- MgoQy = &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoQy.InitPool()
- }
- type NebulaClient struct {
- hosts []nebula.HostAddress
- username string
- password string
- pool *nebula.ConnectionPool
- session *nebula.Session
- mu sync.Mutex
- }
- // NewNebulaClient 初始化 NebulaGraph 客户端
- func NewNebulaClient(hosts []nebula.HostAddress, username, password string) (*NebulaClient, error) {
- client := &NebulaClient{
- hosts: hosts,
- username: username,
- password: password,
- }
- if err := client.connect(); err != nil {
- return nil, err
- }
- return client, nil
- }
- // connect 建立连接
- func (c *NebulaClient) connect() error {
- config := nebula.GetDefaultConf()
- config.TimeOut = time.Minute * 10 // 连接超时
- config.IdleTime = time.Minute * 5 // 空闲 session 存活时间
- config.MaxConnPoolSize = 10 // 最大连接池大小
- config.UseHTTP2 = false // 禁用 HTTP2(如需要)
- config.HandshakeKey = "" // 默认密钥
- pool, err := nebula.NewConnectionPool(c.hosts, config, nebula.DefaultLogger{})
- if err != nil {
- return fmt.Errorf("初始化连接池失败: %w", err)
- }
- session, err := pool.GetSession(c.username, c.password)
- if err != nil {
- pool.Close()
- return fmt.Errorf("获取 session 失败: %w", err)
- }
- c.pool = pool
- c.session = session
- return nil
- }
- // ExecuteWithReconnect 执行查询,检测 session 异常自动重连
- func (c *NebulaClient) ExecuteWithReconnect(query string) (*nebula.ResultSet, error) {
- c.mu.Lock()
- defer c.mu.Unlock()
- // session 为 nil 时尝试重新连接
- if c.session == nil {
- log.Println("session 为 nil,尝试连接...")
- if err := c.connect(); err != nil {
- return nil, err
- }
- }
- // 执行查询
- resp, err := c.session.Execute(query)
- if err != nil || resp == nil || !resp.IsSucceed() {
- errMsg := ""
- if err != nil {
- errMsg = err.Error()
- }
- if resp != nil {
- errMsg = resp.GetErrorMsg()
- }
- if strings.Contains(errMsg, "Session") {
- log.Println("Session 异常,尝试重连...")
- if c.pool != nil {
- c.pool.Close()
- }
- if err := c.connect(); err != nil {
- return nil, fmt.Errorf("重连失败: %w", err)
- }
- // 重试一次
- resp, err = c.session.Execute(query)
- if err != nil {
- return nil, fmt.Errorf("重连后执行失败: %w", err)
- }
- if resp == nil || !resp.IsSucceed() {
- return nil, fmt.Errorf("重连后查询失败: %s", resp.GetErrorMsg())
- }
- return resp, nil
- }
- return nil, fmt.Errorf("查询失败: %s", errMsg)
- }
- return resp, nil
- }
- // Close 释放资源
- func (c *NebulaClient) Close() {
- if c.session != nil {
- c.session.Release()
- c.session = nil
- }
- if c.pool != nil {
- c.pool.Close()
- c.pool = nil
- }
- }
|