init.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package main
  2. import (
  3. "fmt"
  4. nebula "github.com/vesoft-inc/nebula-go/v3"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  6. "log"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. MgoQy *mongodb.MongodbSim //163.mixdata
  13. )
  14. func InitMgo() {
  15. //181 凭安库
  16. Mgo181 = &mongodb.MongodbSim{
  17. //MongodbAddr: "172.17.4.181:27001",
  18. MongodbAddr: "127.0.0.1:27001",
  19. DbName: "mixdata",
  20. Size: 10,
  21. UserName: "",
  22. Password: "",
  23. Direct: true,
  24. }
  25. Mgo181.InitPool()
  26. //
  27. MgoQy = &mongodb.MongodbSim{
  28. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  29. //MongodbAddr: "127.0.0.1:27083",
  30. Size: 10,
  31. DbName: "mixdata",
  32. UserName: "SJZY_RWbid_ES",
  33. Password: "SJZY@B4i4D5e6S",
  34. //Direct: true,
  35. }
  36. MgoQy.InitPool()
  37. }
  38. type NebulaClient struct {
  39. hosts []nebula.HostAddress
  40. username string
  41. password string
  42. pool *nebula.ConnectionPool
  43. session *nebula.Session
  44. mu sync.Mutex
  45. }
  46. // NewNebulaClient 初始化 NebulaGraph 客户端
  47. func NewNebulaClient(hosts []nebula.HostAddress, username, password string) (*NebulaClient, error) {
  48. client := &NebulaClient{
  49. hosts: hosts,
  50. username: username,
  51. password: password,
  52. }
  53. if err := client.connect(); err != nil {
  54. return nil, err
  55. }
  56. return client, nil
  57. }
  58. // connect 建立连接
  59. func (c *NebulaClient) connect() error {
  60. config := nebula.GetDefaultConf()
  61. config.TimeOut = time.Minute * 10 // 连接超时
  62. config.IdleTime = time.Minute * 5 // 空闲 session 存活时间
  63. config.MaxConnPoolSize = 10 // 最大连接池大小
  64. config.UseHTTP2 = false // 禁用 HTTP2(如需要)
  65. config.HandshakeKey = "" // 默认密钥
  66. pool, err := nebula.NewConnectionPool(c.hosts, config, nebula.DefaultLogger{})
  67. if err != nil {
  68. return fmt.Errorf("初始化连接池失败: %w", err)
  69. }
  70. session, err := pool.GetSession(c.username, c.password)
  71. if err != nil {
  72. pool.Close()
  73. return fmt.Errorf("获取 session 失败: %w", err)
  74. }
  75. c.pool = pool
  76. c.session = session
  77. return nil
  78. }
  79. // ExecuteWithReconnect 执行查询,检测 session 异常自动重连
  80. func (c *NebulaClient) ExecuteWithReconnect(query string) (*nebula.ResultSet, error) {
  81. c.mu.Lock()
  82. defer c.mu.Unlock()
  83. // session 为 nil 时尝试重新连接
  84. if c.session == nil {
  85. log.Println("session 为 nil,尝试连接...")
  86. if err := c.connect(); err != nil {
  87. return nil, err
  88. }
  89. }
  90. // 执行查询
  91. resp, err := c.session.Execute(query)
  92. if err != nil || resp == nil || !resp.IsSucceed() {
  93. errMsg := ""
  94. if err != nil {
  95. errMsg = err.Error()
  96. }
  97. if resp != nil {
  98. errMsg = resp.GetErrorMsg()
  99. }
  100. if strings.Contains(errMsg, "Session") {
  101. log.Println("Session 异常,尝试重连...")
  102. if c.pool != nil {
  103. c.pool.Close()
  104. }
  105. if err := c.connect(); err != nil {
  106. return nil, fmt.Errorf("重连失败: %w", err)
  107. }
  108. // 重试一次
  109. resp, err = c.session.Execute(query)
  110. if err != nil {
  111. return nil, fmt.Errorf("重连后执行失败: %w", err)
  112. }
  113. if resp == nil || !resp.IsSucceed() {
  114. return nil, fmt.Errorf("重连后查询失败: %s", resp.GetErrorMsg())
  115. }
  116. return resp, nil
  117. }
  118. return nil, fmt.Errorf("查询失败: %s", errMsg)
  119. }
  120. return resp, nil
  121. }
  122. // Close 释放资源
  123. func (c *NebulaClient) Close() {
  124. if c.session != nil {
  125. c.session.Release()
  126. c.session = nil
  127. }
  128. if c.pool != nil {
  129. c.pool.Close()
  130. c.pool = nil
  131. }
  132. }