main.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package main
  import (
  	"fmt"
  	"os"
  	"time"

  	"buyer_data/config"
  	"github.com/spf13/cobra"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  )
  9. func main() {
  10. rootCmd := &cobra.Command{Use: "my cmd"}
  11. rootCmd.AddCommand(buyerEnt()) // buyer_enterprise
  12. rootCmd.AddCommand(buyerErr()) // buyer_err
  13. rootCmd.AddCommand(buyerTidb()) // dws_f_ent_baseinfo
  14. rootCmd.AddCommand(deduplication()) // 去重
  15. rootCmd.AddCommand(buyerPy()) // buyer_detail_1019
  16. rootCmd.AddCommand(buyerTask()) // reliability不存在的数据
  17. if err := rootCmd.Execute(); err != nil {
  18. fmt.Println("rootCmd.Execute failed", err.Error())
  19. }
  20. c := make(chan bool, 1)
  21. <-c
  22. }
  23. func buyerEnt() *cobra.Command {
  24. cmdClient := &cobra.Command{
  25. Use: "buyer-mongo",
  26. Short: "Start dispose buyer data",
  27. Run: func(cmd *cobra.Command, args []string) {
  28. go SaveMethod()
  29. taskInfo1()
  30. },
  31. }
  32. return cmdClient
  33. }
  34. func buyerErr() *cobra.Command {
  35. cmdClient := &cobra.Command{
  36. Use: "buyer-err",
  37. Short: "Start dispose buyer data",
  38. Run: func(cmd *cobra.Command, args []string) {
  39. go SaveMethod()
  40. taskInfo2()
  41. },
  42. }
  43. return cmdClient
  44. }
  45. func buyerTidb() *cobra.Command {
  46. cmdClient := &cobra.Command{
  47. Use: "buyer-tidb",
  48. Short: "Start dispose buyer-tidb data",
  49. Run: func(cmd *cobra.Command, args []string) {
  50. InitMysql()
  51. go SaveMethod()
  52. go updateMethod()
  53. taskMysql()
  54. },
  55. }
  56. return cmdClient
  57. }
  58. func deduplication() *cobra.Command {
  59. cmdClient := &cobra.Command{
  60. Use: "buyer-dep",
  61. Short: "Start deduplication data",
  62. Run: func(cmd *cobra.Command, args []string) {
  63. redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Code, config.Conf.DB.Redis.Addr), 8)
  64. taskInfo5()
  65. },
  66. }
  67. return cmdClient
  68. }
  69. func buyerPy() *cobra.Command {
  70. cmdClient := &cobra.Command{
  71. Use: "buyer-py",
  72. Short: "Start dispose buyer-python data",
  73. Run: func(cmd *cobra.Command, args []string) {
  74. go SaveMethod()
  75. go updateMethod()
  76. taskInfo4()
  77. },
  78. }
  79. return cmdClient
  80. }
  81. func buyerTask() *cobra.Command {
  82. cmdClient := &cobra.Command{
  83. Use: "buyer-err",
  84. Short: "Start dispose buyer data",
  85. Run: func(cmd *cobra.Command, args []string) {
  86. go updateMethod()
  87. taskInfo3()
  88. },
  89. }
  90. return cmdClient
  91. }
  92. func SaveMethod() {
  93. arru := make([]map[string]interface{}, saveSize)
  94. indexu := 0
  95. for {
  96. select {
  97. case v := <-savePool:
  98. arru[indexu] = v
  99. indexu++
  100. if indexu == saveSize {
  101. saveSp <- true
  102. go func(arru []map[string]interface{}) {
  103. defer func() {
  104. <-saveSp
  105. }()
  106. MongoTool.SaveBulk(config.Conf.DB.Mongo.SaveColl, arru...)
  107. }(arru)
  108. arru = make([]map[string]interface{}, saveSize)
  109. indexu = 0
  110. }
  111. case <-time.After(1000 * time.Millisecond):
  112. if indexu > 0 {
  113. saveSp <- true
  114. go func(arru []map[string]interface{}) {
  115. defer func() {
  116. <-saveSp
  117. }()
  118. MongoTool.SaveBulk(config.Conf.DB.Mongo.SaveColl, arru...)
  119. }(arru[:indexu])
  120. arru = make([]map[string]interface{}, saveSize)
  121. indexu = 0
  122. }
  123. }
  124. }
  125. }
  126. func updateMethod() {
  127. arru := make([][]map[string]interface{}, saveSize)
  128. indexu := 0
  129. for {
  130. select {
  131. case v := <-updatePool:
  132. arru[indexu] = v
  133. indexu++
  134. if indexu == saveSize {
  135. updateSp <- true
  136. go func(arru [][]map[string]interface{}) {
  137. defer func() {
  138. <-updateSp
  139. }()
  140. MongoTool.UpSertBulk(config.Conf.DB.Mongo.SaveColl, arru...)
  141. }(arru)
  142. arru = make([][]map[string]interface{}, saveSize)
  143. indexu = 0
  144. }
  145. case <-time.After(1 * time.Second):
  146. if indexu > 0 {
  147. updateSp <- true
  148. go func(arru [][]map[string]interface{}) {
  149. defer func() {
  150. <-updateSp
  151. }()
  152. MongoTool.UpSertBulk(config.Conf.DB.Mongo.SaveColl, arru...)
  153. }(arru[:indexu])
  154. arru = make([][]map[string]interface{}, saveSize)
  155. indexu = 0
  156. }
  157. }
  158. }
  159. }