main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package main
  2. import (
  3. "mongodb"
  4. es "qfw/util/elastic"
  5. "time"
  6. )
  7. var (
  8. MongoTool *mongodb.MongodbSim
  9. Es *es.Elastic
  10. updatePool chan []map[string]interface{}
  11. updateSp chan bool
  12. updatePool1 chan []map[string]interface{}
  13. updateSp1 chan bool
  14. saveSize int
  15. savePool chan map[string]interface{}
  16. saveSp chan bool
  17. savePool1 chan map[string]interface{}
  18. saveSp1 chan bool
  19. )
  20. func init() {
  21. MongoTool = &mongodb.MongodbSim{
  22. MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083", // 172.17.4.187:27082,172.17.145.163:27083
  23. Size: 10,
  24. DbName: "mixdata",
  25. UserName: "SJZY_RWESBid_Other",
  26. Password: "SJZY@O17t8herB3B",
  27. }
  28. MongoTool.InitPool()
  29. Es = &es.Elastic{
  30. S_esurl: "http://172.17.145.170:9800", //http://172.17.145.170:9800
  31. I_size: 10,
  32. }
  33. Es.InitElasticSize()
  34. saveSize = 200
  35. updatePool = make(chan []map[string]interface{}, 5000)
  36. updateSp = make(chan bool, 5)
  37. updatePool1 = make(chan []map[string]interface{}, 5000)
  38. updateSp1 = make(chan bool, 5)
  39. savePool = make(chan map[string]interface{}, 5000)
  40. saveSp = make(chan bool, 5)
  41. savePool1 = make(chan map[string]interface{}, 5000)
  42. saveSp1 = make(chan bool, 5)
  43. }
  44. func main() {
  45. go saveMethod()
  46. go saveMethod1()
  47. go updateMethod()
  48. //go updateMethod1()
  49. //go findEs()
  50. //go fcResult()
  51. go TimeTask()
  52. ch := make(chan bool, 1)
  53. <-ch
  54. }
  55. func saveMethod() {
  56. arru := make([]map[string]interface{}, saveSize)
  57. indexu := 0
  58. for {
  59. select {
  60. case v := <-savePool:
  61. arru[indexu] = v
  62. indexu++
  63. if indexu == saveSize {
  64. saveSp <- true
  65. go func(arru []map[string]interface{}) {
  66. defer func() {
  67. <-saveSp
  68. }()
  69. MongoTool.SaveBulk("project_forecast_yece_tmp", arru...)
  70. }(arru)
  71. arru = make([]map[string]interface{}, saveSize)
  72. indexu = 0
  73. }
  74. case <-time.After(1000 * time.Millisecond):
  75. if indexu > 0 {
  76. saveSp <- true
  77. go func(arru []map[string]interface{}) {
  78. defer func() {
  79. <-saveSp
  80. }()
  81. MongoTool.SaveBulk("project_forecast_yece_tmp", arru...)
  82. }(arru[:indexu])
  83. arru = make([]map[string]interface{}, saveSize)
  84. indexu = 0
  85. }
  86. }
  87. }
  88. }
  89. func saveMethod1() {
  90. arru := make([]map[string]interface{}, saveSize)
  91. indexu := 0
  92. for {
  93. select {
  94. case v := <-savePool1:
  95. arru[indexu] = v
  96. indexu++
  97. if indexu == saveSize {
  98. saveSp1 <- true
  99. go func(arru []map[string]interface{}) {
  100. defer func() {
  101. <-saveSp1
  102. }()
  103. MongoTool.SaveBulk("project_forecast", arru...)
  104. }(arru)
  105. arru = make([]map[string]interface{}, saveSize)
  106. indexu = 0
  107. }
  108. case <-time.After(1000 * time.Millisecond):
  109. if indexu > 0 {
  110. saveSp1 <- true
  111. go func(arru []map[string]interface{}) {
  112. defer func() {
  113. <-saveSp1
  114. }()
  115. MongoTool.SaveBulk("project_forecast", arru...)
  116. }(arru[:indexu])
  117. arru = make([]map[string]interface{}, saveSize)
  118. indexu = 0
  119. }
  120. }
  121. }
  122. }
  123. func updateMethod() {
  124. arru := make([][]map[string]interface{}, saveSize)
  125. indexu := 0
  126. for {
  127. select {
  128. case v := <-updatePool:
  129. arru[indexu] = v
  130. indexu++
  131. if indexu == saveSize {
  132. updateSp <- true
  133. go func(arru [][]map[string]interface{}) {
  134. defer func() {
  135. <-updateSp
  136. }()
  137. MongoTool.UpSertBulk("project_forecast_yece_tmp", arru...)
  138. }(arru)
  139. arru = make([][]map[string]interface{}, saveSize)
  140. indexu = 0
  141. }
  142. case <-time.After(1000 * time.Millisecond):
  143. if indexu > 0 {
  144. updateSp <- true
  145. go func(arru [][]map[string]interface{}) {
  146. defer func() {
  147. <-updateSp
  148. }()
  149. MongoTool.UpSertBulk("project_forecast_yece_tmp", arru...)
  150. }(arru[:indexu])
  151. arru = make([][]map[string]interface{}, saveSize)
  152. indexu = 0
  153. }
  154. }
  155. }
  156. }
  157. func updateMethod1() {
  158. arru := make([][]map[string]interface{}, saveSize)
  159. indexu := 0
  160. for {
  161. select {
  162. case v := <-updatePool1:
  163. arru[indexu] = v
  164. indexu++
  165. if indexu == saveSize {
  166. updateSp1 <- true
  167. go func(arru [][]map[string]interface{}) {
  168. defer func() {
  169. <-updateSp1
  170. }()
  171. MongoTool.UpSertBulk("project_forecast", arru...)
  172. }(arru)
  173. arru = make([][]map[string]interface{}, saveSize)
  174. indexu = 0
  175. }
  176. case <-time.After(1000 * time.Millisecond):
  177. if indexu > 0 {
  178. updateSp1 <- true
  179. go func(arru [][]map[string]interface{}) {
  180. defer func() {
  181. <-updateSp1
  182. }()
  183. MongoTool.UpSertBulk("project_forecast", arru...)
  184. }(arru[:indexu])
  185. arru = make([][]map[string]interface{}, saveSize)
  186. indexu = 0
  187. }
  188. }
  189. }
  190. }