package front

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"luaweb/spider"
	"net/http"
	"net/url"
	qu "qfw/util"
	mgdb "qfw/util/mongodb"
	"sort"
	util "spiderutil"
	"strings"
	"time"

	"github.com/go-xweb/xweb"
	"gopkg.in/mgo.v2/bson"
)

// LuaMove groups the HTTP handlers used to migrate spiders (Lua scripts)
// between upload events (nodes) and to close pending migrations.
type LuaMove struct {
	*xweb.Action
	luaMoveManager    xweb.Mapper `xweb:"/center/luamove"`                 // site list
	luaMoveSite       xweb.Mapper `xweb:"/center/luamove/site/(.*)"`       // spider page of one site
	luaMoveCodeInfo   xweb.Mapper `xweb:"/center/luamove/codeinfo"`        // spider list
	luaMoveBySite     xweb.Mapper `xweb:"/center/luamove/luamovebysite"`   // migrate / close by site
	luaMoveByCode     xweb.Mapper `xweb:"/center/luamove/luamovebycode"`   // migrate / close by spider
	updateEventBySite xweb.Mapper `xweb:"/center/spider/updateventbysite"` // update the target node of the spiders of a site
}

// OtherBase holds auxiliary spider settings.
type OtherBase struct {
	IsFlow               int    // flag: whether the data collected by the spider takes part in the data flow
	SpiderType           string // spider type: increment = incremental; history = historical
	SpiderHistoryMaxPage int    // maximum page to crawl when collecting historical data
	SpiderMoveEvent      string // node to move to after history is collected. comm: queue mode, bid: high-performance mode, 7700
}

// luaMoveEvents returns the keys of util.Config.Uploadevents in ascending
// order; the pages receive them as the "events" template value.
func luaMoveEvents() []string {
	events := make([]string, 0, len(util.Config.Uploadevents))
	for k := range util.Config.Uploadevents {
		events = append(events, k)
	}
	sort.Strings(events)
	return events
}

// sessionSiteMap returns the site -> spider rows map that LuaMoveManager
// stores in the session under "sitemap", or nil when the session holds no
// such map (instead of panicking on the type assertion).
func (lm *LuaMove) sessionSiteMap() map[string][]map[string]interface{} {
	sitemap, _ := lm.GetSession("sitemap").(map[string][]map[string]interface{})
	return sitemap
}

// LuaMoveManager handles /center/luamove and is restricted to role_admin.
//
// GET renders luamovesite.html with the sorted upload events. Any other
// method loads every luaconfig document with lm_ismove=true and groups the
// spiders by site (param_common[1]). It then stores that grouping in the
// session under "sitemap" and answers {"data": [...]} with one row per site.
// The site row takes its event / lm_movevent from the first spider seen.
func (lm *LuaMove) LuaMoveManager() {
	auth := qu.IntAll(lm.GetSession("auth"))
	if auth != role_admin {
		lm.Write("您没有权限")
		return
	}
	if lm.Method() == "GET" {
		lm.T["events"] = luaMoveEvents()
		lm.Render("luamovesite.html")
		return
	}

	query := bson.M{"lm_ismove": true}
	lualist := *mgdb.Find("luaconfig", query, nil, nil, false, -1, -1)
	data := []map[string]interface{}{}
	siteMap := map[string][]map[string]interface{}{}
	for _, l := range lualist {
		pc, _ := l["param_common"].([]interface{})
		if len(pc) < 3 {
			// site (index 1) and channel (index 2) are required; skip a
			// malformed config instead of panicking on the index.
			continue
		}
		site := qu.ObjToString(pc[1])
		href := ""
		if len(pc) >= 12 {
			href = qu.ObjToString(pc[11])
		}
		event := l["event"]
		lmMovevent := l["lm_movevent"]
		if _, seen := siteMap[site]; !seen {
			data = append(data, map[string]interface{}{"site": site, "lm_movevent": lmMovevent, "event": event})
		}
		siteMap[site] = append(siteMap[site], map[string]interface{}{
			"code":        l["code"],
			"channel":     pc[2],
			"event":       event,
			"lm_movevent": lmMovevent,
			"href":        href,
			"lm_download": l["lm_download"],
			"state":       l["state"],
		})
	}
	lm.SetSession("sitemap", siteMap)
	lm.ServeJson(map[string]interface{}{"data": data})
}

