package front

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	qu "qfw/util"
	"sort"
	"spider"
	util "spiderutil"
	"strings"
	"time"
	u "util"

	"github.com/go-xweb/xweb"
)

// LuaMove is the xweb action serving the spider node-migration endpoints
// mounted under /center/luamove.
type LuaMove struct {
	*xweb.Action
	luaMove       xweb.Mapper `xweb:"/center/luamove"`               // site list
	luaMoveByCode xweb.Mapper `xweb:"/center/luamove/luamovebycode"` // migrate or close records by spider code
	closeAll      xweb.Mapper `xweb:"/center/luamove/closeall"`      // close all spiders
	updateToEvent xweb.Mapper `xweb:"/center/luamove/updatetoevent"` // update the target node
}

// LuaMove renders the migration list page on GET and, on any other method,
// answers the paged/sorted/filtered table query against the "luamovevent"
// collection. Only sessions whose "auth" equals u.Role_Admin are served;
// everyone else receives a plain-text refusal.
func (lm *LuaMove) LuaMove() {
	defer qu.Catch()
	auth := qu.IntAll(lm.GetSession("auth"))
	if auth != u.Role_Admin {
		lm.Write("您没有权限")
		return
	}
	if lm.Method() == "GET" {
		events := []string{}
		for k := range util.Config.Uploadevents {
			events = append(events, k)
		}
		sort.Strings(events)
		lm.T["events"] = events
		lm.Render("luamovelist.html")
		return
	}

	// Parse errors are deliberately ignored: a missing or malformed
	// parameter leaves the zero value. -1 is the "no filter" sentinel.
	state, _ := lm.GetInteger("state")
	fromevent, _ := lm.GetInteger("fromevent")
	toevent, _ := lm.GetInteger("toevent")
	ismove, _ := lm.GetInteger("ismove")
	start, _ := lm.GetInteger("start")
	limit, _ := lm.GetInteger("length")
	draw, _ := lm.GetInteger("draw")
	search := strings.TrimSpace(lm.GetString("search[value]"))

	query := map[string]interface{}{}
	if state != -1 { // filter by processed state
		query["state"] = state
	}
	if fromevent != -1 { // filter by source (historical) node
		query["fromevent"] = fromevent
	}
	if toevent != -1 { // filter by target node
		query["toevent"] = toevent
	}
	if ismove != -1 { // filter by whether the spider was migrated
		query["ismove"] = ismove == 1
	}
	if search != "" {
		// The search text is used as a regular expression on code or site.
		query["$or"] = []interface{}{
			map[string]interface{}{"code": map[string]interface{}{"$regex": search}},
			map[string]interface{}{"site": map[string]interface{}{"$regex": search}},
		}
	}

	// Restrict the result to records that came in since the 1st of the
	// current month (local time).
	year, month, _ := time.Now().Date()
	firstOfMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Unix()
	query["comeintime"] = map[string]interface{}{
		"$gte": firstOfMonth,
	}

	// Build the sort document from the column/direction sent by the table
	// widget; anything other than "asc" sorts descending.
	orderIndex := lm.GetString("order[0][column]")
	orderName := lm.GetString(fmt.Sprintf("columns[%s][data]", orderIndex))
	orderType := 1
	if lm.GetString("order[0][dir]") != "asc" {
		orderType = -1
	}
	sortSpec := fmt.Sprintf(`{"%s":%d}`, orderName, orderType)

	count := u.MgoEB.Count("luamovevent", query)
	qu.Debug("query:", query, sortSpec, count)
	data, _ := u.MgoEB.Find("luamovevent", query, sortSpec, nil, false, start, limit)
	if data != nil { // a failed query must not panic on dereference
		for _, d := range *data {
			// Enrich every row with the spider's current node and
			// modification time from "luaconfig".
			lua, _ := u.MgoEB.FindOneByField("luaconfig", map[string]interface{}{"code": d["code"]}, `{"modifytime":1,"event":1}`)
			var modifytime, nowevent interface{}
			if lua != nil {
				modifytime = (*lua)["modifytime"]
				nowevent = (*lua)["event"]
			}
			d["modifytime"] = modifytime
			d["nowevent"] = nowevent
			d["encode"] = util.Se.Encode2Hex(qu.ObjToString(d["code"]))
		}
	}
	lm.ServeJson(map[string]interface{}{
		"draw":            draw,
		"data":            data,
		"recordsFiltered": count,
		"recordsTotal":    count,
	})
}

