task.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package front
  import (
  	"regexp"
  	"strings"
  	"time"

  	"github.com/donnie4w/go-logger/logger"
  	"go.mongodb.org/mongo-driver/bson/primitive"
  	"mongodb"
  	qu "qfw/util"
  	"spider"
  	. "task"
  	. "util"
  )
  13. func (f *Front) Task() {
  14. defer qu.Catch()
  15. if f.Method() == "POST" {
  16. searchStr := f.GetString("search[value]")
  17. search := strings.TrimSpace(searchStr)
  18. start, _ := f.GetInteger("start")
  19. limit, _ := f.GetInteger("length")
  20. draw, _ := f.GetInteger("draw")
  21. query := map[string]interface{}{
  22. "delete": false,
  23. }
  24. state, _ := f.GetInteger("state")
  25. if state != -1 {
  26. query["i_state"] = state
  27. }
  28. if search != "" {
  29. query["$or"] = []interface{}{
  30. map[string]interface{}{"s_name": map[string]interface{}{"$regex": search}},
  31. }
  32. }
  33. qu.Debug("query:", query)
  34. task, _ := Mgo.Find("task", query, map[string]interface{}{"_id": -1}, nil, false, start, limit)
  35. count := Mgo.Count("task", query)
  36. f.ServeJson(map[string]interface{}{
  37. "draw": draw,
  38. "data": *task,
  39. "recordsFiltered": count,
  40. "recordsTotal": count,
  41. })
  42. } else {
  43. f.T["datadb"] = DataBb
  44. f.T["datacoll"] = DataColl
  45. f.T["dataflows"] = FlowsArr
  46. f.Render("task/task.html", &f.T)
  47. }
  48. }
  49. func (f *Front) TaskSave() {
  50. defer qu.Catch()
  51. coll := f.GetString("coll")
  52. ok := false
  53. msg := ""
  54. if !RunningTask[coll] { //正在执行中的任务表
  55. if coll == DataColl && !MgoDT.Del("extract", nil) { //如果是bidding数据,清理extract抽取表,bidding表手动清理
  56. msg = "extract表清理失败"
  57. goto L
  58. }
  59. //if MgoDT.Count(coll, nil) == 0 { //先导数据再建任务
  60. // msg = "表:" + coll + "无数据,请先导入数据再创建任务"
  61. // goto L
  62. //}
  63. user := f.GetSession("user").(map[string]interface{})
  64. id := f.GetString("id")
  65. name := f.GetString("name")
  66. checkfields := f.GetString("checkfields")
  67. isbidding := f.GetString("isbidding")
  68. flows := f.GetString("flows")
  69. flowsArr := []string{}
  70. qu.Debug(flows, len(strings.Split(flows, ",")))
  71. if flows != "" {
  72. for _, flow := range strings.Split(flows, ",") {
  73. flowsArr = append(flowsArr, flow)
  74. }
  75. }
  76. qu.Debug(id, name, coll, isbidding, flows)
  77. query := map[string]interface{}{}
  78. set := map[string]interface{}{}
  79. set["s_name"] = name
  80. set["s_coll"] = coll
  81. set["s_checkfields"] = checkfields
  82. set["isbidding"] = isbidding == "是"
  83. set["flows"] = flowsArr
  84. if id != "" {
  85. query["_id"] = mongodb.StringTOBsonId(id)
  86. set["updatetime"] = time.Now().Unix()
  87. } else {
  88. query["_id"] = primitive.NewObjectID()
  89. set["s_db"] = DataBb
  90. set["l_createtime"] = time.Now().Unix()
  91. set["s_createuser"] = user["name"]
  92. set["i_state"] = 0
  93. set["delete"] = false
  94. set["issend"] = false
  95. }
  96. ok = Mgo.Update("task", query, map[string]interface{}{"$set": set}, true, false)
  97. } else {
  98. msg = "表:" + coll + "已被占用"
  99. }
  100. L:
  101. f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg})
  102. }
  103. func (f *Front) TaskDel() {
  104. defer qu.Catch()
  105. id := f.GetString("id")
  106. qu.Debug(id)
  107. b := Mgo.UpdateById("task", id, map[string]interface{}{"$set": map[string]interface{}{"delete": true}})
  108. f.ServeJson(map[string]interface{}{"rep": b})
  109. }
  110. func (f *Front) TaskStart() {
  111. defer qu.Catch()
  112. id := f.GetString("id")
  113. task, _ := Mgo.FindById("task", id, nil)
  114. qu.Debug("启动任务:", (*task)["s_name"], id)
  115. msg := ""
  116. msgType := 0
  117. errCode := []string{}
  118. if len(*task) > 0 {
  119. coll := qu.ObjToString((*task)["s_coll"])
  120. //校验coll中数据对应爬虫是否已弃用
  121. errCode = CheckCodeState(coll)
  122. if len(errCode) == 0 {
  123. TaskLock.Lock()
  124. if RunningTask[coll] {
  125. msg = "表:" + coll + "已有任务正在运行,请稍后启动该任务"
  126. } else { //启动任务
  127. //更新数据状态,设置默认字段state=0,times=0
  128. query := map[string]interface{}{
  129. "$or": []interface{}{
  130. map[string]interface{}{"state": map[string]interface{}{"$exists": false}}, //首次执行任务检索
  131. map[string]interface{}{"state": -1}}, //任务重启检索
  132. }
  133. MgoDT.Update(coll, query, map[string]interface{}{"$set": map[string]interface{}{"state": 0, "times": 0}}, false, true)
  134. if MgoDT.Count(coll, map[string]interface{}{"state": 0}) > 0 {
  135. //更新任务状态
  136. Mgo.UpdateById("task", id, map[string]interface{}{"$set": map[string]interface{}{"i_state": 1, "issend": false, "l_starttime": time.Now().Unix()}})
  137. RunningTask[coll] = true
  138. t := &Task{
  139. ID: mongodb.BsonIdToSId((*task)["_id"]),
  140. Name: qu.ObjToString((*task)["s_name"]),
  141. DB: qu.ObjToString((*task)["s_db"]),
  142. Coll: qu.ObjToString((*task)["s_coll"]),
  143. IsBidding: (*task)["isbidding"].(bool),
  144. Flows: qu.ObjArrToStringArr((*task)["flows"].([]interface{})),
  145. SpiderScriptMap: map[string]*spider.Spider{},
  146. }
  147. fieldsMap := map[string]bool{}
  148. if checkFields, ok := (*task)["s_checkfields"].(string); ok && checkFields != "" {
  149. if fieldsArr := strings.Split(checkFields, ","); len(fieldsArr) >= 1 {
  150. for _, field := range fieldsArr {
  151. fieldsMap[field] = true
  152. }
  153. }
  154. }
  155. t.CheckFields = fieldsMap
  156. go t.StartTask() //启动任务
  157. } else {
  158. msg = "表:" + coll + "无有效数据"
  159. }
  160. }
  161. TaskLock.Unlock()
  162. } else {
  163. msg = "启动任务失败,重采数据中:" + strings.Join(errCode, ",") + "爬虫已失效,是否删除对应数据!"
  164. msgType = 1
  165. }
  166. } else {
  167. msg = "启动任务失败,未找到该任务"
  168. }
  169. if msg != "" {
  170. logger.Info("任务启动失败:", (*task)["s_name"], id)
  171. }
  172. f.ServeJson(map[string]interface{}{"msg": msg, "msgtype": msgType, "errcode": strings.Join(errCode, ",")})
  173. }
  174. func CheckCodeState(coll string) (errCode []string) {
  175. defer qu.Catch()
  176. match := map[string]interface{}{
  177. "state": map[string]interface{}{
  178. "$ne": 1,
  179. },
  180. }
  181. group := map[string]interface{}{
  182. "_id": "$spidercode",
  183. }
  184. p := []map[string]interface{}{
  185. map[string]interface{}{"$match": match},
  186. map[string]interface{}{"$group": group},
  187. }
  188. sess := MgoDT.GetMgoConn()
  189. defer MgoDT.DestoryMongoConn(sess)
  190. it2 := sess.DB(MgoDT.DbName).C(coll).Pipe(p).Iter()
  191. n2 := 0
  192. for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
  193. code := qu.ObjToString(tmp["_id"]) //爬虫代码
  194. q := map[string]interface{}{
  195. "code": code,
  196. "state": 5,
  197. "platform": "golua平台",
  198. }
  199. if MgoE.Count("luaconfig", q) == 0 {
  200. errCode = append(errCode, code)
  201. }
  202. tmp = map[string]interface{}{}
  203. }
  204. return
  205. }