mysql.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. package ent_util
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "fmt"
  6. "reflect"
  7. "strings"
  8. "time"
  9. "log"
  10. _ "github.com/go-sql-driver/mysql"
  11. )
  // Mysql bundles the connection settings for one MySQL server together with
  // the database/sql connection pool that Init creates from them.
  type Mysql struct {
  	Address      string  // server address in "host:port" form
  	UserName     string  // login user name
  	PassWord     string  // login password
  	DBName       string  // name of the database (schema) to use
  	DB           *sql.DB // connection pool handle; assigned by Init
  	MaxOpenConns int     // maximum number of open connections; Init substitutes 20 when <= 0
  	MaxIdleConns int     // maximum number of idle connections; Init substitutes 20 when <= 0
  }
  21. func (m *Mysql) Init() {
  22. if m.MaxOpenConns <= 0 {
  23. m.MaxOpenConns = 20
  24. }
  25. if m.MaxIdleConns <= 0 {
  26. m.MaxIdleConns = 20
  27. }
  28. var err error //utf8mb4
  29. m.DB, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", m.UserName, m.PassWord, m.Address, m.DBName))
  30. log.Println(fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", m.UserName, m.PassWord, m.Address, m.DBName))
  31. if err != nil {
  32. log.Println(err)
  33. return
  34. }
  35. m.DB.SetMaxOpenConns(m.MaxOpenConns)
  36. m.DB.SetMaxIdleConns(m.MaxIdleConns)
  37. m.DB.SetConnMaxLifetime(time.Minute * 3)
  38. err = m.DB.Ping()
  39. if err != nil {
  40. log.Println(err)
  41. }
  42. }
  43. // 新增
  44. func (m *Mysql) Insert(tableName string, data map[string]interface{}) int64 {
  45. return m.InsertByTx(nil, tableName, data)
  46. }
  47. // 带有事务的新增
  48. func (m *Mysql) InsertByTx(tx *sql.Tx, tableName string, data map[string]interface{}) int64 {
  49. fields := []string{}
  50. values := []interface{}{}
  51. placeholders := []string{}
  52. if tableName == "dataexport_order" {
  53. if _, ok := data["user_nickname"]; ok {
  54. data["user_nickname"] = ""
  55. }
  56. }
  57. for k, v := range data {
  58. fields = append(fields, k)
  59. values = append(values, v)
  60. placeholders = append(placeholders, "?")
  61. }
  62. q := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", tableName, strings.Join(fields, ","), strings.Join(placeholders, ","))
  63. log.Println("mysql", q, values)
  64. return m.InsertBySqlByTx(tx, q, values...)
  65. }
  66. // sql语句新增
  67. func (m *Mysql) InsertBySql(q string, args ...interface{}) int64 {
  68. return m.InsertBySqlByTx(nil, q, args...)
  69. }
  70. // 带有事务的sql语句新增
  71. func (m *Mysql) InsertBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64 {
  72. result, _ := m.ExecBySqlByTx(tx, q, args...)
  73. if result == nil {
  74. return -1
  75. }
  76. id, err := result.LastInsertId()
  77. if err != nil {
  78. log.Println(err)
  79. return -1
  80. }
  81. return id
  82. }
  83. // 批量新增
  84. func (m *Mysql) InsertIgnoreBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
  85. return m.InsertIgnoreBatchByTx(nil, tableName, fields, values)
  86. }
  87. // 带事务的批量新增
  88. func (m *Mysql) InsertIgnoreBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
  89. return m.insertOrReplaceBatchByTx(tx, "INSERT", "IGNORE", tableName, fields, values)
  90. }
  91. // 批量新增
  92. func (m *Mysql) InsertBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
  93. return m.InsertBatchByTx(nil, tableName, fields, values)
  94. }
  95. // 带事务的批量新增
  96. func (m *Mysql) InsertBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
  97. return m.insertOrReplaceBatchByTx(tx, "INSERT", "", tableName, fields, values)
  98. }
  99. // 批量更新
  100. func (m *Mysql) ReplaceBatch(tableName string, fields []string, values []interface{}) (int64, int64) {
  101. return m.ReplaceBatchByTx(nil, tableName, fields, values)
  102. }
  103. // 带事务的批量更新
  104. func (m *Mysql) ReplaceBatchByTx(tx *sql.Tx, tableName string, fields []string, values []interface{}) (int64, int64) {
  105. return m.insertOrReplaceBatchByTx(tx, "REPLACE", "", tableName, fields, values)
  106. }
  107. func (m *Mysql) insertOrReplaceBatchByTx(tx *sql.Tx, tp string, afterInsert, tableName string, fields []string, values []interface{}) (int64, int64) {
  108. placeholders := []string{}
  109. for range fields {
  110. placeholders = append(placeholders, "?")
  111. }
  112. placeholder := strings.Join(placeholders, ",")
  113. array := []string{}
  114. for i := 0; i < len(values)/len(fields); i++ {
  115. array = append(array, fmt.Sprintf("(%s)", placeholder))
  116. }
  117. q := fmt.Sprintf("%s %s INTO %s (%s) VALUES %s", tp, afterInsert, tableName, strings.Join(fields, ","), strings.Join(array, ","))
  118. result, _ := m.ExecBySqlByTx(tx, q, values...)
  119. if result == nil {
  120. return -1, -1
  121. }
  122. v1, e1 := result.RowsAffected()
  123. if e1 != nil {
  124. log.Println(e1)
  125. return -1, -1
  126. }
  127. v2, e2 := result.LastInsertId()
  128. if e2 != nil {
  129. log.Println(e2)
  130. return -1, -1
  131. }
  132. return v1, v2
  133. }
  134. // sql语句执行
  135. func (m *Mysql) ExecBySql(q string, args ...interface{}) (sql.Result, error) {
  136. return m.ExecBySqlByTx(nil, q, args...)
  137. }
  138. // sql语句执行,带有事务
  139. func (m *Mysql) ExecBySqlByTx(tx *sql.Tx, q string, args ...interface{}) (sql.Result, error) {
  140. var stmtIns *sql.Stmt
  141. var err error
  142. if tx == nil {
  143. stmtIns, err = m.DB.Prepare(q)
  144. } else {
  145. stmtIns, err = tx.Prepare(q)
  146. }
  147. if err != nil {
  148. log.Println(err)
  149. return nil, err
  150. }
  151. defer stmtIns.Close()
  152. result, err := stmtIns.Exec(args...)
  153. if err != nil {
  154. log.Println(args, err)
  155. return nil, err
  156. }
  157. return result, nil
  158. }
  159. /*不等于 map[string]string{"ne":"1"}
  160. *不等于多个 map[string]string{"notin":[]interface{}{1,2}}
  161. *字段为空 map[string]string{"name":"$isNull"}
  162. *字段不为空 map[string]string{"name":"$isNotNull"}
  163. */
  164. func (m *Mysql) Find(tableName string, query map[string]interface{}, fields, order string, start, pageSize int) *[]map[string]interface{} {
  165. fs := []string{}
  166. vs := []interface{}{}
  167. for k, v := range query {
  168. rt := reflect.TypeOf(v)
  169. rv := reflect.ValueOf(v)
  170. if rt.Kind() == reflect.Map {
  171. for _, rv_k := range rv.MapKeys() {
  172. if rv_k.String() == "ne" {
  173. fs = append(fs, fmt.Sprintf("%s!=?", k))
  174. vs = append(vs, rv.MapIndex(rv_k).Interface())
  175. }
  176. if rv_k.String() == "notin" {
  177. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  178. for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  179. fs = append(fs, fmt.Sprintf("%s!=?", k))
  180. vs = append(vs, v)
  181. }
  182. }
  183. }
  184. if rv_k.String() == "gte" {
  185. fs = append(fs, fmt.Sprintf("%s>=?", k))
  186. vs = append(vs, rv.MapIndex(rv_k).Interface())
  187. }
  188. if rv_k.String() == "gt" {
  189. fs = append(fs, fmt.Sprintf("%s>?", k))
  190. vs = append(vs, rv.MapIndex(rv_k).Interface())
  191. }
  192. if rv_k.String() == "lte" {
  193. fs = append(fs, fmt.Sprintf("%s<=?", k))
  194. vs = append(vs, rv.MapIndex(rv_k).Interface())
  195. }
  196. if rv_k.String() == "lt" {
  197. fs = append(fs, fmt.Sprintf("%s<?", k))
  198. vs = append(vs, rv.MapIndex(rv_k).Interface())
  199. }
  200. if rv_k.String() == "in" {
  201. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  202. _fs := fmt.Sprintf("%s in (?", k)
  203. for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  204. if k > 0 {
  205. _fs += ",?"
  206. }
  207. vs = append(vs, v)
  208. }
  209. _fs += ")"
  210. fs = append(fs, _fs)
  211. }
  212. }
  213. }
  214. } else {
  215. if v == "$isNull" {
  216. fs = append(fs, fmt.Sprintf("%s is null", k))
  217. } else if v == "$isNotNull" {
  218. fs = append(fs, fmt.Sprintf("%s is not null", k))
  219. } else {
  220. fs = append(fs, fmt.Sprintf("%s=?", k))
  221. vs = append(vs, v)
  222. }
  223. }
  224. }
  225. var buffer bytes.Buffer
  226. buffer.WriteString("select ")
  227. if fields == "" {
  228. buffer.WriteString("*")
  229. } else {
  230. buffer.WriteString(fields)
  231. }
  232. buffer.WriteString(" from ")
  233. buffer.WriteString(tableName)
  234. if len(fs) > 0 {
  235. buffer.WriteString(" where ")
  236. buffer.WriteString(strings.Join(fs, " and "))
  237. }
  238. if order != "" {
  239. buffer.WriteString(" order by ")
  240. buffer.WriteString(order)
  241. }
  242. if start > -1 && pageSize > 0 {
  243. buffer.WriteString(" limit ")
  244. buffer.WriteString(fmt.Sprint(start))
  245. buffer.WriteString(",")
  246. buffer.WriteString(fmt.Sprint(pageSize))
  247. }
  248. q := buffer.String()
  249. //log.Println(q, vs)
  250. return m.SelectBySql(q, vs...)
  251. }
  252. // sql语句查询
  253. func (m *Mysql) SelectBySql(q string, args ...interface{}) *[]map[string]interface{} {
  254. return m.SelectBySqlByTx(nil, q, args...)
  255. }
  256. func (m *Mysql) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
  257. return m.Select(0, nil, tx, q, args...)
  258. }
  259. func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
  260. var stmtOut *sql.Stmt
  261. var err error
  262. if tx == nil {
  263. stmtOut, err = m.DB.Prepare(q)
  264. } else {
  265. stmtOut, err = tx.Prepare(q)
  266. }
  267. if err != nil {
  268. log.Println(err)
  269. return nil
  270. }
  271. defer stmtOut.Close()
  272. rows, err := stmtOut.Query(args...)
  273. if err != nil {
  274. log.Println(err)
  275. return nil
  276. }
  277. if rows != nil {
  278. defer rows.Close()
  279. }
  280. columns, err := rows.Columns()
  281. if err != nil {
  282. log.Println(err)
  283. return nil
  284. }
  285. list := []map[string]interface{}{}
  286. for rows.Next() {
  287. scanArgs := make([]interface{}, len(columns))
  288. values := make([]interface{}, len(columns))
  289. ret := make(map[string]interface{})
  290. for k, _ := range values {
  291. scanArgs[k] = &values[k]
  292. }
  293. err = rows.Scan(scanArgs...)
  294. if err != nil {
  295. log.Println(err)
  296. break
  297. }
  298. for i, col := range values {
  299. if v, ok := col.([]uint8); ok {
  300. ret[columns[i]] = string(v)
  301. } else {
  302. ret[columns[i]] = col
  303. }
  304. }
  305. list = append(list, ret)
  306. if bath > 0 && len(list) == bath {
  307. f(&list)
  308. list = []map[string]interface{}{}
  309. }
  310. }
  311. if bath > 0 && len(list) > 0 {
  312. f(&list)
  313. list = []map[string]interface{}{}
  314. }
  315. return &list
  316. }
  317. func (m *Mysql) SelectByBath(bath int, f func(l *[]map[string]interface{}), q string, args ...interface{}) {
  318. m.SelectByBathByTx(bath, f, nil, q, args...)
  319. }
  320. func (m *Mysql) SelectByBathByTx(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) {
  321. m.Select(bath, f, tx, q, args...)
  322. }
  323. func (m *Mysql) FindOne(tableName string, query map[string]interface{}, fields, order string) *map[string]interface{} {
  324. list := m.Find(tableName, query, fields, order, 0, 1)
  325. if list != nil && len(*list) == 1 {
  326. temp := (*list)[0]
  327. return &temp
  328. }
  329. return nil
  330. }
  331. // 修改
  332. func (m *Mysql) Update(tableName string, query, update map[string]interface{}) bool {
  333. return m.UpdateByTx(nil, tableName, query, update)
  334. }
  335. // 带事务的修改
  336. func (m *Mysql) UpdateByTx(tx *sql.Tx, tableName string, query, update map[string]interface{}) bool {
  337. q_fs := []string{}
  338. u_fs := []string{}
  339. values := []interface{}{}
  340. for k, v := range update {
  341. q_fs = append(q_fs, fmt.Sprintf("%s=?", k))
  342. values = append(values, v)
  343. }
  344. for k, v := range query {
  345. u_fs = append(u_fs, fmt.Sprintf("%s=?", k))
  346. values = append(values, v)
  347. }
  348. q := fmt.Sprintf("update %s set %s where %s", tableName, strings.Join(q_fs, ","), strings.Join(u_fs, " and "))
  349. //log.Println(q, values)
  350. return m.UpdateOrDeleteBySqlByTx(tx, q, values...) >= 0
  351. }
  352. // 删除
  353. func (m *Mysql) Delete(tableName string, query map[string]interface{}) bool {
  354. return m.DeleteByTx(nil, tableName, query)
  355. }
  356. func (m *Mysql) DeleteByTx(tx *sql.Tx, tableName string, query map[string]interface{}) bool {
  357. fields := []string{}
  358. values := []interface{}{}
  359. for k, v := range query {
  360. fields = append(fields, fmt.Sprintf("%s=?", k))
  361. values = append(values, v)
  362. }
  363. q := fmt.Sprintf("delete from %s where %s", tableName, strings.Join(fields, " and "))
  364. log.Println(q, values)
  365. return m.UpdateOrDeleteBySqlByTx(tx, q, values...) > 0
  366. }
  367. // 修改或删除
  368. func (m *Mysql) UpdateOrDeleteBySql(q string, args ...interface{}) int64 {
  369. return m.UpdateOrDeleteBySqlByTx(nil, q, args...)
  370. }
  371. // 带事务的修改或删除
  372. func (m *Mysql) UpdateOrDeleteBySqlByTx(tx *sql.Tx, q string, args ...interface{}) int64 {
  373. result, err := m.ExecBySqlByTx(tx, q, args...)
  374. if err != nil {
  375. log.Println(err)
  376. return -1
  377. }
  378. count, err := result.RowsAffected()
  379. if err != nil {
  380. log.Println(err)
  381. return -1
  382. }
  383. return count
  384. }
  385. // 总数
  386. func (m *Mysql) Count(tableName string, query map[string]interface{}) int64 {
  387. fields := []string{}
  388. values := []interface{}{}
  389. for k, v := range query {
  390. rt := reflect.TypeOf(v)
  391. rv := reflect.ValueOf(v)
  392. if rt.Kind() == reflect.Map {
  393. for _, rv_k := range rv.MapKeys() {
  394. if rv_k.String() == "ne" {
  395. fields = append(fields, fmt.Sprintf("%s!=?", k))
  396. values = append(values, rv.MapIndex(rv_k).Interface())
  397. }
  398. if rv_k.String() == "notin" {
  399. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  400. for _, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  401. fields = append(fields, fmt.Sprintf("%s!=?", k))
  402. values = append(values, v)
  403. }
  404. }
  405. }
  406. if rv_k.String() == "in" {
  407. if len(rv.MapIndex(rv_k).Interface().([]interface{})) > 0 {
  408. _fs := fmt.Sprintf("%s in (?", k)
  409. for k, v := range rv.MapIndex(rv_k).Interface().([]interface{}) {
  410. if k > 0 {
  411. _fs += ",?"
  412. }
  413. values = append(values, v)
  414. }
  415. _fs += ")"
  416. fields = append(fields, _fs)
  417. }
  418. }
  419. }
  420. } else if v == "$isNull" {
  421. fields = append(fields, fmt.Sprintf("%s is null", k))
  422. } else if v == "$isNotNull" {
  423. fields = append(fields, fmt.Sprintf("%s is not null", k))
  424. } else {
  425. fields = append(fields, fmt.Sprintf("%s=?", k))
  426. values = append(values, v)
  427. }
  428. }
  429. q := fmt.Sprintf("select count(1) as count from %s", tableName)
  430. if len(query) > 0 {
  431. q += fmt.Sprintf(" where %s", strings.Join(fields, " and "))
  432. }
  433. log.Println(q, values)
  434. return m.CountBySql(q, values...)
  435. }
  436. func (m *Mysql) CountBySql(q string, args ...interface{}) int64 {
  437. stmtIns, err := m.DB.Prepare(q)
  438. if err != nil {
  439. log.Println(err)
  440. return -1
  441. }
  442. defer stmtIns.Close()
  443. rows, err := stmtIns.Query(args...)
  444. if err != nil {
  445. log.Println(err)
  446. return -1
  447. }
  448. if rows != nil {
  449. defer rows.Close()
  450. }
  451. var count int64 = -1
  452. if rows.Next() {
  453. err = rows.Scan(&count)
  454. if err != nil {
  455. log.Println(err)
  456. }
  457. }
  458. return count
  459. }
  460. // 执行事务
  461. func (m *Mysql) ExecTx(msg string, f func(tx *sql.Tx) bool) bool {
  462. tx, err := m.DB.Begin()
  463. if err != nil {
  464. log.Println(msg, "获取事务错误", err)
  465. } else {
  466. if f(tx) {
  467. if err := tx.Commit(); err != nil {
  468. log.Println(msg, "提交事务错误", err)
  469. } else {
  470. return true
  471. }
  472. } else {
  473. if err := tx.Rollback(); err != nil {
  474. log.Println(msg, "事务回滚错误", err)
  475. }
  476. }
  477. }
  478. return false
  479. }
  480. /*************方法命名不规范,上面有替代方法*************/
  481. func (m *Mysql) Query(query string, args ...interface{}) *[]map[string]interface{} {
  482. return m.SelectBySql(query, args...)
  483. }
  484. func (m *Mysql) QueryCount(query string, args ...interface{}) (count int) {
  485. count = -1
  486. if !strings.Contains(strings.ToLower(query), "count(*)") {
  487. fmt.Println("QueryCount need query like < select count(*) from ..... >")
  488. return
  489. }
  490. count = int(m.CountBySql(query, args...))
  491. return
  492. }