// LuaMoveByCode handles a batch action on comma-separated spider codes.
// With stype "move" it migrates each code to the node at the same position
// of the comma-separated "movevent" list; with stype "close" it marks the
// matching "luamovevent" records as processed (state 1). The JSON response
// carries the outcome under "ok".
//
// NOTE(review): unlike LuaMove, this handler performs no admin check —
// confirm whether that is intentional.
func (lm *LuaMove) LuaMoveByCode() {
	defer qu.Catch()
	stype := lm.GetString("stype")
	code := lm.GetString("code")
	movevent := lm.GetString("movevent")
	events := strings.Split(movevent, ",")
	codes := strings.Split(code, ",")
	ok := false
	switch stype {
	case "move": // migrate
		if len(codes) != len(events) {
			qu.Debug("爬虫个数和节点个数不匹配:", codes, events)
		} else {
			ok = SpiderMoveLua(codes, events) // node transfer
		}
	case "close": // close: only update the record state
		arr := make([][]map[string]interface{}, 0, len(codes))
		for _, c := range codes {
			arr = append(arr, []map[string]interface{}{
				{"code": c},
				{
					"$set": map[string]interface{}{
						"state":      1,
						"updatetime": time.Now().Unix(),
					},
				},
			})
		}
		ok = u.MgoEB.UpdateBulk("luamovevent", arr...)
	}
	lm.ServeJson(map[string]interface{}{"ok": ok})
}

// CloseAll removes every record from the "luamovevent" collection and
// reports the outcome as JSON under "ok".
//
// NOTE(review): the route comment says "close", but the call deletes the
// records; the previous state-update variant is kept below for reference.
// NOTE(review): unlike LuaMove, this handler performs no admin check —
// confirm whether that is intentional.
func (lm *LuaMove) CloseAll() {
	defer qu.Catch()
	//ok := u.MgoEB.Update("luamovevent", map[string]interface{}{"state": 0}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}}, false, true)
	ok := u.MgoEB.Del("luamovevent", nil)
	lm.ServeJson(map[string]interface{}{"ok": ok})
}

// UpdateToEvent sets the target node ("toevent") and refreshes "updatetime"
// on the "luamovevent" record identified by the "id" parameter.
//
// NOTE(review): unlike LuaMove, this handler performs no admin check —
// confirm whether that is intentional.
func (lm *LuaMove) UpdateToEvent() {
	defer qu.Catch()
	id := lm.GetString("id")
	// A malformed "event" parameter is ignored and stored as 0.
	event, _ := lm.GetInteger("event")
	qu.Debug(id, event)
	ok := u.MgoEB.UpdateById("luamovevent", id, map[string]interface{}{"$set": map[string]interface{}{"toevent": event, "updatetime": time.Now().Unix()}})
	lm.ServeJson(map[string]interface{}{"ok": ok})
}

// SpiderMoveLua migrates spiders to new nodes. codes[i] is moved to
// events[i]. A spider in state 5 (published) is unpublished, has its node
// updated in "luaconfig" and is published again on the new node; spiders in
// any other state are left untouched in "luaconfig". In both cases the
// matching "luamovevent" record is marked processed (state 1).
//
// Per-spider failures do not abort the batch: they are collected and sent
// in a single alert mail. The function returns true once the whole batch has
// been visited, and false only when fewer events than codes were supplied.
func SpiderMoveLua(codes []string, events []string) bool {
	defer qu.Catch()
	if len(events) < len(codes) {
		// Guard the events[index] access below for direct callers.
		qu.Debug("爬虫个数和节点个数不匹配:", codes, events)
		return false
	}
	msg := []string{}
	for index, code := range codes {
		resultEvent := events[index]
		qu.Debug("爬虫节点转移:", code, resultEvent)
		lua, _ := u.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
		if lua == nil || len(*lua) == 0 {
			msg = append(msg, "爬虫节点转移未找到爬虫"+code)
			continue
		}
		state := qu.IntAll((*lua)["state"])
		event := qu.IntAll((*lua)["event"])
		if state == 5 { // published: unpublish, switch node, publish again
			var (
				upresult bool
				err      interface{}
			)
			upresult, err = spider.UpdateSpiderByCodeState(code, "6", event, false) // unpublish the script
			if upresult && err == nil {
				// Unpublished successfully: update the node.
				re := qu.IntAll(resultEvent)
				u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": re, "incrementevent": re}}, false, false)
				upresult, err = spider.UpdateSpiderByCodeState(code, "5", re, false) // publish the script
				if !upresult || err != nil {
					qu.Debug("爬虫节点转移", code, "上架失败")
					msg = append(msg, "爬虫节点转移"+code+"上架失败")
				}
			} else {
				qu.Debug("爬虫节点转移", code, "下架失败")
				msg = append(msg, "爬虫节点转移"+code+"下架失败")
			}
		}
		u.MgoEB.Update("luamovevent", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}}, false, false)
	}
	if len(msg) > 0 {
		SendMail(strings.Join(msg, ";"))
	}
	return true
}

