clearesult.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package extract
  2. import (
  3. db "jy/mongodbutil"
  4. ju "jy/util"
  5. qu "qfw/util"
  6. "qfw/util/elastic"
  7. "time"
  8. log "github.com/donnie4w/go-logger/logger"
  9. )
  10. var CltLogs []map[string]interface{} //清理日志
  11. func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
  12. qu.Try(func() {
  13. //log.Println("==============", elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"596c1b6a5d11e1c74548725f"}}}`))
  14. _id := qu.BsonIdToSId((*doc)["_id"])
  15. data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`)
  16. kvMap := make(map[string][]map[string]interface{})
  17. tmpmaparr := make(map[string]interface{})
  18. set := make(map[string]interface{})
  19. if data != nil && len(*data) > 0 {
  20. list := (*data)[0]["list"].([]interface{})
  21. listArr := qu.ObjArrToMapArr(list)
  22. kvMap["list"] = listArr
  23. }
  24. for field, luas := range c.ClearLuas {
  25. tmparr := []map[string]interface{}{}
  26. for _, l := range luas {
  27. beforeval := (*doc)[l.Field]
  28. lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
  29. clearResult := lua.ClearRunScript() //清理后结果
  30. (*doc)[l.Field] = clearResult[l.Field] //覆盖原来要清理的字段的值
  31. if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志
  32. logdata := map[string]interface{}{
  33. "code": l.Code,
  34. "name": l.Name,
  35. "field": l.Field,
  36. "type": "clear",
  37. "luatext": l.LuaText,
  38. "before": beforeval,
  39. "value": clearResult[l.Field],
  40. }
  41. tmparr = append(tmparr, logdata)
  42. }
  43. }
  44. tmpmaparr[field] = tmparr
  45. // qu.Debug("tmpmaparr--", tmpmaparr)
  46. set[field] = (*doc)[field]
  47. }
  48. tmpmaparr["resultid"] = _id
  49. lock.Lock()
  50. CltLogs = append(CltLogs, tmpmaparr) //日志信息
  51. lock.Unlock()
  52. //封装更新信息
  53. updatearr := []map[string]interface{}{
  54. map[string]interface{}{
  55. "_id": qu.StringTOBsonId(_id),
  56. },
  57. map[string]interface{}{
  58. "$set": set,
  59. },
  60. }
  61. c.RWMutex.Lock()
  62. c.UpdateResult = append(c.UpdateResult, updatearr)
  63. c.RWMutex.Unlock()
  64. }, func(err interface{}) {
  65. log.Debug((*doc)["_id"], err)
  66. <-c.ClearTaskInfo.ProcessPool
  67. })
  68. <-c.ClearTaskInfo.ProcessPool
  69. }
  70. //保存清理日志
  71. func SaveCltLog() {
  72. tmpLogs := []map[string]interface{}{}
  73. lock.Lock()
  74. tmpLogs = CltLogs
  75. CltLogs = []map[string]interface{}{}
  76. lock.Unlock()
  77. if len(tmpLogs) > 0 {
  78. if len(tmpLogs) < saveLimit {
  79. db.Mgo.SaveBulk("clearlogs", tmpLogs...)
  80. } else {
  81. for {
  82. if len(tmpLogs) > saveLimit {
  83. tmp := tmpLogs[:saveLimit]
  84. db.Mgo.SaveBulk("clearlogs", tmp...)
  85. tmpLogs = tmpLogs[saveLimit:]
  86. } else {
  87. db.Mgo.SaveBulk("clearlogs", tmpLogs...)
  88. break
  89. }
  90. }
  91. }
  92. }
  93. time.AfterFunc(3*time.Second, SaveCltLog)
  94. }
  95. //批量更新抽取结果的值
  96. func (c *ClearTask) UpdateResultVal(init bool) {
  97. defer qu.Catch()
  98. c.RWMutex.Lock()
  99. if c.UpdateResult == nil {
  100. c.UpdateResult = [][]map[string]interface{}{}
  101. }
  102. c.RWMutex.Unlock()
  103. if init {
  104. go func() {
  105. for {
  106. c.RWMutex.Lock()
  107. if len(c.UpdateResult) > 50 {
  108. arr := c.UpdateResult[:50]
  109. c.UpdateResult = c.UpdateResult[50:]
  110. c.RWMutex.Unlock()
  111. qu.Try(func() {
  112. c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
  113. }, func(err interface{}) {
  114. log.Debug(err)
  115. })
  116. } else {
  117. arr := c.UpdateResult
  118. c.UpdateResult = [][]map[string]interface{}{}
  119. c.RWMutex.Unlock()
  120. qu.Try(func() {
  121. c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
  122. }, func(err interface{}) {
  123. log.Debug(err)
  124. })
  125. time.Sleep(3 * time.Second)
  126. }
  127. }
  128. }()
  129. } else {
  130. c.RWMutex.Lock()
  131. arr := c.UpdateResult
  132. c.UpdateResult = [][]map[string]interface{}{}
  133. c.RWMutex.Unlock()
  134. qu.Try(func() {
  135. lenarr := len(arr)
  136. for {
  137. if lenarr > 50 {
  138. arr2 := arr[:50]
  139. arr = arr[50:]
  140. lenarr = len(arr)
  141. c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr2...)
  142. time.Sleep(1*time.Second)
  143. } else {
  144. c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
  145. break
  146. }
  147. }
  148. //c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
  149. }, func(err interface{}) {
  150. log.Debug(err)
  151. })
  152. time.Sleep(1 * time.Second)
  153. }
  154. // c.ClearChannel = make(chan bool, 5)
  155. // c.UpdateResult = [][]map[string]interface{}{}
  156. // for {
  157. // if len(c.UpdateResult) > 500 {
  158. // c.ClearChannel <- true
  159. // arr := c.UpdateResult[:500]
  160. // go func(tmp *[][]map[string]interface{}) {
  161. // qu.Try(func() {
  162. // c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
  163. // <-c.ClearChannel
  164. // }, func(err interface{}) {
  165. // log.Println(err)
  166. // <-c.ClearChannel
  167. // })
  168. // }(&arr)
  169. // c.UpdateResult = c.UpdateResult[500:]
  170. // } else {
  171. // c.ClearChannel <- true
  172. // arr := c.UpdateResult
  173. // func(tmp *[][]map[string]interface{}) {
  174. // qu.Try(func() {
  175. // c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
  176. // <-c.ClearChannel
  177. // }, func(err interface{}) {
  178. // log.Println(err)
  179. // <-c.ClearChannel
  180. // })
  181. // }(&arr)
  182. // c.UpdateResult = [][]map[string]interface{}{}
  183. // time.Sleep(10 * time.Second)
  184. // }
  185. // }
  186. }