clearesult.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package extract
  2. import (
  3. db "jy/mongodbutil"
  4. ju "jy/util"
  5. "log"
  6. qu "qfw/util"
  7. "qfw/util/elastic"
  8. "time"
  9. )
  // Package-level buffers filled by ClearProcess workers. Every read and write
  // of them in this file happens while holding lock.
  var (
  	// CltLogs collects clean-log records until SaveCltLog writes them to the
  	// "clearlogs" collection.
  	CltLogs []map[string]interface{}
  	// UpdateResult collects [query, update] pairs for the extraction results.
  	// NOTE(review): nothing visible in this file drains it yet
  	// (UpdateResultVal is a stub), so it grows for the life of the process.
  	UpdateResult [][]map[string]interface{}
  )
  12. func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
  13. qu.Try(func() {
  14. //data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"59925222e138231f45d7bb4f"}}}`)
  15. _id := qu.BsonIdToSId((*doc)["_id"])
  16. data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`)
  17. kvMap := make(map[string][]map[string]interface{})
  18. tmparr := []map[string]interface{}{}
  19. tmpmaparr := make(map[string]interface{})
  20. if len(*data) > 0 {
  21. list := (*data)[0]["list"].([]interface{})
  22. listArr := qu.ObjArrToMapArr(list)
  23. kvMap["list"] = listArr
  24. }
  25. field := ""
  26. for _, l := range c.ClearLuas {
  27. beforeval := (*doc)[l.Field]
  28. if field == "" {
  29. field = l.Field
  30. }
  31. lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
  32. clearResult := lua.ClearRunScript()
  33. (*doc)[l.Field] = clearResult[l.Field] //覆盖原来要清理的字段的值
  34. if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志
  35. logdata := map[string]interface{}{
  36. "code": l.Code,
  37. "name": l.Name,
  38. "field": l.Field,
  39. "type": "clear",
  40. "luatext": l.LuaText,
  41. "before": beforeval,
  42. "value": clearResult[l.Field],
  43. }
  44. tmparr = append(tmparr, logdata)
  45. }
  46. }
  47. //封装日志信息
  48. tmpmaparr[field] = tmparr
  49. tmpmaparr["resultid"] = _id
  50. lock.Lock()
  51. CltLogs = append(CltLogs, tmpmaparr)
  52. lock.Unlock()
  53. //封装更新信息
  54. updatearr := []map[string]interface{}{
  55. map[string]interface{}{
  56. "_id": _id,
  57. },
  58. map[string]interface{}{
  59. "$set": map[string]interface{}{
  60. field: (*doc)[field],
  61. },
  62. },
  63. }
  64. lock.Lock()
  65. UpdateResult = append(UpdateResult, updatearr)
  66. lock.Unlock()
  67. }, func(err interface{}) {
  68. log.Println((*doc)["_id"], err)
  69. <-c.ClearTaskInfo.ProcessPool
  70. })
  71. <-c.ClearTaskInfo.ProcessPool
  72. }
  73. //保存清理日志
  74. func SaveCltLog() {
  75. tmpLogs := []map[string]interface{}{}
  76. lock.Lock()
  77. tmpLogs = CltLogs
  78. CltLogs = []map[string]interface{}{}
  79. lock.Unlock()
  80. if len(tmpLogs) < saveLimit {
  81. db.Mgo.SaveBulk("clearlogs", tmpLogs...)
  82. } else {
  83. for {
  84. if len(tmpLogs) > saveLimit {
  85. tmp := tmpLogs[:saveLimit]
  86. db.Mgo.SaveBulk("clearlogs", tmp...)
  87. tmpLogs = tmpLogs[saveLimit:]
  88. } else {
  89. db.Mgo.SaveBulk("clearlogs", tmpLogs...)
  90. break
  91. }
  92. }
  93. }
  94. time.AfterFunc(10*time.Second, SaveCltLog)
  95. }
  96. //批量更新抽取结果的值(todo)
  97. func (c *ClearTask) UpdateResultVal() {
  98. // defer qu.Catch()
  99. // e.ResultChanel = make(chan bool, 5)
  100. // e.ResultArr = [][]map[string]interface{}{}
  101. // for {
  102. // if len(e.ResultArr) > 500 {
  103. // e.ResultChanel <- true
  104. // arr := e.ResultArr[:500]
  105. // go func(tmp *[][]map[string]interface{}) {
  106. // qu.Try(func() {
  107. // db.Mgo.UpSertBulk("extract_result", *tmp...)
  108. // <-e.ResultChanel
  109. // }, func(err interface{}) {
  110. // log.Println(err)
  111. // <-e.ResultChanel
  112. // })
  113. // }(&arr)
  114. // e.ResultArr = e.ResultArr[500:]
  115. // } else {
  116. // e.ResultChanel <- true
  117. // arr := e.ResultArr
  118. // go func(tmp *[][]map[string]interface{}) {
  119. // qu.Try(func() {
  120. // db.Mgo.UpSertBulk("extract_result", *tmp...)
  121. // <-e.ResultChanel
  122. // }, func(err interface{}) {
  123. // log.Println(err)
  124. // <-e.ResultChanel
  125. // })
  126. // }(&arr)
  127. // e.ResultArr = [][]map[string]interface{}{}
  128. // time.Sleep(10 * time.Second)
  129. // }
  130. // }
  131. }