main.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package main
  2. import (
  3. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  4. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  5. "log"
  6. "time"
  7. )
  8. var (
  9. MgoB *mongodb.MongodbSim
  10. MgoQy *mongodb.MongodbSim
  11. MgoP *mongodb.MongodbSim
  12. Es *es.Elastic
  13. updatePool = make(chan []map[string]interface{}, 5000)
  14. updateEsPool = make(chan []map[string]interface{}, 5000)
  15. updateEsSp = make(chan bool, 5) //保存协程
  16. )
  17. func InitMgo() {
  18. //MgoB = &mongodb.MongodbSim{
  19. // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  20. // //MongodbAddr: "127.0.0.1:27083",
  21. // Size: 10,
  22. // DbName: "qfw",
  23. // UserName: "SJZY_RWbid_ES",a
  24. // Password: "SJZY@B4i4D5e6S",
  25. // //Direct: true,
  26. //}
  27. //MgoB.InitPool()
  28. MgoQy = &mongodb.MongodbSim{
  29. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  30. //MongodbAddr: "127.0.0.1:27083",
  31. Size: 10,
  32. DbName: "mixdata",
  33. UserName: "SJZY_RWbid_ES",
  34. Password: "SJZY@B4i4D5e6S",
  35. //Direct: true,
  36. }
  37. MgoQy.InitPool()
  38. MgoP = &mongodb.MongodbSim{
  39. //MongodbAddr: "127.0.0.1:27080",
  40. MongodbAddr: "172.17.4.85:27080",
  41. DbName: "qfw",
  42. Size: 10,
  43. //Direct: true,
  44. }
  45. MgoP.InitPool()
  46. // 本地数据库
  47. //MgoB = &mongodb.MongodbSim{
  48. // //MongodbAddr: "172.17.189.140:27080",
  49. // MongodbAddr: "127.0.0.1:27017",
  50. // Size: 10,
  51. // DbName: "wcc",
  52. // //UserName: "SJZY_RWbid_ES",
  53. // //Password: "SJZY@B4i4D5e6S",
  54. // //Direct: true,
  55. //}
  56. //MgoB.InitPool()
  57. // 测试环境
  58. //MgoB = &mongodb.MongodbSim{
  59. // MongodbAddr: "192.168.3.206:27002",
  60. // //MongodbAddr: "127.0.0.1:27017",
  61. // Size: 10,
  62. // DbName: "qfw_data",
  63. // UserName: "root",
  64. // Password: "root",
  65. // //Direct: true,
  66. //}
  67. //MgoB.InitPool()
  68. }
  69. func InitEs() {
  70. Es = &es.Elastic{
  71. //S_esurl: "http://127.0.0.1:19908",
  72. S_esurl: "http://172.17.4.184:19908",
  73. I_size: 5,
  74. Username: "jybid",
  75. Password: "Top2023_JEB01i@31",
  76. }
  77. Es.InitElasticSize()
  78. }
  79. func main() {
  80. InitMgo()
  81. InitEs()
  82. go updateEsMethod()
  83. fixQyxy()
  84. log.Println("11111111111")
  85. select {}
  86. }
  87. // updateEsMethod 更新es
  88. func updateEsMethod() {
  89. arru := make([][]map[string]interface{}, 200)
  90. indexu := 0
  91. for {
  92. select {
  93. case v := <-updateEsPool:
  94. arru[indexu] = v
  95. indexu++
  96. if indexu == 200 {
  97. updateEsSp <- true
  98. go func(arru [][]map[string]interface{}) {
  99. defer func() {
  100. <-updateEsSp
  101. }()
  102. Es.UpdateBulk("projectset", arru...)
  103. }(arru)
  104. arru = make([][]map[string]interface{}, 200)
  105. indexu = 0
  106. }
  107. case <-time.After(1000 * time.Millisecond):
  108. if indexu > 0 {
  109. updateEsSp <- true
  110. go func(arru [][]map[string]interface{}) {
  111. defer func() {
  112. <-updateEsSp
  113. }()
  114. Es.UpdateBulk("projectset", arru...)
  115. }(arru[:indexu])
  116. arru = make([][]map[string]interface{}, 200)
  117. indexu = 0
  118. }
  119. }
  120. }
  121. }