|
- package front
- import (
- "github.com/donnie4w/go-logger/logger"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "mongodb"
- qu "qfw/util"
- "spider"
- "strings"
- . "task"
- "time"
- . "util"
- )
- func (f *Front) Task() {
- defer qu.Catch()
- if f.Method() == "POST" {
- searchStr := f.GetString("search[value]")
- search := strings.TrimSpace(searchStr)
- start, _ := f.GetInteger("start")
- limit, _ := f.GetInteger("length")
- draw, _ := f.GetInteger("draw")
- query := map[string]interface{}{
- "delete": false,
- }
- state, _ := f.GetInteger("state")
- if state != -1 {
- query["i_state"] = state
- }
- if search != "" {
- query["$or"] = []interface{}{
- map[string]interface{}{"s_name": map[string]interface{}{"$regex": search}},
- }
- }
- qu.Debug("query:", query)
- task, _ := Mgo.Find("task", query, map[string]interface{}{"_id": -1}, nil, false, start, limit)
- count := Mgo.Count("task", query)
- f.ServeJson(map[string]interface{}{
- "draw": draw,
- "data": *task,
- "recordsFiltered": count,
- "recordsTotal": count,
- })
- } else {
- f.T["datadb"] = DataBb
- f.T["datacoll"] = DataColl
- f.T["dataflows"] = FlowsArr
- f.Render("task/task.html", &f.T)
- }
- }
- func (f *Front) TaskSave() {
- defer qu.Catch()
- coll := f.GetString("coll")
- ok := false
- msg := ""
- if !RunningTask[coll] { //正在执行中的任务表
- if coll == DataColl && !MgoDT.Del("extract", nil) { //如果是bidding数据,清理extract抽取表,bidding表手动清理
- msg = "extract表清理失败"
- goto L
- }
- //if MgoDT.Count(coll, nil) == 0 { //先导数据再建任务
- // msg = "表:" + coll + "无数据,请先导入数据再创建任务"
- // goto L
- //}
- user := f.GetSession("user").(map[string]interface{})
- id := f.GetString("id")
- name := f.GetString("name")
- checkfields := f.GetString("checkfields")
- isbidding := f.GetString("isbidding")
- flows := f.GetString("flows")
- flowsArr := []string{}
- qu.Debug(flows, len(strings.Split(flows, ",")))
- if flows != "" {
- for _, flow := range strings.Split(flows, ",") {
- flowsArr = append(flowsArr, flow)
- }
- }
- qu.Debug(id, name, coll, isbidding, flows)
- query := map[string]interface{}{}
- set := map[string]interface{}{}
- set["s_name"] = name
- set["s_coll"] = coll
- set["s_checkfields"] = checkfields
- set["isbidding"] = isbidding == "是"
- set["flows"] = flowsArr
- if id != "" {
- query["_id"] = mongodb.StringTOBsonId(id)
- set["updatetime"] = time.Now().Unix()
- } else {
- query["_id"] = primitive.NewObjectID()
- set["s_db"] = DataBb
- set["l_createtime"] = time.Now().Unix()
- set["s_createuser"] = user["name"]
- set["i_state"] = 0
- set["delete"] = false
- set["issend"] = false
- }
- ok = Mgo.Update("task", query, map[string]interface{}{"$set": set}, true, false)
- } else {
- msg = "表:" + coll + "已被占用"
- }
- L:
- f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg})
- }
- func (f *Front) TaskDel() {
- defer qu.Catch()
- id := f.GetString("id")
- qu.Debug(id)
- b := Mgo.UpdateById("task", id, map[string]interface{}{"$set": map[string]interface{}{"delete": true}})
- f.ServeJson(map[string]interface{}{"rep": b})
- }
- func (f *Front) TaskStart() {
- defer qu.Catch()
- id := f.GetString("id")
- task, _ := Mgo.FindById("task", id, nil)
- qu.Debug("启动任务:", (*task)["s_name"], id)
- msg := ""
- msgType := 0
- errCode := []string{}
- if len(*task) > 0 {
- coll := qu.ObjToString((*task)["s_coll"])
- //校验coll中数据对应爬虫是否已弃用
- errCode = CheckCodeState(coll)
- if len(errCode) == 0 {
- TaskLock.Lock()
- if RunningTask[coll] {
- msg = "表:" + coll + "已有任务正在运行,请稍后启动该任务"
- } else { //启动任务
- //更新数据状态,设置默认字段state=0,times=0
- query := map[string]interface{}{
- "$or": []interface{}{
- map[string]interface{}{"state": map[string]interface{}{"$exists": false}}, //首次执行任务检索
- map[string]interface{}{"state": -1}}, //任务重启检索
- }
- MgoDT.Update(coll, query, map[string]interface{}{"$set": map[string]interface{}{"state": 0, "times": 0}}, false, true)
- if MgoDT.Count(coll, map[string]interface{}{"state": 0}) > 0 {
- //更新任务状态
- Mgo.UpdateById("task", id, map[string]interface{}{"$set": map[string]interface{}{"i_state": 1, "issend": false, "l_starttime": time.Now().Unix()}})
- RunningTask[coll] = true
- t := &Task{
- ID: mongodb.BsonIdToSId((*task)["_id"]),
- Name: qu.ObjToString((*task)["s_name"]),
- DB: qu.ObjToString((*task)["s_db"]),
- Coll: qu.ObjToString((*task)["s_coll"]),
- IsBidding: (*task)["isbidding"].(bool),
- Flows: qu.ObjArrToStringArr((*task)["flows"].([]interface{})),
- SpiderScriptMap: map[string]*spider.Spider{},
- }
- fieldsMap := map[string]bool{}
- if checkFields, ok := (*task)["s_checkfields"].(string); ok && checkFields != "" {
- if fieldsArr := strings.Split(checkFields, ","); len(fieldsArr) >= 1 {
- for _, field := range fieldsArr {
- fieldsMap[field] = true
- }
- }
- }
- t.CheckFields = fieldsMap
- go t.StartTask() //启动任务
- } else {
- msg = "表:" + coll + "无有效数据"
- }
- }
- TaskLock.Unlock()
- } else {
- msg = "启动任务失败,重采数据中:" + strings.Join(errCode, ",") + "爬虫已失效,是否删除对应数据!"
- msgType = 1
- }
- } else {
- msg = "启动任务失败,未找到该任务"
- }
- if msg != "" {
- logger.Info("任务启动失败:", (*task)["s_name"], id)
- }
- f.ServeJson(map[string]interface{}{"msg": msg, "msgtype": msgType, "errcode": strings.Join(errCode, ",")})
- }
- func CheckCodeState(coll string) (errCode []string) {
- defer qu.Catch()
- match := map[string]interface{}{
- "state": map[string]interface{}{
- "$ne": 1,
- },
- }
- group := map[string]interface{}{
- "_id": "$spidercode",
- }
- p := []map[string]interface{}{
- map[string]interface{}{"$match": match},
- map[string]interface{}{"$group": group},
- }
- sess := MgoDT.GetMgoConn()
- defer MgoDT.DestoryMongoConn(sess)
- it2 := sess.DB(MgoDT.DbName).C(coll).Pipe(p).Iter()
- n2 := 0
- for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
- code := qu.ObjToString(tmp["_id"]) //爬虫代码
- q := map[string]interface{}{
- "code": code,
- "state": 5,
- "platform": "golua平台",
- }
- if MgoE.Count("luaconfig", q) == 0 {
- errCode = append(errCode, code)
- }
- tmp = map[string]interface{}{}
- }
- return
- }
|