123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- package extract
- import (
- db "jy/mongodbutil"
- ju "jy/util"
- "log"
- qu "qfw/util"
- "qfw/util/elastic"
- "time"
- )
// Shared queues filled by ClearProcess. Both are appended to while holding
// the package-level lock.
var (
	// CltLogs holds cleaning-log records waiting to be persisted by SaveCltLog.
	CltLogs []map[string]interface{}

	// UpdateResult holds pending extract-result updates; each entry is a
	// two-element slice: the {"_id": ...} selector and the {"$set": ...} document.
	UpdateResult [][]map[string]interface{}
)
- func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
- qu.Try(func() {
- //data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"59925222e138231f45d7bb4f"}}}`)
- _id := qu.BsonIdToSId((*doc)["_id"])
- data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`)
- kvMap := make(map[string][]map[string]interface{})
- tmparr := []map[string]interface{}{}
- tmpmaparr := make(map[string]interface{})
- if len(*data) > 0 {
- list := (*data)[0]["list"].([]interface{})
- listArr := qu.ObjArrToMapArr(list)
- kvMap["list"] = listArr
- }
- field := ""
- for _, l := range c.ClearLuas {
- beforeval := (*doc)[l.Field]
- if field == "" {
- field = l.Field
- }
- lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
- clearResult := lua.ClearRunScript()
- (*doc)[l.Field] = clearResult[l.Field] //覆盖原来要清理的字段的值
- if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志
- logdata := map[string]interface{}{
- "code": l.Code,
- "name": l.Name,
- "field": l.Field,
- "type": "clear",
- "luatext": l.LuaText,
- "before": beforeval,
- "value": clearResult[l.Field],
- }
- tmparr = append(tmparr, logdata)
- }
- }
- //封装日志信息
- tmpmaparr[field] = tmparr
- tmpmaparr["resultid"] = _id
- lock.Lock()
- CltLogs = append(CltLogs, tmpmaparr)
- lock.Unlock()
- //封装更新信息
- updatearr := []map[string]interface{}{
- map[string]interface{}{
- "_id": _id,
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- field: (*doc)[field],
- },
- },
- }
- lock.Lock()
- UpdateResult = append(UpdateResult, updatearr)
- lock.Unlock()
- }, func(err interface{}) {
- log.Println((*doc)["_id"], err)
- <-c.ClearTaskInfo.ProcessPool
- })
- <-c.ClearTaskInfo.ProcessPool
- }
- //保存清理日志
- func SaveCltLog() {
- tmpLogs := []map[string]interface{}{}
- lock.Lock()
- tmpLogs = CltLogs
- CltLogs = []map[string]interface{}{}
- lock.Unlock()
- if len(tmpLogs) < saveLimit {
- db.Mgo.SaveBulk("clearlogs", tmpLogs...)
- } else {
- for {
- if len(tmpLogs) > saveLimit {
- tmp := tmpLogs[:saveLimit]
- db.Mgo.SaveBulk("clearlogs", tmp...)
- tmpLogs = tmpLogs[saveLimit:]
- } else {
- db.Mgo.SaveBulk("clearlogs", tmpLogs...)
- break
- }
- }
- }
- time.AfterFunc(10*time.Second, SaveCltLog)
- }
- //批量更新抽取结果的值(todo)
- func (c *ClearTask) UpdateResultVal() {
- // defer qu.Catch()
- // e.ResultChanel = make(chan bool, 5)
- // e.ResultArr = [][]map[string]interface{}{}
- // for {
- // if len(e.ResultArr) > 500 {
- // e.ResultChanel <- true
- // arr := e.ResultArr[:500]
- // go func(tmp *[][]map[string]interface{}) {
- // qu.Try(func() {
- // db.Mgo.UpSertBulk("extract_result", *tmp...)
- // <-e.ResultChanel
- // }, func(err interface{}) {
- // log.Println(err)
- // <-e.ResultChanel
- // })
- // }(&arr)
- // e.ResultArr = e.ResultArr[500:]
- // } else {
- // e.ResultChanel <- true
- // arr := e.ResultArr
- // go func(tmp *[][]map[string]interface{}) {
- // qu.Try(func() {
- // db.Mgo.UpSertBulk("extract_result", *tmp...)
- // <-e.ResultChanel
- // }, func(err interface{}) {
- // log.Println(err)
- // <-e.ResultChanel
- // })
- // }(&arr)
- // e.ResultArr = [][]map[string]interface{}{}
- // time.Sleep(10 * time.Second)
- // }
- // }
- }
|