package main import ( "fmt" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "strings" "sync" ) func taskRepair() { pool := make(chan bool, 2) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC", "global_common_data.dws_f_bpmc_relation")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("查询最后id---", zap.Int("finally id: ", finalId)) lastid, count := finalId, 0 for { log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid)) q := fmt.Sprintf("SELECT * FROM %s WHERE id < %d ORDER BY id DESC limit 1000000", "global_common_data.dws_f_bpmc_relation", lastid) rows, err := MysqlTool.DB.Query(q) if err != nil { log.Error("mysql query err ", zap.Error(err)) } columns, err := rows.Columns() //if finalId == lastid { // log.Info("----finish-----", zap.Int("count: ", count)) // break //} for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("mysql scan err ", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%2000 == 0 { log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid)) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() task1(tmp) }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func task1(tmp map[string]interface{}) { sql := `SELECT count(*) FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ?` sql1 := `SELECT id FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ? ORDER BY id` sql2 := `DELETE FROM global_common_data.dws_f_bpmc_relation WHERE id IN (%s)` c := MysqlTool.QueryCount(sql, tmp["infoid"], tmp["projectid"], tmp["name_id"]) if c > 1 { info := MysqlTool.Query(sql1, tmp["infoid"], tmp["projectid"], tmp["name_id"]) if info != nil && len(*info) > 0 { var ids []int64 for i, v := range *info { if i != 0 { ids = append(ids, util.Int64All(v["id"])) } } if len(ids) > 0 { str1, arr1 := WhArgs(ids) log.Info("del---", zap.Any("ids---", ids)) MysqlTool.ExecBySql(fmt.Sprintf(sql2, str1), arr1...) } } } } func WhArgs(args []int64) (string, []interface{}) { newArgs := make([]interface{}, len(args)) wh := make([]string, len(args)) for k, v := range args { newArgs[k] = v wh[k] = "?" } return strings.Join(wh, ","), newArgs }