taskEs.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/cobra"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "sync"
  9. )
  10. func dealerEs() *cobra.Command {
  11. cmdClient := &cobra.Command{
  12. Use: "dealer_es",
  13. Short: "Start processing dealer_es data",
  14. Run: func(cmd *cobra.Command, args []string) {
  15. InitEs()
  16. go SaveEs("medical_dealer_v1", "medical_dealer")
  17. taskDealerEs()
  18. },
  19. }
  20. return cmdClient
  21. }
  22. func taskDealerEs() {
  23. pool := make(chan bool, 5) //控制线程数
  24. wg := &sync.WaitGroup{}
  25. finalId := 0
  26. lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_dealer_baseinfo"))
  27. if len(*lastInfo) > 0 {
  28. finalId = util.IntAll((*lastInfo)[0]["id"])
  29. }
  30. log.Info("taskDealerEs---", zap.Int("finally id", finalId))
  31. lastid, count := 0, 0
  32. for {
  33. util.Debug("重新查询,lastid---", lastid)
  34. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_dealer_baseinfo", lastid)
  35. rows, err := MysqlM.DB.Query(q)
  36. if err != nil {
  37. log.Error("taskDealerEs---", zap.Error(err))
  38. }
  39. columns, err := rows.Columns()
  40. if finalId == lastid {
  41. util.Debug("----finish----------", count)
  42. break
  43. }
  44. for rows.Next() {
  45. scanArgs := make([]interface{}, len(columns))
  46. values := make([]interface{}, len(columns))
  47. ret := make(map[string]interface{})
  48. for k := range values {
  49. scanArgs[k] = &values[k]
  50. }
  51. err = rows.Scan(scanArgs...)
  52. if err != nil {
  53. log.Error("taskDealerEs---", zap.Error(err))
  54. break
  55. }
  56. for i, col := range values {
  57. if v, ok := col.([]uint8); ok {
  58. ret[columns[i]] = string(v)
  59. } else {
  60. ret[columns[i]] = col
  61. }
  62. }
  63. lastid = util.IntAll(ret["id"])
  64. count++
  65. if count%500 == 0 {
  66. util.Debug("current-------", count, lastid)
  67. }
  68. pool <- true
  69. wg.Add(1)
  70. go func(tmp map[string]interface{}) {
  71. defer func() {
  72. <-pool
  73. wg.Done()
  74. }()
  75. saveM := make(map[string]interface{})
  76. for _, f := range []string{"id", "name_id", "company_id", "dealer_name", "area_code", "city_code", "district_code"} {
  77. if tmp[f] != nil {
  78. saveM[f] = tmp[f]
  79. }
  80. }
  81. EsSaveCache <- saveM
  82. }(ret)
  83. ret = make(map[string]interface{})
  84. }
  85. _ = rows.Close()
  86. wg.Wait()
  87. }
  88. }