123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package main
- import (
- "fmt"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "strings"
- "sync"
- )
- func taskRepair() {
- pool := make(chan bool, 2) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC", "global_common_data.dws_f_bpmc_relation"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("查询最后id---", zap.Int("finally id: ", finalId))
- lastid, count := finalId, 0
- for {
- log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
- q := fmt.Sprintf("SELECT * FROM %s WHERE id < %d ORDER BY id DESC limit 1000000", "global_common_data.dws_f_bpmc_relation", lastid)
- rows, err := MysqlTool.DB.Query(q)
- if err != nil {
- log.Error("mysql query err ", zap.Error(err))
- }
- columns, err := rows.Columns()
- //if finalId == lastid {
- // log.Info("----finish-----", zap.Int("count: ", count))
- // break
- //}
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("mysql scan err ", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%2000 == 0 {
- log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- task1(tmp)
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func task1(tmp map[string]interface{}) {
- sql := `SELECT count(*) FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ?`
- sql1 := `SELECT id FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ? ORDER BY id`
- sql2 := `DELETE FROM global_common_data.dws_f_bpmc_relation WHERE id IN (%s)`
- c := MysqlTool.QueryCount(sql, tmp["infoid"], tmp["projectid"], tmp["name_id"])
- if c > 1 {
- info := MysqlTool.Query(sql1, tmp["infoid"], tmp["projectid"], tmp["name_id"])
- if info != nil && len(*info) > 0 {
- var ids []int64
- for i, v := range *info {
- if i != 0 {
- ids = append(ids, util.Int64All(v["id"]))
- }
- }
- if len(ids) > 0 {
- str1, arr1 := WhArgs(ids)
- log.Info("del---", zap.Any("ids---", ids))
- MysqlTool.ExecBySql(fmt.Sprintf(sql2, str1), arr1...)
- }
- }
- }
- }
// WhArgs builds the pieces needed for a parameterized SQL IN clause: a
// comma-separated list with one "?" per element of args, and the same values
// as a slice of interface{} suitable for passing as query arguments. An empty
// args yields an empty string and an empty, non-nil slice.
func WhArgs(args []int64) (string, []interface{}) {
	n := len(args)
	params := make([]interface{}, 0, n)
	for _, id := range args {
		params = append(params, id)
	}
	if n == 0 {
		return "", params
	}
	// n-1 placeholders followed by a comma, then the final one without.
	return strings.Repeat("?,", n-1) + "?", params
}
|