buyertask.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package main
  2. import (
  3. "app.yhyue.com/data_processing/common_utils/log"
  4. "esindex/config"
  5. "fmt"
  6. "go.uber.org/zap"
  7. "time"
  8. )
  9. //buyerOnce 处理增量数据
  10. func buyerOnce() {
  11. rowsPerPage := 1000
  12. currentPage := 1
  13. total := 0
  14. for {
  15. log.Info("buyerOnce", zap.Int("currentPage", currentPage))
  16. arrEs := make([]map[string]interface{}, 0)
  17. offset := (currentPage - 1) * rowsPerPage
  18. //year, month, day := 2022, time.October, 01
  19. //now := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
  20. now := time.Now()
  21. curTime := now.Format("2006-01-02")
  22. query := fmt.Sprintf(`
  23. SELECT
  24. b.name,
  25. t.id,
  26. t.name_id,
  27. t.createtime,
  28. t.updatetime,
  29. c.area,
  30. c.city,
  31. class.name AS buyerclass
  32. FROM
  33. dws_f_ent_tags AS t
  34. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  35. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  36. LEFT JOIN code_area AS c ON b.city_code = c.code
  37. WHERE t.createtime > '%v' OR t.updatetime > '%v'
  38. ORDER BY t.createtime ASC
  39. LIMIT %d, %d;
  40. `, curTime, curTime, offset, rowsPerPage)
  41. result := MysqlB.SelectBySql(query)
  42. if result == nil {
  43. break
  44. }
  45. if len(*result) > 0 {
  46. for _, re := range *result {
  47. tmp := make(map[string]interface{}, 0)
  48. tmp["name"] = re["name"]
  49. tmp["buyer_name"] = re["name"]
  50. tmp["id"] = re["name_id"]
  51. tmp["province"] = re["area"]
  52. tmp["city"] = re["city"]
  53. tmp["buyerclass"] = re["buyerclass"]
  54. if re["createtime"] != nil {
  55. if createtime, ok := re["createtime"].(time.Time); ok {
  56. tmp["createtime"] = createtime.Unix()
  57. if re["updatetime"] != nil {
  58. if updatetime, ok := re["updatetime"].(time.Time); ok {
  59. tmp["updatetime"] = updatetime.Unix()
  60. }
  61. } else {
  62. tmp["updatetime"] = createtime.Unix()
  63. }
  64. }
  65. }
  66. sql := fmt.Sprintf(`select * from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
  67. counts := MysqlB.SelectBySql(sql)
  68. if len(*counts) > 0 {
  69. tmp["is_contact"] = true
  70. } else {
  71. tmp["is_contact"] = false
  72. }
  73. arrEs = append(arrEs, tmp)
  74. }
  75. total = total + len(arrEs)
  76. err := Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  77. if err != nil {
  78. log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
  79. }
  80. }
  81. if len(arrEs) < rowsPerPage {
  82. break
  83. }
  84. // 继续查询下一页
  85. currentPage++
  86. }
  87. log.Info("buyerOnce", zap.Int("结束,总数是:", total))
  88. }
  89. //buyerall 全量数据
  90. func buyerall() {
  91. rowsPerPage := 5000
  92. currentPage := 1
  93. total := 0
  94. for {
  95. log.Info("buyerall", zap.Int("currentPage", currentPage))
  96. arrEs := make([]map[string]interface{}, 0)
  97. offset := (currentPage - 1) * rowsPerPage
  98. query := fmt.Sprintf(`
  99. SELECT
  100. b.name,
  101. t.id,
  102. t.name_id,
  103. t.createtime,
  104. t.updatetime,
  105. c.area,
  106. c.city,
  107. class.name AS buyerclass
  108. FROM
  109. dws_f_ent_tags AS t
  110. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  111. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  112. LEFT JOIN code_area AS c ON b.city_code = c.code
  113. ORDER BY t.createtime ASC
  114. LIMIT %d, %d;
  115. `, offset, rowsPerPage)
  116. result := MysqlB.SelectBySql(query)
  117. if result == nil {
  118. break
  119. }
  120. if len(*result) > 0 {
  121. for _, re := range *result {
  122. tmp := make(map[string]interface{}, 0)
  123. tmp["name"] = re["name"]
  124. tmp["buyer_name"] = re["name"]
  125. tmp["id"] = re["name_id"]
  126. tmp["_id"] = re["name_id"]
  127. tmp["province"] = re["area"]
  128. tmp["city"] = re["city"]
  129. tmp["buyerclass"] = re["buyerclass"]
  130. if re["createtime"] != nil {
  131. if createtime, ok := re["createtime"].(time.Time); ok {
  132. tmp["createtime"] = createtime.Unix()
  133. if re["updatetime"] != nil {
  134. if updatetime, ok := re["updatetime"].(time.Time); ok {
  135. tmp["updatetime"] = updatetime.Unix()
  136. }
  137. } else {
  138. tmp["updatetime"] = createtime.Unix()
  139. }
  140. }
  141. }
  142. sql := fmt.Sprintf(`select id from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
  143. counts := MysqlB.SelectBySql(sql)
  144. if len(*counts) > 0 {
  145. tmp["is_contact"] = true
  146. } else {
  147. tmp["is_contact"] = false
  148. }
  149. arrEs = append(arrEs, tmp)
  150. }
  151. total = total + len(arrEs)
  152. //保存es
  153. Es.BulkSave(config.Conf.DB.Es.IndexBuyer, arrEs)
  154. }
  155. // 如果本次查询返回的数据不足每页请求的数量,说明已经查询到最后一页
  156. if len(*result) < rowsPerPage {
  157. break
  158. }
  159. // 继续查询下一页
  160. currentPage++
  161. }
  162. log.Info("buyerall", zap.Int("结束,总数是:", total))
  163. }