updateMethod.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package main
  2. import (
  3. "time"
  4. )
  5. //新增组
  6. type addGroupInfo struct {
  7. pool chan map[string]interface{}
  8. saveSize int
  9. }
  10. //更新组
  11. type updateGroupInfo struct {
  12. pool chan []map[string]interface{}
  13. saveSize int
  14. }
  15. //新增融合
  16. type addFusionInfo struct {
  17. pool chan map[string]interface{}
  18. saveSize int
  19. }
  20. //更新融合
  21. type updateFusionInfo struct {
  22. pool chan []map[string]interface{}
  23. saveSize int
  24. }
  25. //新增日志
  26. type addRecordInfo struct {
  27. pool chan map[string]interface{}
  28. saveSize int
  29. }
  30. //更新日志
  31. type updateRecordInfo struct {
  32. pool chan []map[string]interface{}
  33. saveSize int
  34. }
  35. var sp = make(chan bool,5)
  36. func newAddGroupPool() *addGroupInfo {
  37. info:=&addGroupInfo{make(chan map[string]interface{}, 50000),200}
  38. return info
  39. }
  40. func newUpdateGroupPool() *updateGroupInfo {
  41. info:=&updateGroupInfo{make(chan []map[string]interface{}, 50000),200}
  42. return info
  43. }
  44. func newAddFusionPool() *addFusionInfo {
  45. info:=&addFusionInfo{make(chan map[string]interface{}, 50000),200}
  46. return info
  47. }
  48. func newupdateFusionPool() *updateFusionInfo {
  49. info:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),200}
  50. return info
  51. }
  52. func newaddRecordPool() *addRecordInfo {
  53. info:=&addRecordInfo{make(chan map[string]interface{}, 50000),200}
  54. return info
  55. }
  56. func newupdateRecordPool() *updateRecordInfo {
  57. info:=&updateRecordInfo{make(chan []map[string]interface{}, 50000),200}
  58. return info
  59. }
  60. //新增-组数据
  61. func (info *addGroupInfo) addGroupData() {
  62. tmpArr := make([]map[string]interface{}, info.saveSize)
  63. tmpIndex := 0
  64. for {
  65. select {
  66. case value := <-info.pool:
  67. tmpArr[tmpIndex] = value
  68. tmpIndex++
  69. if tmpIndex == info.saveSize {
  70. sp <- true
  71. go func(dataArr []map[string]interface{}) {
  72. defer func() {
  73. <-sp
  74. }()
  75. mgo.SaveBulk(group_coll_name, dataArr...)
  76. }(tmpArr)
  77. tmpArr = make([]map[string]interface{}, info.saveSize)
  78. tmpIndex = 0
  79. }
  80. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  81. if tmpIndex > 0 {
  82. sp <- true
  83. go func(dataArr []map[string]interface{}) {
  84. defer func() {
  85. <-sp
  86. }()
  87. mgo.SaveBulk(group_coll_name, dataArr...)
  88. }(tmpArr[:tmpIndex])
  89. tmpArr = make([]map[string]interface{}, info.saveSize)
  90. tmpIndex = 0
  91. }
  92. }
  93. }
  94. }
  95. //更新-组数据
  96. func (info *updateGroupInfo) updateGroupData() {
  97. tmpArr := make([][]map[string]interface{}, info.saveSize)
  98. tmpIndex := 0
  99. for {
  100. select {
  101. case value := <-info.pool:
  102. tmpArr[tmpIndex] = value
  103. tmpIndex++
  104. if tmpIndex == info.saveSize {
  105. sp <- true
  106. go func(dataArr [][]map[string]interface{}) {
  107. defer func() {
  108. <-sp
  109. }()
  110. //批量更新
  111. mgo.UpSertBulk(group_coll_name, dataArr...)
  112. }(tmpArr)
  113. tmpArr = make([][]map[string]interface{}, info.saveSize)
  114. tmpIndex = 0
  115. }
  116. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  117. if tmpIndex > 0 {
  118. sp <- true
  119. go func(dataArr [][]map[string]interface{}) {
  120. defer func() {
  121. <-sp
  122. }()
  123. //批量更新
  124. mgo.UpSertBulk(group_coll_name, dataArr...)
  125. }(tmpArr[:tmpIndex])
  126. tmpArr = make([][]map[string]interface{}, info.saveSize)
  127. tmpIndex = 0
  128. }
  129. }
  130. }
  131. }
  132. //新增融合数据
  133. func (info *addFusionInfo) addFusionData() {
  134. tmpArr := make([]map[string]interface{}, info.saveSize)
  135. tmpIndex := 0
  136. for {
  137. select {
  138. case value := <-info.pool:
  139. tmpArr[tmpIndex] = value
  140. tmpIndex++
  141. if tmpIndex == info.saveSize {
  142. sp <- true
  143. go func(dataArr []map[string]interface{}) {
  144. defer func() {
  145. <-sp
  146. }()
  147. mgo.SaveBulk(fusion_coll_name, dataArr...)
  148. }(tmpArr)
  149. tmpArr = make([]map[string]interface{}, info.saveSize)
  150. tmpIndex = 0
  151. }
  152. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  153. if tmpIndex > 0 {
  154. sp <- true
  155. go func(dataArr []map[string]interface{}) {
  156. defer func() {
  157. <-sp
  158. }()
  159. mgo.SaveBulk(fusion_coll_name, dataArr...)
  160. }(tmpArr[:tmpIndex])
  161. tmpArr = make([]map[string]interface{}, info.saveSize)
  162. tmpIndex = 0
  163. }
  164. }
  165. }
  166. }
  167. //更新融合数据
  168. func (info *updateFusionInfo) updateFusionData() {
  169. tmpArr := make([][]map[string]interface{}, info.saveSize)
  170. tmpIndex := 0
  171. for {
  172. select {
  173. case value := <-info.pool:
  174. tmpArr[tmpIndex] = value
  175. tmpIndex++
  176. if tmpIndex == info.saveSize {
  177. sp <- true
  178. go func(dataArr [][]map[string]interface{}) {
  179. defer func() {
  180. <-sp
  181. }()
  182. //批量更新
  183. mgo.UpSertBulk(fusion_coll_name, dataArr...)
  184. }(tmpArr)
  185. tmpArr = make([][]map[string]interface{}, info.saveSize)
  186. tmpIndex = 0
  187. }
  188. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  189. if tmpIndex > 0 {
  190. sp <- true
  191. go func(dataArr [][]map[string]interface{}) {
  192. defer func() {
  193. <-sp
  194. }()
  195. //批量更新
  196. mgo.UpSertBulk(fusion_coll_name, dataArr...)
  197. }(tmpArr[:tmpIndex])
  198. tmpArr = make([][]map[string]interface{}, info.saveSize)
  199. tmpIndex = 0
  200. }
  201. }
  202. }
  203. }
  204. //新增日志数据
  205. func (info *addRecordInfo) addRecordData() {
  206. tmpArr := make([]map[string]interface{}, info.saveSize)
  207. tmpIndex := 0
  208. for {
  209. select {
  210. case value := <-info.pool:
  211. tmpArr[tmpIndex] = value
  212. tmpIndex++
  213. if tmpIndex == info.saveSize {
  214. sp <- true
  215. go func(dataArr []map[string]interface{}) {
  216. defer func() {
  217. <-sp
  218. }()
  219. mgo.SaveBulk(record_coll_name, dataArr...)
  220. }(tmpArr)
  221. tmpArr = make([]map[string]interface{}, info.saveSize)
  222. tmpIndex = 0
  223. }
  224. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  225. if tmpIndex > 0 {
  226. sp <- true
  227. go func(dataArr []map[string]interface{}) {
  228. defer func() {
  229. <-sp
  230. }()
  231. mgo.SaveBulk(record_coll_name, dataArr...)
  232. }(tmpArr[:tmpIndex])
  233. tmpArr = make([]map[string]interface{}, info.saveSize)
  234. tmpIndex = 0
  235. }
  236. }
  237. }
  238. }
  239. //更新日志数据
  240. func (info *updateRecordInfo) updateRecordData() {
  241. tmpArr := make([][]map[string]interface{}, info.saveSize)
  242. tmpIndex := 0
  243. for {
  244. select {
  245. case value := <-info.pool:
  246. tmpArr[tmpIndex] = value
  247. tmpIndex++
  248. if tmpIndex == info.saveSize {
  249. sp <- true
  250. go func(dataArr [][]map[string]interface{}) {
  251. defer func() {
  252. <-sp
  253. }()
  254. //批量更新
  255. mgo.UpSertBulk(record_coll_name, dataArr...)
  256. }(tmpArr)
  257. tmpArr = make([][]map[string]interface{}, info.saveSize)
  258. tmpIndex = 0
  259. }
  260. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  261. if tmpIndex > 0 {
  262. sp <- true
  263. go func(dataArr [][]map[string]interface{}) {
  264. defer func() {
  265. <-sp
  266. }()
  267. //批量更新
  268. mgo.UpSertBulk(record_coll_name, dataArr...)
  269. }(tmpArr[:tmpIndex])
  270. tmpArr = make([][]map[string]interface{}, info.saveSize)
  271. tmpIndex = 0
  272. }
  273. }
  274. }
  275. }