package extract import ( db "jy/mongodbutil" ju "jy/util" "log" qu "qfw/util" "qfw/util/elastic" "time" ) var CltLogs []map[string]interface{} //清理日志 var UpdateResult [][]map[string]interface{} //更新抽取结果 func (c *ClearTask) ClearProcess(doc *map[string]interface{}) { qu.Try(func() { //data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"59925222e138231f45d7bb4f"}}}`) _id := qu.BsonIdToSId((*doc)["_id"]) data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`) kvMap := make(map[string][]map[string]interface{}) tmparr := []map[string]interface{}{} tmpmaparr := make(map[string]interface{}) if len(*data) > 0 { list := (*data)[0]["list"].([]interface{}) listArr := qu.ObjArrToMapArr(list) kvMap["list"] = listArr } field := "" for _, l := range c.ClearLuas { beforeval := (*doc)[l.Field] if field == "" { field = l.Field } lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap} clearResult := lua.ClearRunScript() (*doc)[l.Field] = clearResult[l.Field] //覆盖原来要清理的字段的值 if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志 logdata := map[string]interface{}{ "code": l.Code, "name": l.Name, "field": l.Field, "type": "clear", "luatext": l.LuaText, "before": beforeval, "value": clearResult[l.Field], } tmparr = append(tmparr, logdata) } } //封装日志信息 tmpmaparr[field] = tmparr tmpmaparr["resultid"] = _id lock.Lock() CltLogs = append(CltLogs, tmpmaparr) lock.Unlock() //封装更新信息 updatearr := []map[string]interface{}{ map[string]interface{}{ "_id": _id, }, map[string]interface{}{ "$set": map[string]interface{}{ field: (*doc)[field], }, }, } lock.Lock() UpdateResult = append(UpdateResult, updatearr) lock.Unlock() }, func(err interface{}) { log.Println((*doc)["_id"], err) <-c.ClearTaskInfo.ProcessPool }) <-c.ClearTaskInfo.ProcessPool } //保存清理日志 func SaveCltLog() { tmpLogs := []map[string]interface{}{} lock.Lock() tmpLogs = CltLogs CltLogs = []map[string]interface{}{} lock.Unlock() if len(tmpLogs) < saveLimit { db.Mgo.SaveBulk("clearlogs", tmpLogs...) } else { for { if len(tmpLogs) > saveLimit { tmp := tmpLogs[:saveLimit] db.Mgo.SaveBulk("clearlogs", tmp...) tmpLogs = tmpLogs[saveLimit:] } else { db.Mgo.SaveBulk("clearlogs", tmpLogs...) break } } } time.AfterFunc(10*time.Second, SaveCltLog) } //批量更新抽取结果的值(todo) func (c *ClearTask) UpdateResultVal() { // defer qu.Catch() // e.ResultChanel = make(chan bool, 5) // e.ResultArr = [][]map[string]interface{}{} // for { // if len(e.ResultArr) > 500 { // e.ResultChanel <- true // arr := e.ResultArr[:500] // go func(tmp *[][]map[string]interface{}) { // qu.Try(func() { // db.Mgo.UpSertBulk("extract_result", *tmp...) // <-e.ResultChanel // }, func(err interface{}) { // log.Println(err) // <-e.ResultChanel // }) // }(&arr) // e.ResultArr = e.ResultArr[500:] // } else { // e.ResultChanel <- true // arr := e.ResultArr // go func(tmp *[][]map[string]interface{}) { // qu.Try(func() { // db.Mgo.UpSertBulk("extract_result", *tmp...) // <-e.ResultChanel // }, func(err interface{}) { // log.Println(err) // <-e.ResultChanel // }) // }(&arr) // e.ResultArr = [][]map[string]interface{}{} // time.Sleep(10 * time.Second) // } // } }