// LuaMoveSite renders luamovecode.html for one site on GET, passing the
// sorted upload events and the site name to the template.
func (lm *LuaMove) LuaMoveSite(site string) {
	defer qu.Catch()
	if lm.Method() == "GET" {
		lm.T["events"] = luaMoveEvents()
		lm.T["site"] = site
		lm.Render("luamovecode.html", &lm.T)
	}
}

// LuaMoveCodeInfo answers {"data": [...]} with the cached spider rows of the
// requested site. "data" is null when the session holds no site map or the
// site is unknown.
func (lm *LuaMove) LuaMoveCodeInfo() {
	site := lm.GetString("site")
	lm.ServeJson(map[string]interface{}{"data": lm.sessionSiteMap()[site]})
}

// LuaMoveBySite migrates (stype "move") or closes (stype "close") every
// spider cached in the session for the given site and answers {"ok": bool}.
func (lm *LuaMove) LuaMoveBySite() {
	defer qu.Catch()
	site := lm.GetString("site")
	stype := lm.GetString("stype")
	sitemap := lm.sessionSiteMap()
	if sitemap == nil {
		// No cached site list (e.g. the session expired): nothing to act on.
		lm.ServeJson(map[string]interface{}{"ok": false})
		return
	}
	codes := []string{}
	for _, info := range sitemap[site] {
		codes = append(codes, qu.ObjToString(info["code"]))
	}
	ok := false
	switch stype {
	case "move": // migrate
		ok = SpiderMoveLua(codes)
	case "close": // close
		ok, _ = lm.SpiderCloseMoveLua(site, codes, "site")
	}
	lm.ServeJson(map[string]interface{}{"ok": ok})
}

// LuaMoveByCode migrates (stype "move") or closes (stype "close") the
// comma-separated spider codes in "code" and answers {"ok": bool, "flush": bool}.
// flush is true when closing removed the last cached spider of the site.
func (lm *LuaMove) LuaMoveByCode() {
	defer qu.Catch()
	site := lm.GetString("site")
	codes := strings.Split(lm.GetString("code"), ",")
	stype := lm.GetString("stype")
	ok, flush := false, false
	switch stype {
	case "move": // migrate
		ok = SpiderMoveLua(codes)
	case "close": // close
		ok, flush = lm.SpiderCloseMoveLua(site, codes, "code")
	}
	qu.Debug(ok)
	lm.ServeJson(map[string]interface{}{"ok": ok, "flush": flush})
}

// UpdateEventBySite sets lm_movevent to the requested "movevent" for every
// spider of the site, both in the cached session rows and in luaconfig.
// It writes no response body.
func (lm *LuaMove) UpdateEventBySite() {
	defer qu.Catch()
	site := lm.GetString("site")
	movevent, err := lm.GetInteger("movevent")
	if err != nil {
		// Without this check a missing/invalid value would move every spider
		// of the site to node 0.
		qu.Debug("updateventbysite: invalid movevent", site, err)
		return
	}
	sitemap := lm.sessionSiteMap()
	if sitemap == nil {
		return
	}
	codes := []string{}
	for _, info := range sitemap[site] {
		codes = append(codes, qu.ObjToString(info["code"]))
		info["lm_movevent"] = movevent
	}
	lm.SetSession("sitemap", sitemap)
	query := bson.M{
		"code": bson.M{
			"$in": codes,
		},
	}
	set := bson.M{
		"$set": bson.M{
			"lm_movevent": movevent,
		},
	}
	mgdb.Update("luaconfig", query, set, false, true)
}

