Quellcode durchsuchen

feat:增加clickhouse

wangchuanjin vor 1 Jahr
Ursprung
Commit
3b7d74036a
4 geänderte Dateien mit 1348 neuen und 70 gelöschten Zeilen
  1. 6 18
      go.mod
  2. 1227 4
      go.sum
  3. 86 48
      mysql/mysql.go
  4. 29 0
      mysql/mysql_test.go

+ 6 - 18
go.mod

@@ -4,36 +4,24 @@ go 1.13
 
 require (
 	app.yhyue.com/moapp/esv1 v0.0.0-20220414031211-3da4123e648d
+	github.com/ClickHouse/clickhouse-go/v2 v2.18.0
 	github.com/RoaringBitmap/roaring v1.5.0
 	github.com/coscms/tagfast v0.0.0-20150925144250-2b69b2496250
 	github.com/donnie4w/go-logger v0.0.0-20170827050443-4740c51383f4
-	github.com/fsnotify/fsnotify v1.4.9
+	github.com/fsnotify/fsnotify v1.6.0
 	github.com/garyburd/redigo v1.6.2
 	github.com/go-sql-driver/mysql v1.6.0
 	github.com/golang-jwt/jwt/v4 v4.4.2
-	github.com/golang/snappy v0.0.4 // indirect
 	github.com/gomodule/redigo v1.8.9
-	github.com/google/go-cmp v0.5.8 // indirect
 	github.com/howeyc/fsnotify v0.9.0
-	github.com/kr/text v0.2.0 // indirect
 	github.com/olivere/elastic/v7 v7.0.22
-	github.com/stretchr/testify v1.8.0 // indirect
 	github.com/yl2chen/cidranger v1.0.2
-	go.etcd.io/etcd/client/v3 v3.5.4
-	go.mongodb.org/mongo-driver v1.9.1
-	go.uber.org/zap v1.21.0
-	go.uber.org/atomic v1.9.0 // indirect
-	go.uber.org/goleak v1.1.12 // indirect
-	go.uber.org/multierr v1.8.0 // indirect
-	go.uber.org/zap v1.21.0 // indirect
-	golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
-	golang.org/x/net v0.0.0-20220531201128-c960675eff93 // indirect
-	google.golang.org/genproto v0.0.0-20220602131408-e326c6e8e9c8 // indirect
-	google.golang.org/grpc v1.47.0 // indirect
+	go.etcd.io/etcd/client/v3 v3.5.5
+	go.mongodb.org/mongo-driver v1.11.4
+	go.uber.org/zap v1.25.0
 	gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc
-	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
 	gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
-	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
+	gopkg.in/natefinch/lumberjack.v2 v2.2.1
 	gorm.io/driver/mysql v1.0.5
 	gorm.io/gorm v1.21.3
 )

Datei-Diff unterdrückt, da er zu groß ist
+ 1227 - 4
go.sum


+ 86 - 48
mysql/mysql.go

@@ -9,9 +9,14 @@ import (
 	"strings"
 	"time"
 
+	_ "github.com/ClickHouse/clickhouse-go/v2"
 	_ "github.com/go-sql-driver/mysql"
 )
 
