task.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package main
  2. import (
  3. util "app.yhyue.com/moapp/jybase/common"
  4. elastic "app.yhyue.com/moapp/jybase/esv7"
  5. "fmt"
  6. "github.com/spf13/cobra"
  7. "log"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. saveEs []map[string]interface{}
  13. SaveEsLock = &sync.Mutex{}
  14. esIndex = "clue_info"
  15. )
  16. // @Author jianghan
  17. // @Description 全量数据
  18. // @Date 2024/8/2
  19. func allData() *cobra.Command {
  20. cmdClient := &cobra.Command{
  21. Use: "all",
  22. Short: "Start processing full data",
  23. Run: func(cmd *cobra.Command, args []string) {
  24. esIndex = "clue_info"
  25. taskAll("jianyu_subjectdb.dwd_f_crm_clue_info")
  26. },
  27. }
  28. return cmdClient
  29. }
  30. // @Author jianghan
  31. // @Description 全量数据(测试库)
  32. // @Date 2024/8/2
  33. func allTestData() *cobra.Command {
  34. cmdClient := &cobra.Command{
  35. Use: "all-test",
  36. Short: "Start processing test full data",
  37. Run: func(cmd *cobra.Command, args []string) {
  38. esIndex = "clue_info_test"
  39. taskAll("jianyu_subjectdb_test.dwd_f_crm_clue_info")
  40. },
  41. }
  42. return cmdClient
  43. }
  44. func taskAll(coll string) {
  45. finalId := 0
  46. lastInfo := Tidb.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC", coll))
  47. if len(*lastInfo) > 0 {
  48. finalId = util.IntAll((*lastInfo)[0]["id"])
  49. }
  50. log.Println("查询最后id---finalId---", finalId)
  51. lastid, count := 0, 0
  52. for {
  53. log.Println("重新查询,lastid---", lastid)
  54. q := fmt.Sprintf("SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", coll, lastid)
  55. rows, err := Tidb.DB.Query(q)
  56. if err != nil {
  57. log.Println("mysql query err ", err)
  58. }
  59. columns, err := rows.Columns()
  60. if finalId == lastid {
  61. log.Println("----finish----- count: ", count)
  62. break
  63. }
  64. for rows.Next() {
  65. scanArgs := make([]interface{}, len(columns))
  66. values := make([]interface{}, len(columns))
  67. ret := make(map[string]interface{})
  68. for k := range values {
  69. scanArgs[k] = &values[k]
  70. }
  71. err = rows.Scan(scanArgs...)
  72. if err != nil {
  73. log.Println("mysql scan err ", err)
  74. break
  75. }
  76. for i, col := range values {
  77. if v, ok := col.([]uint8); ok {
  78. ret[columns[i]] = string(v)
  79. } else {
  80. ret[columns[i]] = col
  81. }
  82. }
  83. lastid = util.IntAll(ret["id"])
  84. count++
  85. if count%2000 == 0 {
  86. log.Println(fmt.Sprintf("current----, count: %d, lastid: %d", count, lastid))
  87. }
  88. taskinfo(ret)
  89. }
  90. _ = rows.Close()
  91. }
  92. if len(saveEs) > 0 {
  93. elastic.BulkSave(esIndex, "", &saveEs, false)
  94. saveEs = []map[string]interface{}{}
  95. }
  96. }
  97. func taskinfo(tmp map[string]interface{}) {
  98. save := make(map[string]interface{})
  99. for k, v := range tmp {
  100. if v == nil {
  101. continue
  102. }
  103. if k == "id" {
  104. save["id"] = util.ObjToString(tmp["id"])
  105. save["_id"] = util.ObjToString(tmp["id"])
  106. } else if k == "comeintime" || k == "createtime" || k == "updatetime" {
  107. t, _ := time.Parse(time.DateTime, util.ObjToString(tmp[k]))
  108. save[k] = t.Unix()
  109. } else {
  110. save[k] = tmp[k]
  111. }
  112. }
  113. if len(save) > 0 {
  114. saveEs = append(saveEs, save)
  115. }
  116. if len(saveEs) >= 200 {
  117. tmps := saveEs
  118. elastic.BulkSave(esIndex, "", &tmps, false)
  119. saveEs = []map[string]interface{}{}
  120. }
  121. }