123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package main
- import (
- "time"
- )
// addGroupInfo queues new group documents for batched saving.
//
// The addGroupData method drains pool and hands the documents to
// mgo.SaveBulk for the group collection.
type addGroupInfo struct {
	// pool carries one document per element.
	pool chan map[string]interface{}

	// saveSize is the number of documents collected before a batch is saved.
	saveSize int
}
// updateGroupInfo queues group updates for batched upserting.
//
// The updateGroupData method drains pool and hands the entries to
// mgo.UpSertBulk for the group collection.
type updateGroupInfo struct {
	// pool carries one update per element. Each update is itself a slice of
	// documents, passed through unchanged as one UpSertBulk argument
	// (presumably a selector/update pair; confirm against UpSertBulk).
	pool chan []map[string]interface{}

	// saveSize is the number of updates collected before a batch is sent.
	saveSize int
}
// addFusionInfo queues new fusion documents for batched saving.
//
// The addFusionData method drains pool and hands the documents to
// mgo.SaveBulk for the fusion collection.
type addFusionInfo struct {
	// pool carries one document per element.
	pool chan map[string]interface{}

	// saveSize is the number of documents collected before a batch is saved.
	saveSize int
}
// updateFusionInfo queues fusion updates for batched upserting.
//
// The updateFusionData method drains pool and hands the entries to
// mgo.UpSertBulk for the fusion collection.
type updateFusionInfo struct {
	// pool carries one update per element. Each update is itself a slice of
	// documents, passed through unchanged as one UpSertBulk argument
	// (presumably a selector/update pair; confirm against UpSertBulk).
	pool chan []map[string]interface{}

	// saveSize is the number of updates collected before a batch is sent.
	saveSize int
}
// addRecordInfo queues new record (log) documents for batched saving.
//
// The addRecordData method drains pool and hands the documents to
// mgo.SaveBulk for the record collection.
type addRecordInfo struct {
	// pool carries one document per element.
	pool chan map[string]interface{}

	// saveSize is the number of documents collected before a batch is saved.
	saveSize int
}
// updateRecordInfo queues record (log) updates for batched upserting.
//
// The updateRecordData method drains pool and hands the entries to
// mgo.UpSertBulk for the record collection.
type updateRecordInfo struct {
	// pool carries one update per element. Each update is itself a slice of
	// documents, passed through unchanged as one UpSertBulk argument
	// (presumably a selector/update pair; confirm against UpSertBulk).
	pool chan []map[string]interface{}

	// saveSize is the number of updates collected before a batch is sent.
	saveSize int
}
// sp is a counting semaphore shared by every batching loop in this file: a
// loop sends on it before starting a bulk-write goroutine and that goroutine
// receives from it when the write returns, so at most 5 bulk writes are in
// flight at any time.
var sp = make(chan bool, 5)
- func newAddGroupPool() *addGroupInfo {
- info:=&addGroupInfo{make(chan map[string]interface{}, 50000),200}
- return info
- }
- func newUpdateGroupPool() *updateGroupInfo {
- info:=&updateGroupInfo{make(chan []map[string]interface{}, 50000),200}
- return info
- }
- func newAddFusionPool() *addFusionInfo {
- info:=&addFusionInfo{make(chan map[string]interface{}, 50000),200}
- return info
- }
- func newupdateFusionPool() *updateFusionInfo {
- info:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),200}
- return info
- }
- func newaddRecordPool() *addRecordInfo {
- info:=&addRecordInfo{make(chan map[string]interface{}, 50000),200}
- return info
- }
- func newupdateRecordPool() *updateRecordInfo {
- info:=&updateRecordInfo{make(chan []map[string]interface{}, 50000),200}
- return info
- }
- //新增-组数据
- func (info *addGroupInfo) addGroupData() {
- tmpArr := make([]map[string]interface{}, info.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-info.pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == info.saveSize {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.SaveBulk(group_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.SaveBulk(group_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
- //更新-组数据
- func (info *updateGroupInfo) updateGroupData() {
- tmpArr := make([][]map[string]interface{}, info.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-info.pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == info.saveSize {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- //批量更新
- mgo.UpSertBulk(group_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- //批量更新
- mgo.UpSertBulk(group_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
- //新增融合数据
- func (info *addFusionInfo) addFusionData() {
- tmpArr := make([]map[string]interface{}, info.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-info.pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == info.saveSize {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.SaveBulk(fusion_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.SaveBulk(fusion_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
- //更新融合数据
- func (info *updateFusionInfo) updateFusionData() {
- tmpArr := make([][]map[string]interface{}, info.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-info.pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == info.saveSize {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- //批量更新
- mgo.UpSertBulk(fusion_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- //批量更新
- mgo.UpSertBulk(fusion_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
- //新增日志数据
- func (info *addRecordInfo) addRecordData() {
- tmpArr := make([]map[string]interface{}, info.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-info.pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == info.saveSize {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.SaveBulk(record_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.SaveBulk(record_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
- //更新日志数据
- func (info *updateRecordInfo) updateRecordData() {
- tmpArr := make([][]map[string]interface{}, info.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-info.pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == info.saveSize {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- //批量更新
- mgo.UpSertBulk(record_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- //批量更新
- mgo.UpSertBulk(record_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, info.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
|