+const (
+	CLICKHOUSE = "clickhouse"
+)
+
 type Mysql struct {
 	Address      string  //数据库地址:端口
 	UserName     string  //用户名
@@ -20,28 +25,40 @@ type Mysql struct {
 	DB           *sql.DB `json:",optional"` //数据库连接池对象
 	MaxOpenConns int     //用于设置最大打开的连接数,默认值为0表示不限制。
 	MaxIdleConns int     //用于设置闲置的连接数。
+	driverName   string
 }
 
-func (m *Mysql) Init() {
-	if m.MaxOpenConns <= 0 {
-		m.MaxOpenConns = 30
+//
+func NewInit(driverName, dataSourceName string, maxOpenConns, maxIdleConns int) *Mysql {
+	if maxOpenConns <= 0 {
+		maxOpenConns = 30
 	}
-	if m.MaxIdleConns <= 0 {
-		m.MaxIdleConns = 6
+	if maxIdleConns <= 0 {
+		maxIdleConns = 6
 	}
-	var err error
-	m.DB, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", m.UserName, m.PassWord, m.Address, m.DBName))
+	db, err := sql.Open(driverName, dataSourceName)
 	if err != nil {
-		log.Println(err)
-		return
+		log.Println("open error", err)
+		return nil
 	}
-	m.DB.SetMaxOpenConns(m.MaxOpenConns)
-	m.DB.SetMaxIdleConns(m.MaxIdleConns)
-	m.DB.SetConnMaxLifetime(14400 * time.Second)
-	err = m.DB.Ping()
-	if err != nil {
-		log.Println(err)
+	db.SetMaxOpenConns(maxOpenConns)
+	db.SetMaxIdleConns(maxIdleConns)
+	db.SetConnMaxLifetime(14400 * time.Second)
+	if err := db.Ping(); err != nil {
+		log.Println("ping error", err)
+		return nil
 	}
+	return &Mysql{DB: db, driverName: driverName}
+}
+
+//
+func (m *Mysql) Init() {
+	m.driverName = "mysql"
+	nm := NewInit(m.driverName, fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", m.UserName, m.PassWord, m.Address, m.DBName), m.MaxOpenConns, m.MaxIdleConns)
+	if nm == nil {
+		return
+	}
+	m.DB = nm.DB
 }
 
 //新增
@@ -65,7 +82,7 @@ func (m *Mysql) InsertByTx(tx *sql.Tx, tableName string, data map[string]interfa
 		placeholders = append(placeholders, "?")
 	}
 	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, strings.Join(fields, ","), strings.Join(placeholders, ","))
-	log.Println("mysql", q, values)
+	log.Println(q, values)
 	return m.InsertBySqlByTx(tx, q, values...)
 }
 
@@ -80,6 +97,9 @@ func (m *Mysql) InsertBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64
 	if result == nil {
 		return -1
 	}
+	if m.driverName == CLICKHOUSE {
+		return 0
+	}
 	id, err := result.LastInsertId()
 	if err != nil {
 		log.Println(err)
@@ -133,6 +153,9 @@ func (m *Mysql) insertOrReplaceBatchByTx(tx *sql.Tx, tp string, afterInsert, tab
 	if result == nil {
 		return -1, -1
 	}
+	if m.driverName == CLICKHOUSE {
+		return 0, 0
+	}
 	v1, e1 := result.RowsAffected()
 	if e1 != nil {
 		log.Println(e1)
@@ -153,19 +176,24 @@ func (m *Mysql) ExecBySql(q string, args ...interface{}) (sql.Result, error) {
 
 //sql语句执行,带有事务
 func (m *Mysql) ExecBySqlByTx(tx *sql.Tx, q string, args ...interface{}) (sql.Result, error) {
-	var stmtIns *sql.Stmt
+	var result sql.Result
 	var err error
-	if tx == nil {
-		stmtIns, err = m.DB.Prepare(q)
+	if m.driverName == CLICKHOUSE {
+		result, err = m.DB.Exec(q, args...)
 	} else {
-		stmtIns, err = tx.Prepare(q)
-	}
-	if err != nil {
-		log.Println(err)
-		return nil, err
+		var stmtIns *sql.Stmt
+		if tx == nil {
+			stmtIns, err = m.DB.Prepare(q)
+		} else {
+			stmtIns, err = tx.Prepare(q)
+		}
+		if err != nil {
+			log.Println(err)
+			return nil, err
+		}
+		defer stmtIns.Close()
+		result, err = stmtIns.Exec(args...)
 	}
-	defer stmtIns.Close()
-	result, err := stmtIns.Exec(args...)
 	if err != nil {
 		log.Println(args, err)
 		return nil, err
@@ -259,21 +287,24 @@ func (m *Mysql) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]ma
 	return m.Select(0, nil, tx, q, args...)
 }
 func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}) bool, tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
-	var stmtOut *sql.Stmt
+	var rows *sql.Rows
 	var err error
-	if tx == nil {
-		stmtOut, err = m.DB.Prepare(q)
+	if m.driverName == CLICKHOUSE {
+		rows, err = m.DB.Query(q, args...)
 	} else {
-		stmtOut, err = tx.Prepare(q)
-	}
-	if err != nil {
-		log.Println(err)
-		return nil
+		var stmtOut *sql.Stmt
+		if tx == nil {
+			stmtOut, err = m.DB.Prepare(q)
+		} else {
+			stmtOut, err = tx.Prepare(q)
+		}
+		if err != nil {
+			return nil
+		}
+		defer stmtOut.Close()
+		rows, err = stmtOut.Query(args...)
 	}
-	defer stmtOut.Close()
-	rows, err := stmtOut.Query(args...)
 	if err != nil {
-		log.Println(err)
 		return nil
 	}
 	if rows != nil {
@@ -281,7 +312,6 @@ func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}) bool, tx *s
 	}
 	columns, err := rows.Columns()
 	if err != nil {
-		log.Println(err)
 		return nil
 	}
 	list := []map[string]interface{}{}
@@ -415,11 +445,13 @@ func (m *Mysql) UpdateOrDeleteBySql(q string, args ...interface{}) int64 {
 
 //带事务的修改或删除
 func (m *Mysql) UpdateOrDeleteBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64 {
-	result, err := m.ExecBySqlByTx(tx, q, args...)
-	if err != nil {
-		log.Println(err)
+	result, _ := m.ExecBySqlByTx(tx, q, args...)
+	if result == nil {
 		return -1
 	}
+	if m.driverName == CLICKHOUSE {
+		return 0
+	}
 	count, err := result.RowsAffected()
 	if err != nil {
 		log.Println(err)
@@ -480,14 +512,20 @@ func (m *Mysql) Count(tableName string, query map[string]interface{}) int64 {
 	return m.CountBySql(q, values...)
 }
 func (m *Mysql) CountBySql(q string, args ...interface{}) int64 {
-	stmtIns, err := m.DB.Prepare(q)
-	if err != nil {
-		log.Println(err)
-		return -1
+	var rows *sql.Rows
+	var err error
+	if m.driverName == CLICKHOUSE {
+		rows, err = m.DB.Query(q, args...)
+	} else {
+		var stmtOut *sql.Stmt
+		stmtOut, err = m.DB.Prepare(q)
+		if err != nil {
+			log.Println(err)
+			return -1
+		}
+		defer stmtOut.Close()
+		rows, err = stmtOut.Query(args...)
 	}
-	defer stmtIns.Close()
-
-	rows, err := stmtIns.Query(args...)
 	if err != nil {
 		log.Println(err)
 		return -1

+ 29 - 0
mysql/mysql_test.go

@@ -0,0 +1,29 @@
+package mysql
+
+import (
+	"log"
+	"testing"
+)
+
+func TestMysqlSelect(t *testing.T) {
+	m := &Mysql{
+		Address:      "192.168.3.217:4000",
+		UserName:     "root",
+		PassWord:     "=PDT49#80Z!RVv52_z",
+		DBName:       "jianyu",
+		MaxOpenConns: 2, //用于设置最大打开的连接数,默认值为0表示不限制。
+		MaxIdleConns: 2, //用于设置闲置的连接数。
+	}
+	m.Init()
+	list := m.SelectBySql(`select * from entniche_info limit ?`, 1)
+	log.Println("-----", list)
+}
+
+//
+func TestClickHouseSelect(t *testing.T) {
+	m := NewInit("clickhouse", "clickhouse://jytop:pwdTopJy123@192.168.3.207:19000/information?dial_timeout=2000ms&max_execution_time=60s", 1, 1)
+	list := m.SelectBySql(`select id from information limit ?`, 1)
+	c := m.CountBySql(`select count(1) as c from information`)
+	log.Println("-----", list, c)
+	log.Println(m.InsertBatch("wcj.test", []string{"name"}, []interface{}{"456"}))
+}

Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.