updateMethod.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package main
  2. import (
  3. "log"
  4. "time"
  5. )
  6. type updateInfo struct {
  7. //更新或新增通道
  8. updatePool chan []map[string]interface{}
  9. //数量
  10. saveSize int
  11. }
  12. var sp = make(chan bool, 5)
  13. func newUpdatePool() *updateInfo {
  14. update:=&updateInfo{make(chan []map[string]interface{}, 50000),500}
  15. return update
  16. }
  17. func (update *updateInfo) updateData() {
  18. log.Println("开始不断监听--待更新数据")
  19. tmpArr := make([][]map[string]interface{}, update.saveSize)
  20. tmpIndex := 0
  21. for {
  22. select {
  23. case value := <-update.updatePool:
  24. tmpArr[tmpIndex] = value
  25. tmpIndex++
  26. if tmpIndex == update.saveSize {
  27. sp <- true
  28. go func(dataArr [][]map[string]interface{}) {
  29. defer func() {
  30. <-sp
  31. }()
  32. mgo.UpSertBulk(extract, dataArr...)
  33. }(tmpArr)
  34. tmpArr = make([][]map[string]interface{}, update.saveSize)
  35. tmpIndex = 0
  36. }
  37. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  38. if tmpIndex > 0 {
  39. sp <- true
  40. go func(dataArr [][]map[string]interface{}) {
  41. defer func() {
  42. <-sp
  43. }()
  44. mgo.UpSertBulk(extract, dataArr...)
  45. }(tmpArr[:tmpIndex])
  46. tmpArr = make([][]map[string]interface{}, update.saveSize)
  47. tmpIndex = 0
  48. }
  49. }
  50. }
  51. }