package main import ( "bytes" "database/sql" "fmt" "github.com/ClickHouse/clickhouse-go/v2" "log" "reflect" "strings" "time" ) type Ck struct { Address string //数据库地址:端口 UserName string //用户名 PassWord string //密码 DBName string //数据库名 DB *sql.DB //数据库连接池对象 MaxOpenConns int //用于设置最大打开的连接数,默认值为0表示不限制。 MaxIdleConns int //用于设置闲置的连接数。 } func (c *Ck) Init() { if c.MaxOpenConns <= 0 { c.MaxOpenConns = 20 } if c.MaxIdleConns <= 0 { c.MaxIdleConns = 20 } c.DB = clickhouse.OpenDB(&clickhouse.Options{ Addr: strings.Split(c.Address, ","), Auth: clickhouse.Auth{ Database: c.DBName, Username: c.UserName, Password: c.PassWord, }, //TLS: &tls.Config{ // InsecureSkipVerify: true, //}, //Settings: clickhouse.Settings{ // "max_execution_time": 60, //}, DialTimeout: time.Second * 30, //Compression: &clickhouse.Compression{ // Method: clickhouse.CompressionLZ4, //}, //Debug: true, BlockBufferSize: 10, MaxCompressionBuffer: 10240, //ClientInfo: clickhouse.ClientInfo{ // Products: []struct { // Name string // Version string // }{ // {Name: "my-app", Version: "0.1"}, // }, //}, }) c.DB.SetMaxIdleConns(c.MaxIdleConns) c.DB.SetMaxOpenConns(c.MaxOpenConns) c.DB.SetConnMaxLifetime(time.Hour) } func (m *Ck) FindOne(tableName string, query map[string]interface{}, fields, order string) *map[string]interface{} { list := m.Find(tableName, query, fields, order, 0, 1) if list != nil && len(*list) == 1 { temp := (*list)[0] return &temp } return nil } func (m *Ck) Find(tableName string, query map[string]interface{}, fields, order string, start, pageSize int) *[]map[string]interface{} { fs := []string{} vs := []interface{}{} for k, v := range query { rt := reflect.TypeOf(v) rv := reflect.ValueOf(v) if rt.Kind() == reflect.Map { for _, rv_k := range rv.MapKeys() { if rv_k.String() == "ne" { fs = append(fs, fmt.Sprintf("%s!=?", k)) vs = append(vs, rv.MapIndex(rv_k).Interface()) } if rv_k.String() == "notin" { if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 { for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) { fs = append(fs, fmt.Sprintf("%s!=?", k)) vs = append(vs, v) } } } if rv_k.String() == "in" { if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 { _fs := fmt.Sprintf("%s in (?", k) for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) { if k > 0 { _fs += ",?" } vs = append(vs, v) } _fs += ")" fs = append(fs, _fs) } } } } else { if v == "$isNull" { fs = append(fs, fmt.Sprintf("%s is null", k)) } else if v == "$isNotNull" { fs = append(fs, fmt.Sprintf("%s is not null", k)) } else { fs = append(fs, fmt.Sprintf("%s=?", k)) vs = append(vs, v) } } } var buffer bytes.Buffer buffer.WriteString("select ") if fields == "" { buffer.WriteString("*") } else { buffer.WriteString(fields) } buffer.WriteString(" from ") buffer.WriteString(tableName) if len(fs) > 0 { buffer.WriteString(" where ") buffer.WriteString(strings.Join(fs, " and ")) } if order != "" { buffer.WriteString(" order by ") buffer.WriteString(order) } if start > -1 && pageSize > 0 { buffer.WriteString(" limit ") buffer.WriteString(fmt.Sprint(start)) buffer.WriteString(",") buffer.WriteString(fmt.Sprint(pageSize)) } q := buffer.String() //log.Println(q, vs) return m.SelectBySql(q, vs...) } func (m *Ck) SelectBySql(q string, args ...interface{}) *[]map[string]interface{} { return m.SelectBySqlByTx(nil, q, args...) } func (m *Ck) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} { return m.Select(0, nil, tx, q, args...) } func (m *Ck) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} { var err error rows, err := m.DB.Query(q, args...) if err != nil { log.Println(err) return nil } if rows != nil { defer rows.Close() } columns, err := rows.Columns() if err != nil { log.Println(err) return nil } list := []map[string]interface{}{} for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k, _ := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Println(err) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } list = append(list, ret) if bath > 0 && len(list) == bath { f(&list) list = []map[string]interface{}{} } } if bath > 0 && len(list) > 0 { f(&list) list = []map[string]interface{}{} } return &list }