package front

import (
	"strings"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"go.mongodb.org/mongo-driver/bson/primitive"

	"mongodb"
	qu "qfw/util"
	"spider"
	. "task"
	. "util"
)

// Task serves the task list.
//
// For a POST request it reads the paging/search parameters ("start",
// "length", "draw", "search[value]", "state"), queries the "task" collection
// for non-deleted tasks and answers with a JSON object holding "draw",
// "data", "recordsFiltered" and "recordsTotal". For any other method it
// renders the task page template.
func (f *Front) Task() {
	defer qu.Catch()

	if f.Method() != "POST" {
		f.T["datadb"] = DataBb
		f.T["datacoll"] = DataColl
		f.T["dataflows"] = FlowsArr
		f.Render("task/task.html", &f.T)
		return
	}

	search := strings.TrimSpace(f.GetString("search[value]"))
	start, _ := f.GetInteger("start")
	limit, _ := f.GetInteger("length")
	draw, _ := f.GetInteger("draw")
	state, _ := f.GetInteger("state")

	query := map[string]interface{}{
		"delete": false,
	}
	// A state of -1 means "do not filter by state".
	if state != -1 {
		query["i_state"] = state
	}
	if search != "" {
		// NOTE(review): the search text is handed to $regex verbatim, so
		// regex metacharacters typed by the user are interpreted as such.
		query["$or"] = []interface{}{
			map[string]interface{}{"s_name": map[string]interface{}{"$regex": search}},
		}
	}
	qu.Debug("query:", query)

	tasks, _ := Mgo.Find("task", query, map[string]interface{}{"_id": -1}, nil, false, start, limit)
	count := Mgo.Count("task", query)

	// Guard against a nil result pointer so a failed lookup yields an empty
	// list in the response instead of a nil dereference.
	var data interface{} = []interface{}{}
	if tasks != nil {
		data = *tasks
	}
	f.ServeJson(map[string]interface{}{
		"draw":            draw,
		"data":            data,
		"recordsFiltered": count,
		"recordsTotal":    count,
	})
}

// TaskSave creates a task (empty "id") or updates an existing one and answers
// with a JSON object {"rep": <bool>, "msg": <string>}. "rep" is the result of
// the update call; "msg" is non-empty when the request was rejected.
func (f *Front) TaskSave() {
	defer qu.Catch()

	coll := f.GetString("coll")
	ok, msg := f.saveTask(coll)
	f.ServeJson(map[string]interface{}{"rep": ok, "msg": msg})
}

// saveTask validates the request and writes the task document for TaskSave.
// It returns the update result and, on rejection, the message for the client.
func (f *Front) saveTask(coll string) (bool, string) {
	// RunningTask is written under TaskLock in TaskStart, so it is read under
	// the same lock here to avoid an unsynchronised map access.
	TaskLock.Lock()
	busy := RunningTask[coll]
	TaskLock.Unlock()
	if busy {
		// The collection is used by a task that is currently executing.
		return false, "表:" + coll + "已被占用"
	}

	// Resolve the session user before anything destructive happens; a missing
	// or malformed session is rejected instead of panicking half-way through.
	user, hasUser := f.GetSession("user").(map[string]interface{})
	if !hasUser {
		return false, "用户未登录"
	}

	// For the bidding data collection the "extract" collection is cleared
	// first; the bidding collection itself is cleaned manually.
	if coll == DataColl && !MgoDT.Del("extract", nil) {
		return false, "extract表清理失败"
	}

	id := f.GetString("id")
	name := f.GetString("name")
	checkfields := f.GetString("checkfields")
	isbidding := f.GetString("isbidding")
	flows := f.GetString("flows")

	qu.Debug(flows, len(strings.Split(flows, ",")))
	// Kept as a non-nil slice so an empty selection is stored as an empty
	// list rather than a nil value.
	flowsArr := []string{}
	if flows != "" {
		flowsArr = append(flowsArr, strings.Split(flows, ",")...)
	}
	qu.Debug(id, name, coll, isbidding, flows)

	set := map[string]interface{}{
		"s_name":        name,
		"s_coll":        coll,
		"s_checkfields": checkfields,
		"isbidding":     isbidding == "是",
		"flows":         flowsArr,
	}
	query := map[string]interface{}{}
	if id != "" {
		// Existing task: only refresh the editable fields.
		query["_id"] = mongodb.StringTOBsonId(id)
		set["updatetime"] = time.Now().Unix()
	} else {
		// New task: fresh id plus the creation defaults.
		query["_id"] = primitive.NewObjectID()
		set["s_db"] = DataBb
		set["l_createtime"] = time.Now().Unix()
		set["s_createuser"] = user["name"]
		set["i_state"] = 0
		set["delete"] = false
		set["issend"] = false
	}
	// NOTE(review): the trailing (true, false) flags are presumably
	// (upsert, multi) — confirm against the Mgo wrapper.
	return Mgo.Update("task", query, map[string]interface{}{"$set": set}, true, false), ""
}

// TaskDel soft-deletes the task identified by the "id" parameter by setting
// its "delete" flag, and answers with {"rep": <update result>}.
func (f *Front) TaskDel() {
	defer qu.Catch()

	id := f.GetString("id")
	qu.Debug(id)
	deleted := Mgo.UpdateById("task", id, map[string]interface{}{
		"$set": map[string]interface{}{"delete": true},
	})
	f.ServeJson(map[string]interface{}{"rep": deleted})
}