// SendMail reports text through the HTTP mail API configured in
// util.Config.JkMail ("api" and "to"), trying up to three times and stopping
// at the first request that yields a response.
func SendMail(text string) {
	defer qu.Catch()
	// The body is free text (non-ASCII, ';' separators), so it must be
	// query-escaped to survive as a single parameter.
	api := fmt.Sprintf("%s?to=%s&title=%s&body=%s", util.Config.JkMail["api"], util.Config.JkMail["to"], "lua-timeluamove-err", url.QueryEscape(text))
	for i := 1; i <= 3; i++ {
		res, err := http.Get(api)
		if err != nil {
			qu.Debug("邮件发送失败:", err)
			continue
		}
		// Read the response before closing the body.
		read, err := ioutil.ReadAll(res.Body)
		res.Body.Close()
		qu.Debug("邮件发送:", string(read), err)
		break
	}
}

/*func SpiderMoveEvent(data string) {
	// parse the spider code
	data = util.Se.DecodeString(data)
	infos := []interface{}{}
	err := json.Unmarshal([]byte(data), &infos)
	if err != nil {
		qu.Debug("历史迁移到增量节点失败:", data)
		return
	}
	code := qu.ObjToString(infos[0])
	// migrate the node and publish
	//lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
	lua, _ := u.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
	spidertype := qu.ObjToString((*lua)["spidertype"])
	event := qu.IntAll((*lua)["event"])
	var upresult bool
	set := map[string]interface{}{}
	qu.Debug("lua move:", code, event)
	if spidertype == "history" {
		newevent := GetEvent(code, (*lua))
		qu.Debug("new event:", newevent)
		set["event"] = newevent
		set["spidertype"] = "increment"
		type_content, _ := (*lua)["type_content"].(int)
		iscopycontent, _ := (*lua)["iscopycontent"].(bool)
		str_content := qu.ObjToString((*lua)["str_content"])
		str_recontent := qu.ObjToString((*lua)["str_recontent"])
		if type_content == 1 && iscopycontent && str_recontent != "" { // detail page is in expert mode and has copied detail-page code
			set["iscopycontent"] = false
			set["str_content"] = str_recontent
			set["str_recontent"] = str_content
		}
		//if mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": set}, false, false) {
		//	upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) // publish the script
		//}
		if u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": set}, false, false) {
			upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) // publish the script
		}
	}
	ok := false
	if upresult && err == nil { // published successfully
		ok = true
		qu.Debug("Code:", code, "历史迁移到增量节点成功")
	} else { // publish failed
		//mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
		u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
		qu.Debug("Code:", code, "历史迁移到增量节点失败")
	}
	u.MgoEB.Save("luamovelog", map[string]interface{}{
		"code":       code,
		"comeintime": time.Now().Unix(),
		"type":       "movevent",
		"ok":         ok,
	})
}

//
func GetEvent(code string, lua map[string]interface{}) int {
	defer qu.Catch()
	// 1. historical node
	if lua["incrementevent"] != nil {
		return qu.IntAll(lua["incrementevent"])
	}
	// 2. find the node by site
	param_common := lua["param_common"].([]interface{})
	site := qu.ObjToString(param_common[1])
	query := map[string]interface{}{
		"code": map[string]interface{}{
			"$ne": code,
		},
		"param_common.1": site,
		"state":          5,
	}
	//tmp := *mgdb.FindOne("luaconfig", query)
	tmp, _ := u.MgoEB.FindOne("luaconfig", query)
	if tmp != nil && len(*tmp) > 0 {
		return qu.IntAll((*tmp)["event"])
	}
	// 3. 7700
	spidermovevent := qu.ObjToString(lua["spidermovevent"])
	if spidermovevent == "7700" {
		return 7700
	}
	// 4. assign a node by spider count
	num := 0
	result := 7700
	for k, t := range util.Config.Uploadevents {
		if qu.ObjToString(t) == spidermovevent { // bid, comm
			event := qu.IntAll(k)
			//count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
			count := u.MgoEB.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
			if num == 0 || count < num {
				result = event
				num = count
			}
		}
	}
	return result
}*/