exportask.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // exportask
  2. package extract
  3. import (
  4. "fmt"
  5. db "jy/mongodbutil"
  6. ju "jy/util"
  7. "log"
  8. qu "qfw/util"
  9. "time"
  10. "github.com/tealeg/xlsx"
  11. )
  12. //执行导出定时任务
  13. func Export() {
  14. tk, _ := db.Mgo.Find("task_export", `{"state":0}`, nil, nil, false, -1, -1)
  15. for _, t := range *tk {
  16. extractAndExport("v1", t)
  17. extractAndExport("v2", t)
  18. time.Sleep(20 * time.Second)
  19. //生成excel
  20. filename := createFile(t)
  21. if filename != "" {
  22. db.Mgo.UpdateById("task_export", qu.BsonIdToSId(t["_id"]),
  23. map[string]interface{}{
  24. "$set": map[string]interface{}{
  25. "state": 1,
  26. "filename": filename,
  27. },
  28. })
  29. }
  30. }
  31. time.AfterFunc(30*time.Minute, Export)
  32. }
  33. func extractAndExport(v string, t map[string]interface{}) {
  34. db.Mgo.Del("task_export_result_"+v, `{}`) //清除历史数据
  35. e := &ExtractTask{}
  36. e.IsRun = true
  37. e.TaskInfo = &TaskInfo{
  38. Name: t["name"].(string),
  39. Version: t[v+"name"].(string),
  40. VersionId: t[v+"id"].(string),
  41. FromDbAddr: t["dbaddr"].(string),
  42. FromDB: t["dbname"].(string),
  43. FromColl: t["table"].(string),
  44. TestColl: "task_export_result_" + v,
  45. TrackColl: "task_export_tract_" + v,
  46. IsEtxLog: false,
  47. ProcessPool: make(chan bool, 5),
  48. }
  49. e.TaskInfo.FDB = db.MgoFactory(1, 3, 120, fmt.Sprint(t["dbaddr"]), fmt.Sprint(t["dbname"]))
  50. e.InitSite()
  51. e.InitRulePres()
  52. e.InitRuleBacks(false)
  53. e.InitRuleBacks(true)
  54. e.InitRuleCore(false)
  55. e.InitRuleCore(true)
  56. e.InitTag(false)
  57. e.InitTag(true)
  58. e.InitClearFn(false)
  59. e.InitClearFn(true)
  60. e.InfoTypeList()
  61. e.InitBlockRule()
  62. e.InitPkgCore()
  63. //品牌抽取是否开启
  64. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  65. //价格个数抽取是否开启
  66. ju.IsPriceNumber, _ = ju.Config["pricenumber"].(bool)
  67. //附件抽取是否开启
  68. e.InitFile()
  69. query := t["query"]
  70. limit := qu.IntAll(t["limit"])
  71. list, _ := e.TaskInfo.FDB.Find(e.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  72. for _, v := range *list {
  73. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  74. continue
  75. }
  76. var j, jf *ju.Job
  77. var isSite bool
  78. if e.IsFileField && (v["projectinfo"] != nil ||v["attach_text"] != nil ){
  79. v["isextFile"] = true
  80. j, jf, isSite = e.PreInfo(v)
  81. } else {
  82. j, _, isSite = e.PreInfo(v)
  83. }
  84. go e.ExtractProcess(j, jf, isSite)
  85. e.TaskInfo.ProcessPool <- true
  86. }
  87. }
  88. func createFile(info map[string]interface{}) string {
  89. filename := ""
  90. if fields, ok := info["fields"].([]interface{}); ok {
  91. qfield := ""
  92. for k, v := range fields {
  93. if k == 0 {
  94. qfield += `"` + fmt.Sprint(v) + `":1`
  95. } else {
  96. qfield += `,"` + fmt.Sprint(v) + `":1`
  97. }
  98. }
  99. list_1, _ := db.Mgo.Find("task_export_result_v1", `{}`, nil, "{"+qfield+"}", false, -1, -1)
  100. result := map[string]map[string]interface{}{}
  101. for _, v := range *list_1 {
  102. id := qu.BsonIdToSId(v["_id"])
  103. tmp := map[string]interface{}{}
  104. for _, f := range fields {
  105. field := fmt.Sprint(f)
  106. tmp["v1_"+field] = qu.ObjToString(v[field])
  107. }
  108. result[id] = tmp
  109. }
  110. list_2, _ := db.Mgo.Find("task_export_result_v2", `{}`, nil, "{"+qfield+"}", false, -1, -1)
  111. for _, v := range *list_2 {
  112. id := qu.BsonIdToSId(v["_id"])
  113. tmp := map[string]interface{}{}
  114. if result[id] != nil {
  115. tmp = result[id]
  116. }
  117. for _, f := range fields {
  118. field := fmt.Sprint(f)
  119. tmp["v2_"+field] = qu.ObjToString(v[field])
  120. }
  121. result[id] = tmp
  122. }
  123. sheetmaps := map[string][]map[string]interface{}{}
  124. for _, f := range fields {
  125. field := fmt.Sprint(f)
  126. tmps := []map[string]interface{}{}
  127. for _, v := range result {
  128. tmp := map[string]interface{}{
  129. "v1": v["v1_"+field],
  130. "v2": v["v2_"+field],
  131. }
  132. tmps = append(tmps, tmp)
  133. }
  134. sheetmaps[field] = tmps
  135. }
  136. fx := xlsx.NewFile()
  137. for k, v := range sheetmaps {
  138. sheet := xlsx.Sheet{}
  139. title := sheet.AddRow()
  140. title.AddCell().SetString(fmt.Sprint(info["v1name"]))
  141. title.AddCell().SetString(fmt.Sprint(info["v2name"]))
  142. for _, d := range v {
  143. row := sheet.AddRow()
  144. row.AddCell().SetString(fmt.Sprint(d["v1"]))
  145. row.AddCell().SetString(fmt.Sprint(d["v2"]))
  146. }
  147. fx.AppendSheet(sheet, k)
  148. }
  149. t := time.Now()
  150. filename = fmt.Sprint(info["name"]) + "_" + t.Format("20060102150405") + ".xlsx"
  151. fx.Save("./web/res/down/" + filename)
  152. log.Println("任务完成", filename)
  153. }
  154. return filename
  155. }