package extract

import (
	"time"

	db "jy/mongodbutil"
	ju "jy/util"
	qu "qfw/util"
	"qfw/util/elastic"

	log "github.com/donnie4w/go-logger/logger"
)

// CltLogs buffers clear-log entries produced by ClearProcess until
// SaveCltLog writes them out. Every access in this file happens while
// holding lock.
var CltLogs []map[string]interface{}

// ClearProcess runs every script in c.ClearLuas against doc, writes each
// script's result back into doc, appends one log entry to CltLogs and queues
// one `$set` update in c.UpdateResult.
//
// Before running the scripts it looks up the merge record whose list.infoid
// equals the document id and, when one is found, passes its "list" entries to
// the scripts through KvMap.
//
// The function receives exactly once from c.ClearTaskInfo.ProcessPool before
// returning (presumably releasing the slot the caller acquired — confirm
// against the caller).
func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
	// Receive exactly once, whether or not the body panics. The original
	// received inside the panic handler and again after qu.Try, which takes
	// two entries for a single document whenever qu.Try recovers and returns.
	defer func() { <-c.ClearTaskInfo.ProcessPool }()

	qu.Try(func() {
		_id := qu.BsonIdToSId((*doc)["_id"])
		data := elastic.Get(
			qu.ObjToString(ju.Config["mergetablealias"]),
			qu.ObjToString(ju.Config["mergetable"]),
			`{"query":{"term":{"list.infoid":"`+_id+`"}}}`,
		)

		kvMap := make(map[string][]map[string]interface{})
		tmpmaparr := make(map[string]interface{})
		set := make(map[string]interface{})
		if data != nil && len(*data) > 0 {
			// An unexpected "list" type panics here and is reported by the
			// handler below; the document is then left without an update.
			list := (*data)[0]["list"].([]interface{})
			kvMap["list"] = qu.ObjArrToMapArr(list)
		}

		for field, luas := range c.ClearLuas {
			tmparr := []map[string]interface{}{}
			for _, l := range luas {
				beforeval := (*doc)[l.Field]
				lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
				clearResult := lua.ClearRunScript()
				// The cleaned value always replaces the previous one, even
				// when the script returned nothing for this field.
				(*doc)[l.Field] = clearResult[l.Field]
				if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog {
					tmparr = append(tmparr, map[string]interface{}{
						"code":    l.Code,
						"name":    l.Name,
						"field":   l.Field,
						"type":    "clear",
						"luatext": l.LuaText,
						"before":  beforeval,
						"value":   clearResult[l.Field],
					})
				}
			}
			tmpmaparr[field] = tmparr
			set[field] = (*doc)[field]
		}
		tmpmaparr["resultid"] = _id

		lock.Lock()
		CltLogs = append(CltLogs, tmpmaparr)
		lock.Unlock()

		// Update descriptor: selector first, modification second.
		updatearr := []map[string]interface{}{
			{"_id": qu.StringTOBsonId(_id)},
			{"$set": set},
		}
		c.RWMutex.Lock()
		c.UpdateResult = append(c.UpdateResult, updatearr)
		c.RWMutex.Unlock()
	}, func(err interface{}) {
		log.Debug((*doc)["_id"], err)
	})
}

// SaveCltLog takes everything currently buffered in CltLogs, passes it to
// db.Mgo.SaveBulk under the name "clearlogs" in batches of at most saveLimit
// entries, and then schedules itself to run again after 3 seconds.
func SaveCltLog() {
	// Swap the buffer out under the lock so saving happens without holding it.
	lock.Lock()
	tmpLogs := CltLogs
	CltLogs = []map[string]interface{}{}
	lock.Unlock()

	if len(tmpLogs) > 0 {
		for len(tmpLogs) > saveLimit {
			db.Mgo.SaveBulk("clearlogs", tmpLogs[:saveLimit]...)
			tmpLogs = tmpLogs[saveLimit:]
		}
		db.Mgo.SaveBulk("clearlogs", tmpLogs...)
	}
	time.AfterFunc(3*time.Second, SaveCltLog)
}

// UpdateResultVal flushes the updates queued in c.UpdateResult through
// c.ClearTaskInfo.FDB.UpdateBulk on c.ClearTaskInfo.FromColl.
//
// With init == true it starts a goroutine that loops forever: each pass sends
// the first 50 queued updates when more than 50 are waiting, otherwise it
// sends whatever is queued and sleeps for 3 seconds. The goroutine has no
// stop condition.
//
// With init == false it drains the queue on the calling goroutine in batches
// of 50, pausing 1 second after each full batch and once more at the end.
func (c *ClearTask) UpdateResultVal(init bool) {
	defer qu.Catch()

	c.RWMutex.Lock()
	if c.UpdateResult == nil {
		c.UpdateResult = [][]map[string]interface{}{}
	}
	c.RWMutex.Unlock()

	if init {
		go func() {
			for {
				c.RWMutex.Lock()
				batch := c.UpdateResult
				more := len(batch) > 50
				if more {
					batch = batch[:50]
					c.UpdateResult = c.UpdateResult[50:]
				} else {
					c.UpdateResult = [][]map[string]interface{}{}
				}
				c.RWMutex.Unlock()

				qu.Try(func() {
					c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, batch...)
				}, func(err interface{}) {
					log.Debug(err)
				})

				// Only idle once the backlog has been worked off.
				if !more {
					time.Sleep(3 * time.Second)
				}
			}
		}()
	} else {
		c.RWMutex.Lock()
		arr := c.UpdateResult
		c.UpdateResult = [][]map[string]interface{}{}
		c.RWMutex.Unlock()

		qu.Try(func() {
			for len(arr) > 50 {
				head := arr[:50]
				arr = arr[50:]
				c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, head...)
				time.Sleep(1 * time.Second)
			}
			c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
		}, func(err interface{}) {
			log.Debug(err)
		})
		time.Sleep(1 * time.Second)
	}

	// Earlier channel-throttled implementation, kept for reference:
	//
	// c.ClearChannel = make(chan bool, 5)
	// c.UpdateResult = [][]map[string]interface{}{}
	// for {
	// 	if len(c.UpdateResult) > 500 {
	// 		c.ClearChannel <- true
	// 		arr := c.UpdateResult[:500]
	// 		go func(tmp *[][]map[string]interface{}) {
	// 			qu.Try(func() {
	// 				c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
	// 				<-c.ClearChannel
	// 			}, func(err interface{}) {
	// 				log.Println(err)
	// 				<-c.ClearChannel
	// 			})
	// 		}(&arr)
	// 		c.UpdateResult = c.UpdateResult[500:]
	// 	} else {
	// 		c.ClearChannel <- true
	// 		arr := c.UpdateResult
	// 		func(tmp *[][]map[string]interface{}) {
	// 			qu.Try(func() {
	// 				c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
	// 				<-c.ClearChannel
	// 			}, func(err interface{}) {
	// 				log.Println(err)
	// 				<-c.ClearChannel
	// 			})
	// 		}(&arr)
	// 		c.UpdateResult = [][]map[string]interface{}{}
	// 		time.Sleep(10 * time.Second)
	// 	}
	// }
}