123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- package main
- import (
- "bytes"
- "database/sql"
- "fmt"
- "github.com/ClickHouse/clickhouse-go/v2"
- "log"
- "reflect"
- "strings"
- "time"
- )
- type Ck struct {
- Address string //数据库地址:端口
- UserName string //用户名
- PassWord string //密码
- DBName string //数据库名
- DB *sql.DB //数据库连接池对象
- MaxOpenConns int //用于设置最大打开的连接数,默认值为0表示不限制。
- MaxIdleConns int //用于设置闲置的连接数。
- }
- func (c *Ck) Init() {
- if c.MaxOpenConns <= 0 {
- c.MaxOpenConns = 20
- }
- if c.MaxIdleConns <= 0 {
- c.MaxIdleConns = 20
- }
- c.DB = clickhouse.OpenDB(&clickhouse.Options{
- Addr: strings.Split(c.Address, ","),
- Auth: clickhouse.Auth{
- Database: c.DBName,
- Username: c.UserName,
- Password: c.PassWord,
- },
- //TLS: &tls.Config{
- // InsecureSkipVerify: true,
- //},
- //Settings: clickhouse.Settings{
- // "max_execution_time": 60,
- //},
- DialTimeout: time.Second * 30,
- //Compression: &clickhouse.Compression{
- // Method: clickhouse.CompressionLZ4,
- //},
- //Debug: true,
- BlockBufferSize: 10,
- MaxCompressionBuffer: 10240,
- //ClientInfo: clickhouse.ClientInfo{
- // Products: []struct {
- // Name string
- // Version string
- // }{
- // {Name: "my-app", Version: "0.1"},
- // },
- //},
- })
- c.DB.SetMaxIdleConns(c.MaxIdleConns)
- c.DB.SetMaxOpenConns(c.MaxOpenConns)
- c.DB.SetConnMaxLifetime(time.Hour)
- }
- func (m *Ck) FindOne(tableName string, query map[string]interface{}, fields, order string) *map[string]interface{} {
- list := m.Find(tableName, query, fields, order, 0, 1)
- if list != nil && len(*list) == 1 {
- temp := (*list)[0]
- return &temp
- }
- return nil
- }
- func (m *Ck) Find(tableName string, query map[string]interface{}, fields, order string, start, pageSize int) *[]map[string]interface{} {
- fs := []string{}
- vs := []interface{}{}
- for k, v := range query {
- rt := reflect.TypeOf(v)
- rv := reflect.ValueOf(v)
- if rt.Kind() == reflect.Map {
- for _, rv_k := range rv.MapKeys() {
- if rv_k.String() == "ne" {
- fs = append(fs, fmt.Sprintf("%s!=?", k))
- vs = append(vs, rv.MapIndex(rv_k).Interface())
- }
- if rv_k.String() == "notin" {
- if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
- for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
- fs = append(fs, fmt.Sprintf("%s!=?", k))
- vs = append(vs, v)
- }
- }
- }
- if rv_k.String() == "in" {
- if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
- _fs := fmt.Sprintf("%s in (?", k)
- for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
- if k > 0 {
- _fs += ",?"
- }
- vs = append(vs, v)
- }
- _fs += ")"
- fs = append(fs, _fs)
- }
- }
- }
- } else {
- if v == "$isNull" {
- fs = append(fs, fmt.Sprintf("%s is null", k))
- } else if v == "$isNotNull" {
- fs = append(fs, fmt.Sprintf("%s is not null", k))
- } else {
- fs = append(fs, fmt.Sprintf("%s=?", k))
- vs = append(vs, v)
- }
- }
- }
- var buffer bytes.Buffer
- buffer.WriteString("select ")
- if fields == "" {
- buffer.WriteString("*")
- } else {
- buffer.WriteString(fields)
- }
- buffer.WriteString(" from ")
- buffer.WriteString(tableName)
- if len(fs) > 0 {
- buffer.WriteString(" where ")
- buffer.WriteString(strings.Join(fs, " and "))
- }
- if order != "" {
- buffer.WriteString(" order by ")
- buffer.WriteString(order)
- }
- if start > -1 && pageSize > 0 {
- buffer.WriteString(" limit ")
- buffer.WriteString(fmt.Sprint(start))
- buffer.WriteString(",")
- buffer.WriteString(fmt.Sprint(pageSize))
- }
- q := buffer.String()
- //log.Println(q, vs)
- return m.SelectBySql(q, vs...)
- }
- func (m *Ck) SelectBySql(q string, args ...interface{}) *[]map[string]interface{} {
- return m.SelectBySqlByTx(nil, q, args...)
- }
- func (m *Ck) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
- return m.Select(0, nil, tx, q, args...)
- }
- func (m *Ck) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
- var err error
- rows, err := m.DB.Query(q, args...)
- if err != nil {
- log.Println(err)
- return nil
- }
- if rows != nil {
- defer rows.Close()
- }
- columns, err := rows.Columns()
- if err != nil {
- log.Println(err)
- return nil
- }
- list := []map[string]interface{}{}
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k, _ := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Println(err)
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- list = append(list, ret)
- if bath > 0 && len(list) == bath {
- f(&list)
- list = []map[string]interface{}{}
- }
- }
- if bath > 0 && len(list) > 0 {
- f(&list)
- list = []map[string]interface{}{}
- }
- return &list
- }
|