task.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package main
  2. import (
  3. "fmt"
  4. "go.uber.org/zap"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  7. "strings"
  8. "sync"
  9. )
  10. func taskRepair() {
  11. pool := make(chan bool, 2) //控制线程数
  12. wg := &sync.WaitGroup{}
  13. finalId := 0
  14. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC", "global_common_data.dws_f_bpmc_relation"))
  15. if len(*lastInfo) > 0 {
  16. finalId = util.IntAll((*lastInfo)[0]["id"])
  17. }
  18. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  19. lastid, count := finalId, 0
  20. for {
  21. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  22. q := fmt.Sprintf("SELECT * FROM %s WHERE id < %d ORDER BY id DESC limit 1000000", "global_common_data.dws_f_bpmc_relation", lastid)
  23. rows, err := MysqlTool.DB.Query(q)
  24. if err != nil {
  25. log.Error("mysql query err ", zap.Error(err))
  26. }
  27. columns, err := rows.Columns()
  28. //if finalId == lastid {
  29. // log.Info("----finish-----", zap.Int("count: ", count))
  30. // break
  31. //}
  32. for rows.Next() {
  33. scanArgs := make([]interface{}, len(columns))
  34. values := make([]interface{}, len(columns))
  35. ret := make(map[string]interface{})
  36. for k := range values {
  37. scanArgs[k] = &values[k]
  38. }
  39. err = rows.Scan(scanArgs...)
  40. if err != nil {
  41. log.Error("mysql scan err ", zap.Error(err))
  42. break
  43. }
  44. for i, col := range values {
  45. if v, ok := col.([]uint8); ok {
  46. ret[columns[i]] = string(v)
  47. } else {
  48. ret[columns[i]] = col
  49. }
  50. }
  51. lastid = util.IntAll(ret["id"])
  52. count++
  53. if count%2000 == 0 {
  54. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  55. }
  56. pool <- true
  57. wg.Add(1)
  58. go func(tmp map[string]interface{}) {
  59. defer func() {
  60. <-pool
  61. wg.Done()
  62. }()
  63. task1(tmp)
  64. }(ret)
  65. ret = make(map[string]interface{})
  66. }
  67. _ = rows.Close()
  68. wg.Wait()
  69. }
  70. }
  71. func task1(tmp map[string]interface{}) {
  72. sql := `SELECT count(*) FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ?`
  73. sql1 := `SELECT id FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ? ORDER BY id`
  74. sql2 := `DELETE FROM global_common_data.dws_f_bpmc_relation WHERE id IN (%s)`
  75. c := MysqlTool.QueryCount(sql, tmp["infoid"], tmp["projectid"], tmp["name_id"])
  76. if c > 1 {
  77. info := MysqlTool.Query(sql1, tmp["infoid"], tmp["projectid"], tmp["name_id"])
  78. if info != nil && len(*info) > 0 {
  79. var ids []int64
  80. for i, v := range *info {
  81. if i != 0 {
  82. ids = append(ids, util.Int64All(v["id"]))
  83. }
  84. }
  85. if len(ids) > 0 {
  86. str1, arr1 := WhArgs(ids)
  87. log.Info("del---", zap.Any("ids---", ids))
  88. MysqlTool.ExecBySql(fmt.Sprintf(sql2, str1), arr1...)
  89. }
  90. }
  91. }
  92. }
  93. func WhArgs(args []int64) (string, []interface{}) {
  94. newArgs := make([]interface{}, len(args))
  95. wh := make([]string, len(args))
  96. for k, v := range args {
  97. newArgs[k] = v
  98. wh[k] = "?"
  99. }
  100. return strings.Join(wh, ","), newArgs
  101. }