main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package main
  2. import (
  3. "mongodb"
  4. "time"
  5. )
  6. var (
  7. MongoTool *mongodb.MongodbSim
  8. Es *Elastic
  9. updatePool chan []map[string]interface{}
  10. updateSp chan bool
  11. updatePool1 chan []map[string]interface{}
  12. updateSp1 chan bool
  13. saveSize int
  14. savePool chan map[string]interface{}
  15. saveSp chan bool
  16. savePool1 chan map[string]interface{}
  17. saveSp1 chan bool
  18. )
  19. func init() {
  20. MongoTool = &mongodb.MongodbSim{
  21. MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083", // 172.17.4.187:27082,172.17.145.163:27083
  22. Size: 10,
  23. DbName: "mixdata",
  24. UserName: "SJZY_RWESBid_Other",
  25. Password: "SJZY@O17t8herB3B",
  26. }
  27. MongoTool.InitPool()
  28. Es = &Elastic{
  29. S_esurl: "http://172.17.145.164:19805", //http://172.17.4.184:19800
  30. I_size: 10,
  31. Username: "es_all",
  32. Password: "TopJkO2E_d1x",
  33. }
  34. Es.InitElasticSize()
  35. saveSize = 200
  36. updatePool = make(chan []map[string]interface{}, 5000)
  37. updateSp = make(chan bool, 5)
  38. updatePool1 = make(chan []map[string]interface{}, 5000)
  39. updateSp1 = make(chan bool, 5)
  40. savePool = make(chan map[string]interface{}, 5000)
  41. saveSp = make(chan bool, 5)
  42. savePool1 = make(chan map[string]interface{}, 5000)
  43. saveSp1 = make(chan bool, 5)
  44. }
  45. func main() {
  46. go saveMethod()
  47. go saveMethod1()
  48. go updateMethod()
  49. //go updateMethod1()
  50. //go findEs()
  51. go fcResult()
  52. //go TimeTask()
  53. ch := make(chan bool, 1)
  54. <-ch
  55. }
  56. func saveMethod() {
  57. arru := make([]map[string]interface{}, saveSize)
  58. indexu := 0
  59. for {
  60. select {
  61. case v := <-savePool:
  62. arru[indexu] = v
  63. indexu++
  64. if indexu == saveSize {
  65. saveSp <- true
  66. go func(arru []map[string]interface{}) {
  67. defer func() {
  68. <-saveSp
  69. }()
  70. MongoTool.SaveBulk("project_forecast_yece_tmp", arru...)
  71. }(arru)
  72. arru = make([]map[string]interface{}, saveSize)
  73. indexu = 0
  74. }
  75. case <-time.After(1000 * time.Millisecond):
  76. if indexu > 0 {
  77. saveSp <- true
  78. go func(arru []map[string]interface{}) {
  79. defer func() {
  80. <-saveSp
  81. }()
  82. MongoTool.SaveBulk("project_forecast_yece_tmp", arru...)
  83. }(arru[:indexu])
  84. arru = make([]map[string]interface{}, saveSize)
  85. indexu = 0
  86. }
  87. }
  88. }
  89. }
  90. func saveMethod1() {
  91. arru := make([]map[string]interface{}, saveSize)
  92. indexu := 0
  93. for {
  94. select {
  95. case v := <-savePool1:
  96. arru[indexu] = v
  97. indexu++
  98. if indexu == saveSize {
  99. saveSp1 <- true
  100. go func(arru []map[string]interface{}) {
  101. defer func() {
  102. <-saveSp1
  103. }()
  104. MongoTool.SaveBulk("project_forecast_1", arru...)
  105. }(arru)
  106. arru = make([]map[string]interface{}, saveSize)
  107. indexu = 0
  108. }
  109. case <-time.After(1000 * time.Millisecond):
  110. if indexu > 0 {
  111. saveSp1 <- true
  112. go func(arru []map[string]interface{}) {
  113. defer func() {
  114. <-saveSp1
  115. }()
  116. MongoTool.SaveBulk("project_forecast_1", arru...)
  117. }(arru[:indexu])
  118. arru = make([]map[string]interface{}, saveSize)
  119. indexu = 0
  120. }
  121. }
  122. }
  123. }
  124. func updateMethod() {
  125. arru := make([][]map[string]interface{}, saveSize)
  126. indexu := 0
  127. for {
  128. select {
  129. case v := <-updatePool:
  130. arru[indexu] = v
  131. indexu++
  132. if indexu == saveSize {
  133. updateSp <- true
  134. go func(arru [][]map[string]interface{}) {
  135. defer func() {
  136. <-updateSp
  137. }()
  138. MongoTool.UpSertBulk("project_forecast_yece_tmp", arru...)
  139. }(arru)
  140. arru = make([][]map[string]interface{}, saveSize)
  141. indexu = 0
  142. }
  143. case <-time.After(1000 * time.Millisecond):
  144. if indexu > 0 {
  145. updateSp <- true
  146. go func(arru [][]map[string]interface{}) {
  147. defer func() {
  148. <-updateSp
  149. }()
  150. MongoTool.UpSertBulk("project_forecast_yece_tmp", arru...)
  151. }(arru[:indexu])
  152. arru = make([][]map[string]interface{}, saveSize)
  153. indexu = 0
  154. }
  155. }
  156. }
  157. }
  158. func updateMethod1() {
  159. arru := make([][]map[string]interface{}, saveSize)
  160. indexu := 0
  161. for {
  162. select {
  163. case v := <-updatePool1:
  164. arru[indexu] = v
  165. indexu++
  166. if indexu == saveSize {
  167. updateSp1 <- true
  168. go func(arru [][]map[string]interface{}) {
  169. defer func() {
  170. <-updateSp1
  171. }()
  172. MongoTool.UpSertBulk("project_forecast", arru...)
  173. }(arru)
  174. arru = make([][]map[string]interface{}, saveSize)
  175. indexu = 0
  176. }
  177. case <-time.After(1000 * time.Millisecond):
  178. if indexu > 0 {
  179. updateSp1 <- true
  180. go func(arru [][]map[string]interface{}) {
  181. defer func() {
  182. <-updateSp1
  183. }()
  184. MongoTool.UpSertBulk("project_forecast", arru...)
  185. }(arru[:indexu])
  186. arru = make([][]map[string]interface{}, saveSize)
  187. indexu = 0
  188. }
  189. }
  190. }
  191. }