|
@@ -96,7 +96,6 @@ func NewPT() *ProjectTask {
|
|
|
|
|
|
var P_QL *ProjectTask
|
|
var P_QL *ProjectTask
|
|
var sp = make(chan bool, 5)
|
|
var sp = make(chan bool, 5)
|
|
-var updatalock sync.RWMutex
|
|
|
|
//初始化全量合并对象
|
|
//初始化全量合并对象
|
|
func init() {
|
|
func init() {
|
|
P_QL = NewPT()
|
|
P_QL = NewPT()
|
|
@@ -114,32 +113,26 @@ func (p *ProjectTask) updateAllQueue() {
|
|
arru[indexu] = v
|
|
arru[indexu] = v
|
|
indexu++
|
|
indexu++
|
|
if indexu == p.saveSize {
|
|
if indexu == p.saveSize {
|
|
- updatalock.Lock()
|
|
|
|
- tmparr := arru
|
|
|
|
- arru = make([][]map[string]interface{}, p.saveSize)
|
|
|
|
- updatalock.Unlock()
|
|
|
|
sp <- true
|
|
sp <- true
|
|
- go func(tmparr [][]map[string]interface{}) {
|
|
|
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
defer func() {
|
|
defer func() {
|
|
<-sp
|
|
<-sp
|
|
}()
|
|
}()
|
|
- MongoTool.UpSertBulk(p.coll, tmparr...)
|
|
|
|
- }(tmparr)
|
|
|
|
|
|
+ MongoTool.UpSertBulk(p.coll, arru...)
|
|
|
|
+ }(arru)
|
|
|
|
+ arru = make([][]map[string]interface{}, p.saveSize)
|
|
indexu = 0
|
|
indexu = 0
|
|
}
|
|
}
|
|
case <-time.After(1000 * time.Millisecond):
|
|
case <-time.After(1000 * time.Millisecond):
|
|
if indexu > 0 {
|
|
if indexu > 0 {
|
|
- updatalock.Lock()
|
|
|
|
- tmparr := arru
|
|
|
|
- arru = make([][]map[string]interface{}, p.saveSize)
|
|
|
|
- updatalock.Unlock()
|
|
|
|
sp <- true
|
|
sp <- true
|
|
- go func(tmparr [][]map[string]interface{}) {
|
|
|
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
defer func() {
|
|
defer func() {
|
|
<-sp
|
|
<-sp
|
|
}()
|
|
}()
|
|
- MongoTool.UpSertBulk(p.coll, tmparr...)
|
|
|
|
- }(tmparr[:indexu])
|
|
|
|
|
|
+ MongoTool.UpSertBulk(p.coll, arru...)
|
|
|
|
+ }(arru[:indexu])
|
|
|
|
+ arru = make([][]map[string]interface{}, p.saveSize)
|
|
indexu = 0
|
|
indexu = 0
|
|
}
|
|
}
|
|
}
|
|
}
|