main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. _ "github.com/go-sql-driver/mysql"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "log"
  9. "strings"
  10. "time"
  11. )
  12. var mysql *sql.DB
  13. func init() {
  14. var err error
  15. mysql, err = sql.Open("mysql", g.Cfg().MustGet(context.Background(), "mysql").String())
  16. if err != nil {
  17. panic(err)
  18. }
  19. // 设置数据库连接参数
  20. mysql.SetMaxOpenConns(100) // 最大连接数
  21. mysql.SetMaxIdleConns(10) // 最大空闲连接数
  22. mysql.SetConnMaxLifetime(time.Minute * 30) // 连接最大生命周期
  23. }
  24. type handleData struct {
  25. Id int64 `json:"id"`
  26. Value string `json:"value"`
  27. }
  28. var (
  29. maxNum = g.Cfg().MustGet(context.Background(), "maxNum").Int() //最大数量限制
  30. table = g.Cfg().MustGet(context.Background(), "flushTable").String()
  31. )
  32. func main() {
  33. // 执行查询语句
  34. rows, err := mysql.Query(fmt.Sprintf("SELECT id,value FROM %s WHERE field ='commonlyRenew' ", table))
  35. if err != nil {
  36. panic(err)
  37. }
  38. defer rows.Close()
  39. // 定义每批查询的大小
  40. batchSize := 100
  41. var update int64 = 0
  42. // 逐批读取数据
  43. for {
  44. // 创建切片用于存储当前批次的数据
  45. data := make([]*handleData, 0, batchSize)
  46. // 从查询结果中读取数据,直到达到批次大小或没有更多数据为止
  47. for i := 0; i < batchSize && rows.Next(); i++ {
  48. var item handleData
  49. err := rows.Scan(&item.Id, &item.Value) // 替换为你的结构体字段
  50. if err != nil {
  51. panic(err)
  52. }
  53. data = append(data, &item)
  54. }
  55. affect := doHandleData(data)
  56. if affect >= 0 {
  57. update += affect
  58. } else {
  59. g.Log().Error(context.Background(), "更新异常", data)
  60. }
  61. // 如果当前批次的数据少于批次大小,表示已经读取完全量数据,退出循环
  62. if len(data) < batchSize {
  63. break
  64. }
  65. }
  66. g.Log().Info(context.Background(), "更新数据条数", update)
  67. }
  68. func doHandleData(list []*handleData) int64 {
  69. var rData []*handleData
  70. for i := 0; i < len(list); i++ {
  71. t := list[i]
  72. if ids := strings.Split(t.Value, ","); len(ids) > maxNum {
  73. //去更新
  74. rData = append(rData, &handleData{t.Id, strings.Join(ids[:maxNum], ",")})
  75. }
  76. }
  77. if len(rData) > 0 {
  78. var query string
  79. var params []interface{}
  80. var idArr []string
  81. for _, d := range rData {
  82. query += fmt.Sprintf("WHEN %d THEN ? ", d.Id)
  83. idArr = append(idArr, fmt.Sprintf("%d", d.Id))
  84. params = append(params, d.Value)
  85. }
  86. result, err := mysql.Exec(fmt.Sprintf("UPDATE %s SET value = CASE id %s END WHERE id in (' %s ')", table, query, strings.Join(idArr, "','")), params...)
  87. if err != nil {
  88. log.Println(err)
  89. return -1
  90. }
  91. count, err := result.RowsAffected()
  92. if err != nil {
  93. log.Println(err)
  94. return -1
  95. }
  96. return count
  97. }
  98. return 0
  99. }