// TaskStart launches the task identified by the "id" parameter.
//
// It answers with a JSON object holding "msg" (empty on success), "msgtype"
// (1 when the start was refused because of invalid spider codes, else 0) and
// "errcode" (the comma-joined offending spider codes).
func (f *Front) TaskStart() {
	defer qu.Catch()

	id := f.GetString("id")
	found, _ := Mgo.FindById("task", id, nil)
	// Reading from a nil map is safe, so a nil lookup result simply falls
	// into the "task not found" branch below.
	var info map[string]interface{}
	if found != nil {
		info = *found
	}
	qu.Debug("启动任务:", info["s_name"], id)

	msg := ""
	msgType := 0
	errCode := []string{}
	if len(info) == 0 {
		msg = "启动任务失败,未找到该任务"
	} else {
		coll := qu.ObjToString(info["s_coll"])
		// Check whether the spiders behind the data in coll are still valid.
		errCode = CheckCodeState(coll)
		if len(errCode) > 0 {
			msg = "启动任务失败,重采数据中:" + strings.Join(errCode, ",") + "爬虫已失效,是否删除对应数据!"
			msgType = 1
		} else {
			msg = launchTask(id, coll, info)
		}
	}

	if msg != "" {
		logger.Info("任务启动失败:", info["s_name"], id)
	}
	f.ServeJson(map[string]interface{}{
		"msg":     msg,
		"msgtype": msgType,
		"errcode": strings.Join(errCode, ","),
	})
}

// launchTask performs the locked part of TaskStart: it refuses to start when
// coll already has a running task, resets the per-record state fields, and
// starts the task in a goroutine. It returns an empty string on success or
// the failure message for the client.
func launchTask(id, coll string, info map[string]interface{}) string {
	TaskLock.Lock()
	// Deferred so the lock is released even if something below panics;
	// otherwise the recovered panic would leave TaskLock held forever.
	defer TaskLock.Unlock()

	if RunningTask[coll] {
		return "表:" + coll + "已有任务正在运行,请稍后启动该任务"
	}

	// Give records their default fields (state=0, times=0).
	pending := map[string]interface{}{
		"$or": []interface{}{
			// Records never processed before (first run of a task).
			map[string]interface{}{"state": map[string]interface{}{"$exists": false}},
			// Records marked -1 (task restart).
			map[string]interface{}{"state": -1},
		},
	}
	MgoDT.Update(coll, pending, map[string]interface{}{
		"$set": map[string]interface{}{"state": 0, "times": 0},
	}, false, true)
	if MgoDT.Count(coll, map[string]interface{}{"state": 0}) <= 0 {
		return "表:" + coll + "无有效数据"
	}

	// Build the task before marking anything as running, using checked type
	// assertions: a task document with a missing or oddly typed field falls
	// back to false / no flows instead of panicking.
	isBidding, _ := info["isbidding"].(bool)
	var rawFlows []interface{}
	switch v := info["flows"].(type) {
	case []interface{}:
		rawFlows = v
	case primitive.A:
		rawFlows = v
	}
	checkFields := map[string]bool{}
	if fields, ok := info["s_checkfields"].(string); ok && fields != "" {
		for _, field := range strings.Split(fields, ",") {
			checkFields[field] = true
		}
	}
	t := &Task{
		ID:              mongodb.BsonIdToSId(info["_id"]),
		Name:            qu.ObjToString(info["s_name"]),
		DB:              qu.ObjToString(info["s_db"]),
		Coll:            qu.ObjToString(info["s_coll"]),
		IsBidding:       isBidding,
		Flows:           qu.ObjArrToStringArr(rawFlows),
		SpiderScriptMap: map[string]*spider.Spider{},
	}
	t.CheckFields = checkFields

	// Record the new task state, then start the task.
	Mgo.UpdateById("task", id, map[string]interface{}{
		"$set": map[string]interface{}{
			"i_state":     1,
			"issend":      false,
			"l_starttime": time.Now().Unix(),
		},
	})
	RunningTask[coll] = true
	go t.StartTask()
	return ""
}

// CheckCodeState collects the spider codes used by records in coll whose
// state is not 1 and returns those codes for which no "luaconfig" document
// with state 5 and platform "golua平台" exists.
func CheckCodeState(coll string) (errCode []string) {
	defer qu.Catch()

	// Distinct spider codes of all records with state != 1.
	pipeline := []map[string]interface{}{
		{"$match": map[string]interface{}{"state": map[string]interface{}{"$ne": 1}}},
		{"$group": map[string]interface{}{"_id": "$spidercode"}},
	}

	sess := MgoDT.GetMgoConn()
	defer MgoDT.DestoryMongoConn(sess)

	iter := sess.DB(MgoDT.DbName).C(coll).Pipe(pipeline).Iter()
	for {
		// A fresh map per row so values from a previous row cannot linger.
		row := map[string]interface{}{}
		if !iter.Next(&row) {
			break
		}
		code := qu.ObjToString(row["_id"]) // spider code
		lookup := map[string]interface{}{
			"code":     code,
			"state":    5,
			"platform": "golua平台",
		}
		if MgoE.Count("luaconfig", lookup) == 0 {
			errCode = append(errCode, code)
		}
	}
	return errCode
}