main.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package main
  2. import (
  3. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  4. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  5. "time"
  6. )
  7. var (
  8. MongoTool *mongodb.MongodbSim
  9. Es *elastic.Elastic
  10. updatePool chan []map[string]interface{}
  11. updateSp chan bool
  12. updatePool1 chan []map[string]interface{}
  13. updateSp1 chan bool
  14. saveSize int
  15. savePool chan map[string]interface{}
  16. saveSp chan bool
  17. )
  18. func init() {
  19. MongoTool = &mongodb.MongodbSim{
  20. MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
  21. Size: 10,
  22. DbName: "mixdata",
  23. UserName: "SJZY_RWESBid_Other",
  24. Password: "SJZY@O17t8herB3B",
  25. }
  26. MongoTool.InitPool()
  27. Es = &elastic.Elastic{
  28. S_esurl: "http://172.17.162.27:19908", //http://172.17.4.184:19800
  29. I_size: 10,
  30. Username: "dataGr_appli",
  31. Password: "L2ds90Ha4e5#",
  32. }
  33. Es.InitElasticSize()
  34. saveSize = 200
  35. savePool = make(chan map[string]interface{}, 5000)
  36. saveSp = make(chan bool, 5)
  37. }
  38. func main() {
  39. go saveMethod()
  40. go updateMethod()
  41. //go findEs()
  42. go TimeTask()
  43. ch := make(chan bool, 1)
  44. <-ch
  45. }
  46. func saveMethod() {
  47. arru := make([]map[string]interface{}, saveSize)
  48. indexu := 0
  49. for {
  50. select {
  51. case v := <-savePool:
  52. arru[indexu] = v
  53. indexu++
  54. if indexu == saveSize {
  55. saveSp <- true
  56. go func(arru []map[string]interface{}) {
  57. defer func() {
  58. <-saveSp
  59. }()
  60. MongoTool.SaveBulk("project_forecast", arru...)
  61. }(arru)
  62. arru = make([]map[string]interface{}, saveSize)
  63. indexu = 0
  64. }
  65. case <-time.After(1000 * time.Millisecond):
  66. if indexu > 0 {
  67. saveSp <- true
  68. go func(arru []map[string]interface{}) {
  69. defer func() {
  70. <-saveSp
  71. }()
  72. MongoTool.SaveBulk("project_forecast", arru...)
  73. }(arru[:indexu])
  74. arru = make([]map[string]interface{}, saveSize)
  75. indexu = 0
  76. }
  77. }
  78. }
  79. }
  80. func updateMethod() {
  81. arru := make([][]map[string]interface{}, saveSize)
  82. indexu := 0
  83. for {
  84. select {
  85. case v := <-updatePool1:
  86. arru[indexu] = v
  87. indexu++
  88. if indexu == saveSize {
  89. updateSp1 <- true
  90. go func(arru [][]map[string]interface{}) {
  91. defer func() {
  92. <-updateSp1
  93. }()
  94. MongoTool.UpSertBulk("project_forecast", arru...)
  95. }(arru)
  96. arru = make([][]map[string]interface{}, saveSize)
  97. indexu = 0
  98. }
  99. case <-time.After(1000 * time.Millisecond):
  100. if indexu > 0 {
  101. updateSp1 <- true
  102. go func(arru [][]map[string]interface{}) {
  103. defer func() {
  104. <-updateSp1
  105. }()
  106. MongoTool.UpSertBulk("project_forecast", arru...)
  107. }(arru[:indexu])
  108. arru = make([][]map[string]interface{}, saveSize)
  109. indexu = 0
  110. }
  111. }
  112. }
  113. }