// SpiderMoveLua moves each spider in codes to the node stored in its
// lm_movevent field.
//
// For every code:
//  1. A spider whose state is below 7 is first taken down (state "6") on its
//     current event.
//  2. Once that succeeds, or is skipped, luaconfig.event is set to
//     lm_movevent.
//  3. A spider that was in state 5 is published again (state "5") on the new
//     node.
//
// The first take-down or publish failure sends an alert mail and returns
// false. Spiders processed before it stay moved.
func SpiderMoveLua(codes []string) bool {
	defer qu.Catch()
	for _, code := range codes {
		lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
		movevent := qu.IntAll(lua["lm_movevent"])
		event := qu.IntAll(lua["event"])
		state := qu.IntAll(lua["state"])

		upresult := true
		var err error
		// The original note says spiders in the unpublished / login-required /
		// unprocessable / deleted states need no take-down (presumably states >= 7).
		// NOTE(review): "|| state == 4" is redundant with "state < 7"; confirm
		// which states were meant to be excluded.
		if state < 7 || state == 4 {
			upresult, err = spider.UpdateSpiderByCodeState(code, "6", event) // take the script down
		}
		if !upresult || err != nil {
			SendMail("爬虫定期节点迁移" + code + "下架失败")
			qu.Debug("定期节点迁移", code, "下架失败")
			return false
		}

		// Take-down succeeded (or was not needed): switch to the target node.
		query := bson.M{
			"code": code,
		}
		set := bson.M{
			"$set": bson.M{
				"event": movevent,
			},
		}
		mgdb.Update("luaconfig", query, set, false, false)

		// Only spiders that were published get published again.
		if state == 5 {
			upresult, err = spider.UpdateSpiderByCodeState(code, "5", movevent) // publish the script
			if !upresult || err != nil {
				SendMail("爬虫定期节点迁移" + code + "上架失败")
				qu.Debug("定期节点迁移", code, "上架失败")
				return false
			}
		}
	}
	return true
}

// SpiderCloseMoveLua clears lm_ismove for the given codes in luaconfig.
//
// When by is "code" and the session holds a site map, the closed spiders are
// also removed from the site's cached rows. If none remain, the site is
// dropped from the map and flush is returned as true.
// It returns the result of the luaconfig update and the flush flag.
func (lm *LuaMove) SpiderCloseMoveLua(site string, codes []string, by string) (bool, bool) {
	defer qu.Catch()
	flush := false
	query := bson.M{
		"code": bson.M{
			"$in": codes,
		},
	}
	set := bson.M{
		"$set": bson.M{
			"lm_ismove": false,
		},
	}
	// Prune the session cache; it is only needed when closing by code.
	if by == "code" {
		if sitemap := lm.sessionSiteMap(); sitemap != nil {
			closed := make(map[string]bool, len(codes))
			for _, code := range codes {
				closed[code] = true
			}
			remaining := []map[string]interface{}{}
			for _, info := range sitemap[site] {
				if !closed[qu.ObjToString(info["code"])] {
					remaining = append(remaining, info)
				}
			}
			if len(remaining) == 0 {
				delete(sitemap, site)
				flush = true
			} else {
				sitemap[site] = remaining
			}
			lm.SetSession("sitemap", sitemap)
		}
	}
	return mgdb.Update("luaconfig", query, set, false, true), flush
}

