123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package extract
- import (
- db "jy/mongodbutil"
- ju "jy/util"
- qu "qfw/util"
- "qfw/util/elastic"
- "time"
- log "github.com/donnie4w/go-logger/logger"
- )
// CltLogs buffers clear-log records until they are written out.
// ClearProcess appends one record per processed document and SaveCltLog
// drains the buffer; both do so while holding the package-level lock.
var CltLogs []map[string]interface{}
- func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
- qu.Try(func() {
- //log.Println("==============", elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"596c1b6a5d11e1c74548725f"}}}`))
- _id := qu.BsonIdToSId((*doc)["_id"])
- data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`)
- kvMap := make(map[string][]map[string]interface{})
- tmpmaparr := make(map[string]interface{})
- set := make(map[string]interface{})
- if data != nil && len(*data) > 0 {
- list := (*data)[0]["list"].([]interface{})
- listArr := qu.ObjArrToMapArr(list)
- kvMap["list"] = listArr
- }
- for field, luas := range c.ClearLuas {
- tmparr := []map[string]interface{}{}
- for _, l := range luas {
- beforeval := (*doc)[l.Field]
- lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
- clearResult := lua.ClearRunScript() //清理后结果
- (*doc)[l.Field] = clearResult[l.Field] //覆盖原来要清理的字段的值
- if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志
- logdata := map[string]interface{}{
- "code": l.Code,
- "name": l.Name,
- "field": l.Field,
- "type": "clear",
- "luatext": l.LuaText,
- "before": beforeval,
- "value": clearResult[l.Field],
- }
- tmparr = append(tmparr, logdata)
- }
- }
- tmpmaparr[field] = tmparr
- // qu.Debug("tmpmaparr--", tmpmaparr)
- set[field] = (*doc)[field]
- }
- tmpmaparr["resultid"] = _id
- lock.Lock()
- CltLogs = append(CltLogs, tmpmaparr) //日志信息
- lock.Unlock()
- //封装更新信息
- updatearr := []map[string]interface{}{
- map[string]interface{}{
- "_id": qu.StringTOBsonId(_id),
- },
- map[string]interface{}{
- "$set": set,
- },
- }
- lock.Lock()
- c.UpdateResult = append(c.UpdateResult, updatearr)
- lock.Unlock()
- }, func(err interface{}) {
- log.Debug((*doc)["_id"], err)
- <-c.ClearTaskInfo.ProcessPool
- })
- <-c.ClearTaskInfo.ProcessPool
- }
- //保存清理日志
- func SaveCltLog() {
- tmpLogs := []map[string]interface{}{}
- lock.Lock()
- tmpLogs = CltLogs
- CltLogs = []map[string]interface{}{}
- lock.Unlock()
- if len(tmpLogs) > 0 {
- if len(tmpLogs) < saveLimit {
- db.Mgo.SaveBulk("clearlogs", tmpLogs...)
- } else {
- for {
- if len(tmpLogs) > saveLimit {
- tmp := tmpLogs[:saveLimit]
- db.Mgo.SaveBulk("clearlogs", tmp...)
- tmpLogs = tmpLogs[saveLimit:]
- } else {
- db.Mgo.SaveBulk("clearlogs", tmpLogs...)
- break
- }
- }
- }
- }
- time.AfterFunc(10*time.Second, SaveCltLog)
- }
- //批量更新抽取结果的值
- func (c *ClearTask) UpdateResultVal(init bool) {
- defer qu.Catch()
- if c.UpdateResult == nil {
- c.UpdateResult = [][]map[string]interface{}{}
- }
- if init {
- go func() {
- for {
- if len(c.UpdateResult) > 500 {
- arr := c.UpdateResult[:500]
- c.UpdateResult = c.UpdateResult[500:]
- qu.Try(func() {
- c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
- }, func(err interface{}) {
- log.Debug(err)
- })
- } else {
- arr := c.UpdateResult
- c.UpdateResult = [][]map[string]interface{}{}
- qu.Try(func() {
- c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
- }, func(err interface{}) {
- log.Debug(err)
- })
- time.Sleep(10 * time.Second)
- }
- }
- }()
- } else {
- arr := c.UpdateResult
- c.UpdateResult = [][]map[string]interface{}{}
- qu.Try(func() {
- c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
- }, func(err interface{}) {
- log.Debug(err)
- })
- time.Sleep(1 * time.Second)
- }
- // c.ClearChannel = make(chan bool, 5)
- // c.UpdateResult = [][]map[string]interface{}{}
- // for {
- // if len(c.UpdateResult) > 500 {
- // c.ClearChannel <- true
- // arr := c.UpdateResult[:500]
- // go func(tmp *[][]map[string]interface{}) {
- // qu.Try(func() {
- // c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
- // <-c.ClearChannel
- // }, func(err interface{}) {
- // log.Println(err)
- // <-c.ClearChannel
- // })
- // }(&arr)
- // c.UpdateResult = c.UpdateResult[500:]
- // } else {
- // c.ClearChannel <- true
- // arr := c.UpdateResult
- // func(tmp *[][]map[string]interface{}) {
- // qu.Try(func() {
- // c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
- // <-c.ClearChannel
- // }, func(err interface{}) {
- // log.Println(err)
- // <-c.ClearChannel
- // })
- // }(&arr)
- // c.UpdateResult = [][]map[string]interface{}{}
- // time.Sleep(10 * time.Second)
- // }
- // }
- }
|