123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package main
- import (
- "log"
- "time"
- )
// sp is a buffered channel of capacity 5 used as a counting semaphore:
// a value is sent into it before a bulk-write goroutine is started and
// received from it when that goroutine finishes, so at most 5 bulk
// writes run concurrently.
var sp = make(chan bool, 5)
// updateInfo is the queue of pending update-or-insert operations together
// with the batch size used when writing them out.
//
// Field order matters: the type is constructed with a positional literal.
type updateInfo struct {
	// updatePool carries the queued items; each item is a slice of maps.
	updatePool chan []map[string]interface{}
	// saveSize is the number of queued items collected into one bulk write.
	saveSize int
}
- func newUpdatePool() *updateInfo {
- update := &updateInfo{make(chan []map[string]interface{}, 50000), 200}
- return update
- }
// addGroupInfo is the (temporary) queue of documents waiting to be inserted
// as a new group, together with the batch size used when writing them out.
//
// Field order matters: the type is constructed with a positional literal.
type addGroupInfo struct {
	// pool carries the queued documents, one map per document.
	pool chan map[string]interface{}
	// saveSize is the number of documents collected into one bulk write.
	saveSize int
}
- func newAddGroupPool() *addGroupInfo {
- info := &addGroupInfo{make(chan map[string]interface{}, 50000), 200}
- return info
- }
- // 监听更新
- func (update *updateInfo) updateData() {
- log.Println("开始不断监听--待更新数据")
- tmpArr := make([][]map[string]interface{}, update.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-update.updatePool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == update.saveSize {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- data_mgo.UpSertBulk(extract, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- case <-time.After(5 * time.Second): //无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- data_mgo.UpSertBulk(extract, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
- // 监听新增
- func (info *addGroupInfo) addGroupData() {
- tmpArr := make([]map[string]interface{}, info.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-info.pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == info.saveSize {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- data_mgo.SaveBulk("zktes_full_repeat", dataArr...)
- }(tmpArr)
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- case <-time.After(7 * time.Second): //无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- data_mgo.SaveBulk("zktes_full_repeat", dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
|