main.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  2. import (
  3. "data_clear_sync/config"
  4. "flag"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  6. "log"
  7. "time"
  8. )
  // Command-line state and shared configuration.
  var (
  	// V receives the value of the -v command-line flag (bound in main).
  	V string

  	// Fields is a fixed list of bidding field names.
  	// NOTE(review): it is not referenced in this part of the file; presumably
  	// these are the fields compared or synchronised by the tasks — confirm.
  	Fields = []string{
  		"area",
  		"city",
  		"projectname",
  		"projectcode",
  		"budget",
  		"s_winner",
  		"bidamount",
  		"buyer",
  	}
  )

  // Queue and concurrency limiter drained by updateEsFuc.
  var (
  	// updateEsPool buffers up to 5000 pending Elasticsearch update entries.
  	updateEsPool = make(chan []map[string]interface{}, 5000)
  	// updateEsSp allows a single in-flight Es.UpdateBulk call.
  	updateEsSp = make(chan bool, 1)
  )

  // Queue and concurrency limiter drained by updateFuc.
  var (
  	// updatePool buffers up to 5000 pending "bidding" update entries.
  	updatePool = make(chan []map[string]interface{}, 5000)
  	// updateSp allows up to five in-flight Mongo.UpdateBulk calls.
  	updateSp = make(chan bool, 5)
  )

  // Queue and concurrency limiter drained by updateRcFuc.
  var (
  	// updateRcPool buffers up to 5000 pending "bidding_modify_record" entries.
  	updateRcPool = make(chan []map[string]interface{}, 5000)
  	// updateRcsp allows up to five in-flight Mongo.UpSertBulk calls.
  	updateRcsp = make(chan bool, 5)
  )
  19. func init() {
  20. config.Init("./common.toml")
  21. InitLog()
  22. InitMgo()
  23. InitEs()
  24. redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
  25. }
  26. func main() {
  27. flag.StringVar(&V, "v", "", "version")
  28. flag.Parse()
  29. if V != "" {
  30. go updateFuc()
  31. go updateRcFuc()
  32. go updateEsFuc()
  33. if V == "v1" {
  34. taskinfoV1()
  35. } else if V == "v2" {
  36. taskinfoV2()
  37. }
  38. ch := make(chan bool, 1)
  39. <-ch
  40. } else {
  41. flag.PrintDefaults()
  42. log.Println("参数错误.")
  43. }
  44. }
  45. func updateFuc() {
  46. arru := make([][]map[string]interface{}, 500)
  47. indexu := 0
  48. for {
  49. select {
  50. case v := <-updatePool:
  51. arru[indexu] = v
  52. indexu++
  53. if indexu == 500 {
  54. updateSp <- true
  55. go func(arru [][]map[string]interface{}) {
  56. defer func() {
  57. <-updateSp
  58. }()
  59. Mongo.UpdateBulk("bidding", arru...)
  60. }(arru)
  61. arru = make([][]map[string]interface{}, 500)
  62. indexu = 0
  63. }
  64. case <-time.After(1000 * time.Millisecond):
  65. if indexu > 0 {
  66. updateSp <- true
  67. go func(arru [][]map[string]interface{}) {
  68. defer func() {
  69. <-updateSp
  70. }()
  71. Mongo.UpdateBulk("bidding", arru...)
  72. }(arru[:indexu])
  73. arru = make([][]map[string]interface{}, 500)
  74. indexu = 0
  75. }
  76. }
  77. }
  78. }
  79. func updateRcFuc() {
  80. arru := make([][]map[string]interface{}, 500)
  81. indexu := 0
  82. for {
  83. select {
  84. case v := <-updateRcPool:
  85. arru[indexu] = v
  86. indexu++
  87. if indexu == 500 {
  88. updateRcsp <- true
  89. go func(arru [][]map[string]interface{}) {
  90. defer func() {
  91. <-updateRcsp
  92. }()
  93. Mongo.UpSertBulk("bidding_modify_record", arru...)
  94. }(arru)
  95. arru = make([][]map[string]interface{}, 500)
  96. indexu = 0
  97. }
  98. case <-time.After(1000 * time.Millisecond):
  99. if indexu > 0 {
  100. updateRcsp <- true
  101. go func(arru [][]map[string]interface{}) {
  102. defer func() {
  103. <-updateSp
  104. }()
  105. Mongo.UpSertBulk("bidding_modify_record", arru...)
  106. }(arru[:indexu])
  107. arru = make([][]map[string]interface{}, 500)
  108. indexu = 0
  109. }
  110. }
  111. }
  112. }
  113. func updateEsFuc() {
  114. arru := make([][]map[string]interface{}, 200)
  115. indexu := 0
  116. for {
  117. select {
  118. case v := <-updateEsPool:
  119. arru[indexu] = v
  120. indexu++
  121. if indexu == 200 {
  122. updateEsSp <- true
  123. go func(arru [][]map[string]interface{}) {
  124. defer func() {
  125. <-updateEsSp
  126. }()
  127. Es.UpdateBulk("bidding", arru...)
  128. }(arru)
  129. arru = make([][]map[string]interface{}, 200)
  130. indexu = 0
  131. }
  132. case <-time.After(1000 * time.Millisecond):
  133. if indexu > 0 {
  134. updateEsSp <- true
  135. go func(arru [][]map[string]interface{}) {
  136. defer func() {
  137. <-updateEsSp
  138. }()
  139. Es.UpdateBulk("bidding", arru...)
  140. }(arru[:indexu])
  141. arru = make([][]map[string]interface{}, 200)
  142. indexu = 0
  143. }
  144. }
  145. }
  146. }