123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package main
- import (
- "context"
- "database/sql"
- "fmt"
- _ "github.com/go-sql-driver/mysql"
- "github.com/gogf/gf/v2/frame/g"
- "log"
- "strings"
- "time"
- )
- var mysql *sql.DB
- func init() {
- var err error
- mysql, err = sql.Open("mysql", g.Cfg().MustGet(context.Background(), "mysql").String())
- if err != nil {
- panic(err)
- }
- // 设置数据库连接参数
- mysql.SetMaxOpenConns(100) // 最大连接数
- mysql.SetMaxIdleConns(10) // 最大空闲连接数
- mysql.SetConnMaxLifetime(time.Minute * 30) // 连接最大生命周期
- }
- type handleData struct {
- Id int64 `json:"id"`
- Value string `json:"value"`
- }
- var (
- maxNum = g.Cfg().MustGet(context.Background(), "maxNum").Int() //最大数量限制
- table = g.Cfg().MustGet(context.Background(), "flushTable").String()
- )
- func main() {
- // 执行查询语句
- rows, err := mysql.Query(fmt.Sprintf("SELECT id,value FROM %s WHERE field ='commonlyRenew' ", table))
- if err != nil {
- panic(err)
- }
- defer rows.Close()
- // 定义每批查询的大小
- batchSize := 100
- var update int64 = 0
- // 逐批读取数据
- for {
- // 创建切片用于存储当前批次的数据
- data := make([]*handleData, 0, batchSize)
- // 从查询结果中读取数据,直到达到批次大小或没有更多数据为止
- for i := 0; i < batchSize && rows.Next(); i++ {
- var item handleData
- err := rows.Scan(&item.Id, &item.Value) // 替换为你的结构体字段
- if err != nil {
- panic(err)
- }
- data = append(data, &item)
- }
- affect := doHandleData(data)
- if affect >= 0 {
- update += affect
- } else {
- g.Log().Error(context.Background(), "更新异常", data)
- }
- // 如果当前批次的数据少于批次大小,表示已经读取完全量数据,退出循环
- if len(data) < batchSize {
- break
- }
- }
- g.Log().Info(context.Background(), "更新数据条数", update)
- }
- func doHandleData(list []*handleData) int64 {
- var rData []*handleData
- for i := 0; i < len(list); i++ {
- t := list[i]
- if ids := strings.Split(t.Value, ","); len(ids) > maxNum {
- //去更新
- rData = append(rData, &handleData{t.Id, strings.Join(ids[:maxNum], ",")})
- }
- }
- if len(rData) > 0 {
- var query string
- var params []interface{}
- var idArr []string
- for _, d := range rData {
- query += fmt.Sprintf("WHEN %d THEN ? ", d.Id)
- idArr = append(idArr, fmt.Sprintf("%d", d.Id))
- params = append(params, d.Value)
- }
- result, err := mysql.Exec(fmt.Sprintf("UPDATE %s SET value = CASE id %s END WHERE id in (' %s ')", table, query, strings.Join(idArr, "','")), params...)
- if err != nil {
- log.Println(err)
- return -1
- }
- count, err := result.RowsAffected()
- if err != nil {
- log.Println(err)
- return -1
- }
- return count
- }
- return 0
- }
|