|
@@ -13,7 +13,7 @@ var CltLogs []map[string]interface{} //清理日志
|
|
|
|
|
|
func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
|
|
|
qu.Try(func() {
|
|
|
- //data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"59925222e138231f45d7bb4f"}}}`)
|
|
|
+ //log.Println("==============", elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"596c1b6a5d11e1c74548725f"}}}`))
|
|
|
_id := qu.BsonIdToSId((*doc)["_id"])
|
|
|
data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`)
|
|
|
kvMap := make(map[string][]map[string]interface{})
|
|
@@ -45,8 +45,6 @@ func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
tmpmaparr[field] = tmparr
|
|
|
- log.Println(field, "====", len((*doc)[field]), (*doc)[field])
|
|
|
- log.Println("tmpmaparr=====", len(tmpmaparr), tmpmaparr)
|
|
|
set[field] = (*doc)[field]
|
|
|
}
|
|
|
tmpmaparr["resultid"] = _id
|
|
@@ -57,16 +55,14 @@ func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
|
|
|
//封装更新信息
|
|
|
updatearr := []map[string]interface{}{
|
|
|
map[string]interface{}{
|
|
|
- "_id": _id,
|
|
|
+ "_id": qu.StringTOBsonId(_id),
|
|
|
},
|
|
|
map[string]interface{}{
|
|
|
"$set": set,
|
|
|
},
|
|
|
}
|
|
|
- log.Println("updatearr====", updatearr)
|
|
|
lock.Lock()
|
|
|
c.UpdateResult = append(c.UpdateResult, updatearr)
|
|
|
- log.Println("len(updatearr)=====", len(c.UpdateResult))
|
|
|
lock.Unlock()
|
|
|
}, func(err interface{}) {
|
|
|
log.Println((*doc)["_id"], err)
|
|
@@ -82,17 +78,19 @@ func SaveCltLog() {
|
|
|
tmpLogs = CltLogs
|
|
|
CltLogs = []map[string]interface{}{}
|
|
|
lock.Unlock()
|
|
|
- if len(tmpLogs) < saveLimit {
|
|
|
- db.Mgo.SaveBulk("clearlogs", tmpLogs...)
|
|
|
- } else {
|
|
|
- for {
|
|
|
- if len(tmpLogs) > saveLimit {
|
|
|
- tmp := tmpLogs[:saveLimit]
|
|
|
- db.Mgo.SaveBulk("clearlogs", tmp...)
|
|
|
- tmpLogs = tmpLogs[saveLimit:]
|
|
|
- } else {
|
|
|
- db.Mgo.SaveBulk("clearlogs", tmpLogs...)
|
|
|
- break
|
|
|
+ if len(tmpLogs) > 0 {
|
|
|
+ if len(tmpLogs) < saveLimit {
|
|
|
+ db.Mgo.SaveBulk("clearlogs", tmpLogs...)
|
|
|
+ } else {
|
|
|
+ for {
|
|
|
+ if len(tmpLogs) > saveLimit {
|
|
|
+ tmp := tmpLogs[:saveLimit]
|
|
|
+ db.Mgo.SaveBulk("clearlogs", tmp...)
|
|
|
+ tmpLogs = tmpLogs[saveLimit:]
|
|
|
+ } else {
|
|
|
+ db.Mgo.SaveBulk("clearlogs", tmpLogs...)
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -105,9 +103,9 @@ func (c *ClearTask) UpdateResultVal() {
|
|
|
c.ClearChannel = make(chan bool, 5)
|
|
|
c.UpdateResult = [][]map[string]interface{}{}
|
|
|
for {
|
|
|
- if len(c.UpdateResult) > 500 {
|
|
|
+ if len(c.UpdateResult) > 2 {
|
|
|
c.ClearChannel <- true
|
|
|
- arr := c.UpdateResult[:500]
|
|
|
+ arr := c.UpdateResult[:2]
|
|
|
go func(tmp *[][]map[string]interface{}) {
|
|
|
qu.Try(func() {
|
|
|
db.Mgo.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
|
|
@@ -118,7 +116,7 @@ func (c *ClearTask) UpdateResultVal() {
|
|
|
<-c.ClearChannel
|
|
|
})
|
|
|
}(&arr)
|
|
|
- c.UpdateResult = c.UpdateResult[500:]
|
|
|
+ c.UpdateResult = c.UpdateResult[2:]
|
|
|
} else {
|
|
|
c.ClearChannel <- true
|
|
|
arr := c.UpdateResult
|