// SpiderMoveEvent handles one encoded message that names a spider.
//
// data is decoded with util.Se.DecodeString and parsed as a JSON array whose
// first element is the spider code. A spider whose spidertype is "history" is
// moved to the node chosen by GetEvent, switched to spidertype "increment",
// and published (state "5") there. If publishing fails, or the spider is not
// a history spider, event is restored and state is set to 6. The outcome is
// saved in luamovelog.
func SpiderMoveEvent(data string) {
	defer qu.Catch()
	// decode the spider code
	data = util.Se.DecodeString(data)
	infos := []interface{}{}
	err := json.Unmarshal([]byte(data), &infos)
	if err != nil || len(infos) == 0 {
		qu.Debug("历史迁移到增量节点失败:", data)
		return
	}
	code := qu.ObjToString(infos[0])

	// move to the new node and publish
	lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
	spidertype := qu.ObjToString(lua["spidertype"])
	event := qu.IntAll(lua["event"])
	var upresult bool
	qu.Debug("lua move:", code, event)
	if spidertype == "history" {
		newevent := GetEvent(code, lua)
		qu.Debug("new event:", newevent)
		mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": newevent, "spidertype": "increment"}}, false, false)
		upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) // publish the script
	}

	ok := false
	if upresult && err == nil { // publish succeeded
		ok = true
		qu.Debug("Code:", code, "历史迁移到增量节点成功")
	} else { // publish failed
		// NOTE(review): non-history spiders also land here and are set to
		// state 6; confirm this is intended.
		mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
		qu.Debug("Code:", code, "历史迁移到增量节点失败")
	}
	mgdb.Save("luamovelog", map[string]interface{}{
		"code":       code,
		"comeintime": time.Now().Unix(),
		"type":       "movevent",
		"ok":         ok,
	})
}

// GetEvent picks the node a history spider should move to:
//  1. its historyevent field, when set;
//  2. otherwise the event of another published (state 5) spider of the same
//     site (param_common[1]);
//  3. otherwise 7700 when spidermovevent is "7700";
//  4. otherwise, among the Uploadevents whose value equals spidermovevent
//     (e.g. bid / comm), the one with the fewest published spiders; 7700 when
//     none match.
func GetEvent(code string, lua map[string]interface{}) int {
	defer qu.Catch()
	// 1. history node
	if lua["historyevent"] != nil {
		return qu.IntAll(lua["historyevent"])
	}
	// 2. node of another published spider of the same site (skipped when
	// param_common is missing, instead of panicking and returning 0).
	if pc, _ := lua["param_common"].([]interface{}); len(pc) > 1 {
		site := qu.ObjToString(pc[1])
		query := map[string]interface{}{
			"code": map[string]interface{}{
				"$ne": code,
			},
			"param_common.1": site,
			"state":          5,
		}
		if tmp := *mgdb.FindOne("luaconfig", query); len(tmp) > 0 {
			return qu.IntAll(tmp["event"])
		}
	}
	// 3. 7700
	spidermovevent := qu.ObjToString(lua["spidermovevent"])
	if spidermovevent == "7700" {
		return 7700
	}
	// 4. least-loaded matching node. best < 0 means "no candidate yet", so
	// a node with 0 published spiders is kept rather than overwritten.
	best := -1
	result := 7700
	for k, t := range util.Config.Uploadevents {
		if qu.ObjToString(t) != spidermovevent {
			continue
		}
		event := qu.IntAll(k)
		count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
		if best < 0 || count < best {
			result = event
			best = count
		}
	}
	return result
}

// SendMail sends text as the body of an alert mail through the HTTP API
// configured in util.Config.JkMail ("api" and "to"). It tries up to three
// times until a request gets a response, and logs the response body.
func SendMail(text string) {
	defer qu.Catch()
	// Query values are escaped so non-ASCII text or '&'/'+' cannot corrupt the request.
	reqURL := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		util.Config.JkMail["api"],
		url.QueryEscape(fmt.Sprint(util.Config.JkMail["to"])),
		"lua-timeluamove-err",
		url.QueryEscape(text))
	// http.DefaultClient has no timeout; a hung mail API would block the caller forever.
	client := &http.Client{Timeout: 10 * time.Second}
	for i := 1; i <= 3; i++ {
		res, err := client.Get(reqURL)
		if err != nil {
			continue
		}
		// Read before closing: reading a closed body always fails.
		read, err := ioutil.ReadAll(res.Body)
		res.Body.Close()
		qu.Debug("邮件发送:", string(read), err)
		break
	}
}