clearesult.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package extract
  2. import (
  3. db "jy/mongodbutil"
  4. ju "jy/util"
  5. qu "qfw/util"
  6. "qfw/util/elastic"
  7. "time"
  8. log "github.com/donnie4w/go-logger/logger"
  9. )
  10. var CltLogs []map[string]interface{} // Buffered cleaning-log entries: appended by ClearProcess and drained by SaveCltLog, both while holding lock.
  11. func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
  12. qu.Try(func() {
  13. //log.Println("==============", elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"596c1b6a5d11e1c74548725f"}}}`))
  14. _id := qu.BsonIdToSId((*doc)["_id"])
  15. data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`)
  16. kvMap := make(map[string][]map[string]interface{})
  17. tmpmaparr := make(map[string]interface{})
  18. set := make(map[string]interface{})
  19. if data != nil && len(*data) > 0 {
  20. list := (*data)[0]["list"].([]interface{})
  21. listArr := qu.ObjArrToMapArr(list)
  22. kvMap["list"] = listArr
  23. }
  24. for field, luas := range c.ClearLuas {
  25. tmparr := []map[string]interface{}{}
  26. for _, l := range luas {
  27. beforeval := (*doc)[l.Field]
  28. lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
  29. clearResult := lua.ClearRunScript() //清理后结果
  30. (*doc)[l.Field] = clearResult[l.Field] //覆盖原来要清理的字段的值
  31. if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志
  32. logdata := map[string]interface{}{
  33. "code": l.Code,
  34. "name": l.Name,
  35. "field": l.Field,
  36. "type": "clear",
  37. "luatext": l.LuaText,
  38. "before": beforeval,
  39. "value": clearResult[l.Field],
  40. }
  41. tmparr = append(tmparr, logdata)
  42. }
  43. }
  44. tmpmaparr[field] = tmparr
  45. // qu.Debug("tmpmaparr--", tmpmaparr)
  46. set[field] = (*doc)[field]
  47. }
  48. tmpmaparr["resultid"] = _id
  49. lock.Lock()
  50. CltLogs = append(CltLogs, tmpmaparr) //日志信息
  51. lock.Unlock()
  52. //封装更新信息
  53. updatearr := []map[string]interface{}{
  54. map[string]interface{}{
  55. "_id": qu.StringTOBsonId(_id),
  56. },
  57. map[string]interface{}{
  58. "$set": set,
  59. },
  60. }
  61. lock.Lock()
  62. c.UpdateResult = append(c.UpdateResult, updatearr)
  63. lock.Unlock()
  64. }, func(err interface{}) {
  65. log.Debug((*doc)["_id"], err)
  66. <-c.ClearTaskInfo.ProcessPool
  67. })
  68. <-c.ClearTaskInfo.ProcessPool
  69. }
  70. //保存清理日志
  71. func SaveCltLog() {
  72. tmpLogs := []map[string]interface{}{}
  73. lock.Lock()
  74. tmpLogs = CltLogs
  75. CltLogs = []map[string]interface{}{}
  76. lock.Unlock()
  77. if len(tmpLogs) > 0 {
  78. if len(tmpLogs) < saveLimit {
  79. db.Mgo.SaveBulk("clearlogs", tmpLogs...)
  80. } else {
  81. for {
  82. if len(tmpLogs) > saveLimit {
  83. tmp := tmpLogs[:saveLimit]
  84. db.Mgo.SaveBulk("clearlogs", tmp...)
  85. tmpLogs = tmpLogs[saveLimit:]
  86. } else {
  87. db.Mgo.SaveBulk("clearlogs", tmpLogs...)
  88. break
  89. }
  90. }
  91. }
  92. }
  93. time.AfterFunc(10*time.Second, SaveCltLog)
  94. }
  95. //批量更新抽取结果的值
  96. func (c *ClearTask) UpdateResultVal(init bool) {
  97. defer qu.Catch()
  98. if c.UpdateResult == nil {
  99. c.UpdateResult = [][]map[string]interface{}{}
  100. }
  101. if init {
  102. go func() {
  103. for {
  104. if len(c.UpdateResult) > 500 {
  105. arr := c.UpdateResult[:500]
  106. c.UpdateResult = c.UpdateResult[500:]
  107. qu.Try(func() {
  108. c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
  109. }, func(err interface{}) {
  110. log.Debug(err)
  111. })
  112. } else {
  113. arr := c.UpdateResult
  114. c.UpdateResult = [][]map[string]interface{}{}
  115. qu.Try(func() {
  116. c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
  117. }, func(err interface{}) {
  118. log.Debug(err)
  119. })
  120. time.Sleep(10 * time.Second)
  121. }
  122. }
  123. }()
  124. } else {
  125. arr := c.UpdateResult
  126. c.UpdateResult = [][]map[string]interface{}{}
  127. qu.Try(func() {
  128. c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
  129. }, func(err interface{}) {
  130. log.Debug(err)
  131. })
  132. time.Sleep(1 * time.Second)
  133. }
  134. // c.ClearChannel = make(chan bool, 5)
  135. // c.UpdateResult = [][]map[string]interface{}{}
  136. // for {
  137. // if len(c.UpdateResult) > 500 {
  138. // c.ClearChannel <- true
  139. // arr := c.UpdateResult[:500]
  140. // go func(tmp *[][]map[string]interface{}) {
  141. // qu.Try(func() {
  142. // c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
  143. // <-c.ClearChannel
  144. // }, func(err interface{}) {
  145. // log.Println(err)
  146. // <-c.ClearChannel
  147. // })
  148. // }(&arr)
  149. // c.UpdateResult = c.UpdateResult[500:]
  150. // } else {
  151. // c.ClearChannel <- true
  152. // arr := c.UpdateResult
  153. // func(tmp *[][]map[string]interface{}) {
  154. // qu.Try(func() {
  155. // c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
  156. // <-c.ClearChannel
  157. // }, func(err interface{}) {
  158. // log.Println(err)
  159. // <-c.ClearChannel
  160. // })
  161. // }(&arr)
  162. // c.UpdateResult = [][]map[string]interface{}{}
  163. // time.Sleep(10 * time.Second)
  164. // }
  165. // }
  166. }