mysql.go 16 KB


  1. package mysql
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "fmt"
  6. "log"
  7. "reflect"
  8. "strings"
  9. "time"
  10. _ "github.com/ClickHouse/clickhouse-go/v2"
  11. _ "github.com/go-sql-driver/mysql"
  12. )
  // CLICKHOUSE is the driver name for which the helpers in this file bypass
  // prepared statements and do not report insert ids or affected-row counts.
  const CLICKHOUSE = "clickhouse"
  // Mysql bundles the connection settings with the shared *sql.DB pool that
  // every helper method in this file operates on.
  type Mysql struct {
  	// Address is the database address in "host:port" form.
  	Address string
  	// UserName is the login user.
  	UserName string
  	// PassWord is the login password.
  	PassWord string
  	// DBName is the database (schema) name.
  	DBName string
  	// DB is the connection pool. The ",optional" tag option is not interpreted
  	// by encoding/json; presumably a config loader consumes it — TODO confirm.
  	DB *sql.DB `json:",optional"`
  	// MaxOpenConns caps the number of open connections. NewInit replaces
  	// values <= 0 with 30.
  	MaxOpenConns int
  	// MaxIdleConns caps the number of idle connections. NewInit replaces
  	// values <= 0 with 6.
  	MaxIdleConns int
  	// driverName is the database/sql driver name; it is compared with
  	// CLICKHOUSE to select the ClickHouse code paths.
  	driverName string
  }
  26. //
  27. func NewInit(driverName, dataSourceName string, maxOpenConns, maxIdleConns int) *Mysql {
  28. if maxOpenConns <= 0 {
  29. maxOpenConns = 30
  30. }
  31. if maxIdleConns <= 0 {
  32. maxIdleConns = 6
  33. }
  34. db, err := sql.Open(driverName, dataSourceName)
  35. if err != nil {
  36. log.Println("open error", err)
  37. return nil
  38. }
  39. db.SetMaxOpenConns(maxOpenConns)
  40. db.SetMaxIdleConns(maxIdleConns)
  41. db.SetConnMaxLifetime(14400 * time.Second)
  42. if err := db.Ping(); err != nil {
  43. log.Println("ping error", err)
  44. return nil
  45. }
  46. return &Mysql{DB: db, driverName: driverName}
  47. }
  48. //
  49. func (m *Mysql) Init() {
  50. m.driverName = "mysql"
  51. nm := NewInit(m.driverName, fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", m.UserName, m.PassWord, m.Address, m.DBName), m.MaxOpenConns, m.MaxIdleConns)
  52. if nm == nil {
  53. return
  54. }
  55. m.DB = nm.DB
  56. }
  57. //新增
  58. func (m *Mysql) Insert(tableName string, data map[string]interface{}) int64 {
  59. return m.InsertByTx(nil, tableName, data)
  60. }
  61. //带有事务的新增
  62. func (m *Mysql) InsertByTx(tx *sql.Tx, tableName string, data map[string]interface{}) int64 {
  63. fields := []string{}
  64. values := []interface{}{}
  65. placeholders := []string{}
  66. if tableName == "dataexport_order" {
  67. if _, ok := data["user_nickname"]; ok {
  68. data["user_nickname"] = ""
  69. }
  70. }
  71. for k, v := range data {
  72. fields = append(fields, k)
  73. values = append(values, v)
  74. placeholders = append(placeholders, "?")
  75. }
  76. q := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, strings.Join(fields, ","), strings.Join(placeholders, ","))
  77. log.Println(q, values)
  78. return m.InsertBySqlByTx(tx, q, values...)
  79. }
  80. //sql语句新增
  81. func (m *Mysql) InsertBySql(q string, args ...interface{}) int64 {
  82. return m.InsertBySqlByTx(nil, q, args...)
  83. }
  84. //带有事务的sql语句新增
  85. func (m *Mysql) InsertBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64 {
  86. result, _ := m.ExecBySqlByTx(tx, q, args...)
  87. if result == nil {
  88. return -1
  89. }
  90. if m.driverName == CLICKHOUSE {
  91. return 0
  92. }
  93. id, err := result.LastInsertId()
  94. if err != nil {
  95. log.Println(err)
  96. return -1
  97. }
  98. return id
  99. }
  100. //批量新增
  101. func (m *Mysql) InsertIgnoreBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
  102. return m.InsertIgnoreBatchByTx(nil, tableName, fields, values)
  103. }
  104. //带事务的批量新增
  105. func (m *Mysql) InsertIgnoreBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
  106. return m.insertOrReplaceBatchByTx(tx, "INSERT", "IGNORE", tableName, fields, values)
  107. }
  108. //批量新增
  109. func (m *Mysql) InsertBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
  110. return m.InsertBatchByTx(nil, tableName, fields, values)
  111. }
  112. //带事务的批量新增
  113. func (m *Mysql) InsertBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
  114. return m.insertOrReplaceBatchByTx(tx, "INSERT", "", tableName, fields, values)
  115. }
  116. //批量更新
  117. func (m *Mysql) ReplaceBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
  118. return m.ReplaceBatchByTx(nil, tableName, fields, values)
  119. }
  120. //带事务的批量更新
  121. func (m *Mysql) ReplaceBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
  122. return m.insertOrReplaceBatchByTx(tx, "REPLACE", "", tableName, fields, values)
  123. }
  124. func (m *Mysql) insertOrReplaceBatchByTx(tx *sql.Tx, tp string, afterInsert, tableName string, fields []string, values []interface{}) (int64, int64) {
  125. placeholders := []string{}
  126. for range fields {
  127. placeholders = append(placeholders, "?")
  128. }
  129. placeholder := strings.Join(placeholders, ",")
  130. array := []string{}
  131. for i := 0; i < len(values)/len(fields); i++ {
  132. array = append(array, fmt.Sprintf("(%s)", placeholder))
  133. }
  134. q := fmt.Sprintf("%s %s INTO %s (%s) VALUES %s", tp, afterInsert, tableName, strings.Join(fields, ","), strings.Join(array, ","))
  135. result, _ := m.ExecBySqlByTx(tx, q, values...)
  136. if result == nil {
  137. return -1, -1
  138. }
  139. if m.driverName == CLICKHOUSE {
  140. return 0, 0
  141. }
  142. v1, e1 := result.RowsAffected()
  143. if e1 != nil {
  144. log.Println(e1)
  145. return -1, -1
  146. }
  147. v2, e2 := result.LastInsertId()
  148. if e2 != nil {
  149. log.Println(e2)
  150. return -1, -1
  151. }
  152. return v1, v2
  153. }
  154. //sql语句执行
  155. func (m *Mysql) ExecBySql(q string, args ...interface{}) (sql.Result, error) {
  156. return m.ExecBySqlByTx(nil, q, args...)
  157. }
  158. //sql语句执行,带有事务
  159. func (m *Mysql) ExecBySqlByTx(tx *sql.Tx, q string, args ...interface{}) (sql.Result, error) {
  160. var result sql.Result
  161. var err error
  162. if m.driverName == CLICKHOUSE {
  163. result, err = m.DB.Exec(q, args...)
  164. } else {
  165. var stmtIns *sql.Stmt
  166. if tx == nil {
  167. stmtIns, err = m.DB.Prepare(q)
  168. } else {
  169. stmtIns, err = tx.Prepare(q)
  170. }
  171. if err != nil {
  172. log.Println(err)
  173. return nil, err
  174. }
  175. defer stmtIns.Close()
  176. result, err = stmtIns.Exec(args...)
  177. }
  178. if err != nil {
  179. log.Println(args, err)
  180. return nil, err
  181. }
  182. return result, nil
  183. }
  184. /*不等于 map[string]string{"ne":"1"}
  185. *不等于多个 map[string]string{"notin":[]interface{}{1,2}}
  186. *字段为空 map[string]string{"name":"$isNull"}
  187. *字段不为空 map[string]string{"name":"$isNotNull"}
  188. */
  189. func (m *Mysql) Find(tableName string, query map[string]interface{}, fields, order string, start, pageSize int) *[]map[string]interface{} {
  190. fs := []string{}
  191. vs := []interface{}{}
  192. for k, v := range query {
  193. rt := reflect.TypeOf(v)
  194. rv := reflect.ValueOf(v)
  195. if rt.Kind() == reflect.Map {
  196. for _, rv_k := range rv.MapKeys() {
  197. if rv_k.String() == "ne" {
  198. fs = append(fs, fmt.Sprintf("%s!=?", k))
  199. vs = append(vs, rv.MapIndex(rv_k).Interface())
  200. }
  201. if rv_k.String() == "notin" {
  202. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  203. for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  204. fs = append(fs, fmt.Sprintf("%s!=?", k))
  205. vs = append(vs, v)
  206. }
  207. }
  208. }
  209. if rv_k.String() == "in" {
  210. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  211. _fs := fmt.Sprintf("%s in (?", k)
  212. for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  213. if k > 0 {
  214. _fs += ",?"
  215. }
  216. vs = append(vs, v)
  217. }
  218. _fs += ")"
  219. fs = append(fs, _fs)
  220. }
  221. }
  222. }
  223. } else {
  224. if v == "$isNull" {
  225. fs = append(fs, fmt.Sprintf("%s is null", k))
  226. } else if v == "$isNotNull" {
  227. fs = append(fs, fmt.Sprintf("%s is not null", k))
  228. } else {
  229. fs = append(fs, fmt.Sprintf("%s=?", k))
  230. vs = append(vs, v)
  231. }
  232. }
  233. }
  234. var buffer bytes.Buffer
  235. buffer.WriteString("select ")
  236. if fields == "" {
  237. buffer.WriteString("*")
  238. } else {
  239. buffer.WriteString(fields)
  240. }
  241. buffer.WriteString(" from ")
  242. buffer.WriteString(tableName)
  243. if len(fs) > 0 {
  244. buffer.WriteString(" where ")
  245. buffer.WriteString(strings.Join(fs, " and "))
  246. }
  247. if order != "" {
  248. buffer.WriteString(" order by ")
  249. buffer.WriteString(order)
  250. }
  251. if start > -1 && pageSize > 0 {
  252. buffer.WriteString(" limit ")
  253. buffer.WriteString(fmt.Sprint(start))
  254. buffer.WriteString(",")
  255. buffer.WriteString(fmt.Sprint(pageSize))
  256. }
  257. q := buffer.String()
  258. log.Println(q, vs)
  259. return m.SelectBySql(q, vs...)
  260. }
  261. //sql语句查询
  262. func (m *Mysql) SelectBySql(q string, args ...interface{}) *[]map[string]interface{} {
  263. return m.SelectBySqlByTx(nil, q, args...)
  264. }
  265. func (m *Mysql) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
  266. return m.Select(0, nil, tx, q, args...)
  267. }
  268. func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}) bool, tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
  269. var rows *sql.Rows
  270. var err error
  271. if m.driverName == CLICKHOUSE {
  272. rows, err = m.DB.Query(q, args...)
  273. } else {
  274. var stmtOut *sql.Stmt
  275. if tx == nil {
  276. stmtOut, err = m.DB.Prepare(q)
  277. } else {
  278. stmtOut, err = tx.Prepare(q)
  279. }
  280. if err != nil {
  281. log.Println(err)
  282. return nil
  283. }
  284. defer stmtOut.Close()
  285. rows, err = stmtOut.Query(args...)
  286. }
  287. if err != nil {
  288. log.Println(err)
  289. return nil
  290. }
  291. if rows != nil {
  292. defer rows.Close()
  293. }
  294. columns, err := rows.Columns()
  295. if err != nil {
  296. log.Println(err)
  297. return nil
  298. }
  299. list := []map[string]interface{}{}
  300. for rows.Next() {
  301. scanArgs := make([]interface{}, len(columns))
  302. values := make([]interface{}, len(columns))
  303. ret := make(map[string]interface{})
  304. for k, _ := range values {
  305. scanArgs[k] = &values[k]
  306. }
  307. err = rows.Scan(scanArgs...)
  308. if err != nil {
  309. log.Println(err)
  310. break
  311. }
  312. for i, col := range values {
  313. if v, ok := col.([]uint8); ok {
  314. ret[columns[i]] = string(v)
  315. } else {
  316. ret[columns[i]] = col
  317. }
  318. }
  319. list = append(list, ret)
  320. if bath > 0 && len(list) == bath {
  321. if !f(&list) {
  322. list = []map[string]interface{}{}
  323. break
  324. }
  325. list = []map[string]interface{}{}
  326. }
  327. }
  328. if bath > 0 && len(list) > 0 {
  329. f(&list)
  330. list = []map[string]interface{}{}
  331. }
  332. return &list
  333. }
  334. func (m *Mysql) SelectByBath(bath int, f func(l *[]map[string]interface{}) bool, q string, args ...interface{}) {
  335. m.SelectByBathByTx(bath, f, nil, q, args...)
  336. }
  337. func (m *Mysql) SelectByBathByTx(bath int, f func(l *[]map[string]interface{}) bool, tx *sql.Tx, q string, args ...interface{}) {
  338. m.Select(bath, f, tx, q, args...)
  339. }
  340. func (m *Mysql) FindOne(tableName string, query map[string]interface{}, fields, order string) *map[string]interface{} {
  341. list := m.Find(tableName, query, fields, order, 0, 1)
  342. if list != nil && len(*list) == 1 {
  343. temp := (*list)[0]
  344. return &temp
  345. }
  346. return nil
  347. }
  348. //修改
  349. func (m *Mysql) Update(tableName string, query, update map[string]interface{}) bool {
  350. return m.UpdateByTx(nil, tableName, query, update)
  351. }
  352. //带事务的修改
  353. func (m *Mysql) UpdateByTx(tx *sql.Tx, tableName string, query, update map[string]interface{}) bool {
  354. q_fs := []string{}
  355. u_fs := []string{}
  356. values := []interface{}{}
  357. for k, v := range update {
  358. q_fs = append(q_fs, fmt.Sprintf("%s=?", k))
  359. values = append(values, v)
  360. }
  361. for k, v := range query {
  362. u_fs = append(u_fs, fmt.Sprintf("%s=?", k))
  363. values = append(values, v)
  364. }
  365. q := fmt.Sprintf("update %s set %s where %s", tableName, strings.Join(q_fs, ","), strings.Join(u_fs, " and "))
  366. log.Println(q, values)
  367. return m.UpdateOrDeleteBySqlByTx(tx, q, values...) >= 0
  368. }
  369. //批量更新
  370. func (m *Mysql) UpdateBath(tableName string, fields []string, array [][]interface{}) {
  371. m.UpdateBathByTx(nil, tableName, fields, array)
  372. }
  373. //带事务的批量更新
  374. func (m *Mysql) UpdateBathByTx(tx *sql.Tx, tableName string, fields []string, array [][]interface{}) int64 {
  375. ws := []string{}
  376. args := []interface{}{}
  377. ids := []interface{}{}
  378. casethen := []string{}
  379. for n := 0; n < len(array[0]); n++ {
  380. for _, v := range array {
  381. if n == 0 {
  382. ws = append(ws, "?")
  383. ids = append(ids, v[0])
  384. casethen = append(casethen, "when ? then ?")
  385. } else {
  386. args = append(args, v[0], v[n])
  387. }
  388. }
  389. }
  390. ct := strings.Join(casethen, " ")
  391. sql_appends := []string{}
  392. for k, v := range fields {
  393. if k == 0 {
  394. continue
  395. }
  396. sql_appends = append(sql_appends, fmt.Sprintf(`%s=case %s %s end`, v, fields[0], ct))
  397. }
  398. args = append(args, ids...)
  399. sql := fmt.Sprintf(`update %s set %s where %s in (%s)`, tableName, strings.Join(sql_appends, ","), fields[0], strings.Join(ws, ","))
  400. return m.UpdateOrDeleteBySqlByTx(tx, sql, args...)
  401. }
  402. //删除
  403. func (m *Mysql) Delete(tableName string, query map[string]interface{}) bool {
  404. return m.DeleteByTx(nil, tableName, query)
  405. }
  406. func (m *Mysql) DeleteByTx(tx *sql.Tx, tableName string, query map[string]interface{}) bool {
  407. fields := []string{}
  408. values := []interface{}{}
  409. for k, v := range query {
  410. fields = append(fields, fmt.Sprintf("%s=?", k))
  411. values = append(values, v)
  412. }
  413. q := fmt.Sprintf("delete from %s where %s", tableName, strings.Join(fields, " and "))
  414. log.Println(q, values)
  415. return m.UpdateOrDeleteBySqlByTx(tx, q, values...) > 0
  416. }
  417. //修改或删除
  418. func (m *Mysql) UpdateOrDeleteBySql(q string, args ...interface{}) int64 {
  419. return m.UpdateOrDeleteBySqlByTx(nil, q, args...)
  420. }
  421. //带事务的修改或删除
  422. func (m *Mysql) UpdateOrDeleteBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64 {
  423. result, _ := m.ExecBySqlByTx(tx, q, args...)
  424. if result == nil {
  425. return -1
  426. }
  427. if m.driverName == CLICKHOUSE {
  428. return 0
  429. }
  430. count, err := result.RowsAffected()
  431. if err != nil {
  432. log.Println(err)
  433. return -1
  434. }
  435. return count
  436. }
  437. //总数
  438. func (m *Mysql) Count(tableName string, query map[string]interface{}) int64 {
  439. fields := []string{}
  440. values := []interface{}{}
  441. for k, v := range query {
  442. rt := reflect.TypeOf(v)
  443. rv := reflect.ValueOf(v)
  444. if rt.Kind() == reflect.Map {
  445. for _, rv_k := range rv.MapKeys() {
  446. if rv_k.String() == "ne" {
  447. fields = append(fields, fmt.Sprintf("%s!=?", k))
  448. values = append(values, rv.MapIndex(rv_k).Interface())
  449. }
  450. if rv_k.String() == "notin" {
  451. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  452. for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  453. fields = append(fields, fmt.Sprintf("%s!=?", k))
  454. values = append(values, v)
  455. }
  456. }
  457. }
  458. if rv_k.String() == "in" {
  459. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  460. _fs := fmt.Sprintf("%s in (?", k)
  461. for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  462. if k > 0 {
  463. _fs += ",?"
  464. }
  465. values = append(values, v)
  466. }
  467. _fs += ")"
  468. fields = append(fields, _fs)
  469. }
  470. }
  471. }
  472. } else if v == "$isNull" {
  473. fields = append(fields, fmt.Sprintf("%s is null", k))
  474. } else if v == "$isNotNull" {
  475. fields = append(fields, fmt.Sprintf("%s is not null", k))
  476. } else {
  477. fields = append(fields, fmt.Sprintf("%s=?", k))
  478. values = append(values, v)
  479. }
  480. }
  481. q := fmt.Sprintf("select count(1) as count from %s", tableName)
  482. if len(query) > 0 {
  483. q += fmt.Sprintf(" where %s", strings.Join(fields, " and "))
  484. }
  485. log.Println(q, values)
  486. return m.CountBySql(q, values...)
  487. }
  488. func (m *Mysql) CountBySql(q string, args ...interface{}) int64 {
  489. var rows *sql.Rows
  490. var err error
  491. if m.driverName == CLICKHOUSE {
  492. rows, err = m.DB.Query(q, args...)
  493. } else {
  494. var stmtOut *sql.Stmt
  495. stmtOut, err = m.DB.Prepare(q)
  496. if err != nil {
  497. log.Println(err)
  498. return -1
  499. }
  500. defer stmtOut.Close()
  501. rows, err = stmtOut.Query(args...)
  502. }
  503. if err != nil {
  504. log.Println(err)
  505. return -1
  506. }
  507. if rows != nil {
  508. defer rows.Close()
  509. }
  510. var count int64 = -1
  511. if rows.Next() {
  512. err = rows.Scan(&count)
  513. if err != nil {
  514. log.Println(err)
  515. }
  516. }
  517. return count
  518. }
  519. //执行事务
  520. func (m *Mysql) ExecTx(msg string, f func(tx *sql.Tx) bool) bool {
  521. tx, err := m.DB.Begin()
  522. if err != nil {
  523. log.Println(msg, "获取事务错误", err)
  524. } else {
  525. if f(tx) {
  526. if err := tx.Commit(); err != nil {
  527. log.Println(msg, "提交事务错误", err)
  528. } else {
  529. return true
  530. }
  531. } else {
  532. if err := tx.Rollback(); err != nil {
  533. log.Println(msg, "事务回滚错误", err)
  534. }
  535. }
  536. }
  537. return false
  538. }
  539. /*************方法命名不规范,上面有替代方法*************/
  540. func (m *Mysql) Query(query string, args ...interface{}) *[]map[string]interface{} {
  541. return m.SelectBySql(query, args...)
  542. }
  543. func (m *Mysql) QueryCount(query string, args ...interface{}) (count int) {
  544. count = -1
  545. if !strings.Contains(strings.ToLower(query), "count(*)") {
  546. fmt.Println("QueryCount need query like < select count(*) from ..... >")
  547. return
  548. }
  549. count = int(m.CountBySql(query, args...))
  550. return
  551. }