clickhouse.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package main
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "fmt"
  6. "github.com/ClickHouse/clickhouse-go/v2"
  7. "log"
  8. "reflect"
  9. "strings"
  10. "time"
  11. )
  12. type Ck struct {
  13. Address string //数据库地址:端口
  14. UserName string //用户名
  15. PassWord string //密码
  16. DBName string //数据库名
  17. DB *sql.DB //数据库连接池对象
  18. MaxOpenConns int //用于设置最大打开的连接数,默认值为0表示不限制。
  19. MaxIdleConns int //用于设置闲置的连接数。
  20. }
  21. func (c *Ck) Init() {
  22. if c.MaxOpenConns <= 0 {
  23. c.MaxOpenConns = 20
  24. }
  25. if c.MaxIdleConns <= 0 {
  26. c.MaxIdleConns = 20
  27. }
  28. c.DB = clickhouse.OpenDB(&clickhouse.Options{
  29. Addr: strings.Split(c.Address, ","),
  30. Auth: clickhouse.Auth{
  31. Database: c.DBName,
  32. Username: c.UserName,
  33. Password: c.PassWord,
  34. },
  35. //TLS: &tls.Config{
  36. // InsecureSkipVerify: true,
  37. //},
  38. //Settings: clickhouse.Settings{
  39. // "max_execution_time": 60,
  40. //},
  41. DialTimeout: time.Second * 30,
  42. //Compression: &clickhouse.Compression{
  43. // Method: clickhouse.CompressionLZ4,
  44. //},
  45. //Debug: true,
  46. BlockBufferSize: 10,
  47. MaxCompressionBuffer: 10240,
  48. //ClientInfo: clickhouse.ClientInfo{
  49. // Products: []struct {
  50. // Name string
  51. // Version string
  52. // }{
  53. // {Name: "my-app", Version: "0.1"},
  54. // },
  55. //},
  56. })
  57. c.DB.SetMaxIdleConns(c.MaxIdleConns)
  58. c.DB.SetMaxOpenConns(c.MaxOpenConns)
  59. c.DB.SetConnMaxLifetime(time.Hour)
  60. }
  61. func (m *Ck) FindOne(tableName string, query map[string]interface{}, fields, order string) *map[string]interface{} {
  62. list := m.Find(tableName, query, fields, order, 0, 1)
  63. if list != nil && len(*list) == 1 {
  64. temp := (*list)[0]
  65. return &temp
  66. }
  67. return nil
  68. }
  69. func (m *Ck) Find(tableName string, query map[string]interface{}, fields, order string, start, pageSize int) *[]map[string]interface{} {
  70. fs := []string{}
  71. vs := []interface{}{}
  72. for k, v := range query {
  73. rt := reflect.TypeOf(v)
  74. rv := reflect.ValueOf(v)
  75. if rt.Kind() == reflect.Map {
  76. for _, rv_k := range rv.MapKeys() {
  77. if rv_k.String() == "ne" {
  78. fs = append(fs, fmt.Sprintf("%s!=?", k))
  79. vs = append(vs, rv.MapIndex(rv_k).Interface())
  80. }
  81. if rv_k.String() == "notin" {
  82. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  83. for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  84. fs = append(fs, fmt.Sprintf("%s!=?", k))
  85. vs = append(vs, v)
  86. }
  87. }
  88. }
  89. if rv_k.String() == "in" {
  90. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  91. _fs := fmt.Sprintf("%s in (?", k)
  92. for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  93. if k > 0 {
  94. _fs += ",?"
  95. }
  96. vs = append(vs, v)
  97. }
  98. _fs += ")"
  99. fs = append(fs, _fs)
  100. }
  101. }
  102. }
  103. } else {
  104. if v == "$isNull" {
  105. fs = append(fs, fmt.Sprintf("%s is null", k))
  106. } else if v == "$isNotNull" {
  107. fs = append(fs, fmt.Sprintf("%s is not null", k))
  108. } else {
  109. fs = append(fs, fmt.Sprintf("%s=?", k))
  110. vs = append(vs, v)
  111. }
  112. }
  113. }
  114. var buffer bytes.Buffer
  115. buffer.WriteString("select ")
  116. if fields == "" {
  117. buffer.WriteString("*")
  118. } else {
  119. buffer.WriteString(fields)
  120. }
  121. buffer.WriteString(" from ")
  122. buffer.WriteString(tableName)
  123. if len(fs) > 0 {
  124. buffer.WriteString(" where ")
  125. buffer.WriteString(strings.Join(fs, " and "))
  126. }
  127. if order != "" {
  128. buffer.WriteString(" order by ")
  129. buffer.WriteString(order)
  130. }
  131. if start > -1 && pageSize > 0 {
  132. buffer.WriteString(" limit ")
  133. buffer.WriteString(fmt.Sprint(start))
  134. buffer.WriteString(",")
  135. buffer.WriteString(fmt.Sprint(pageSize))
  136. }
  137. q := buffer.String()
  138. //log.Println(q, vs)
  139. return m.SelectBySql(q, vs...)
  140. }
  141. func (m *Ck) SelectBySql(q string, args ...interface{}) *[]map[string]interface{} {
  142. return m.SelectBySqlByTx(nil, q, args...)
  143. }
  144. func (m *Ck) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
  145. return m.Select(0, nil, tx, q, args...)
  146. }
  147. func (m *Ck) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
  148. var err error
  149. rows, err := m.DB.Query(q, args...)
  150. if err != nil {
  151. log.Println(err)
  152. return nil
  153. }
  154. if rows != nil {
  155. defer rows.Close()
  156. }
  157. columns, err := rows.Columns()
  158. if err != nil {
  159. log.Println(err)
  160. return nil
  161. }
  162. list := []map[string]interface{}{}
  163. for rows.Next() {
  164. scanArgs := make([]interface{}, len(columns))
  165. values := make([]interface{}, len(columns))
  166. ret := make(map[string]interface{})
  167. for k, _ := range values {
  168. scanArgs[k] = &values[k]
  169. }
  170. err = rows.Scan(scanArgs...)
  171. if err != nil {
  172. log.Println(err)
  173. break
  174. }
  175. for i, col := range values {
  176. if v, ok := col.([]uint8); ok {
  177. ret[columns[i]] = string(v)
  178. } else {
  179. ret[columns[i]] = col
  180. }
  181. }
  182. list = append(list, ret)
  183. if bath > 0 && len(list) == bath {
  184. f(&list)
  185. list = []map[string]interface{}{}
  186. }
  187. }
  188. if bath > 0 && len(list) > 0 {
  189. f(&list)
  190. list = []map[string]interface{}{}
  191. }
  192. return &list
  193. }