|
@@ -18,22 +18,12 @@ import (
|
|
|
|
|
|
type LuaMove struct {
|
|
|
*xweb.Action
|
|
|
- luaMoveManager xweb.Mapper `xweb:"/center/luamove"` //站点列表
|
|
|
- luaMoveSite xweb.Mapper `xweb:"/center/luamove/site/(.*)"` //
|
|
|
- luaMoveCodeInfo xweb.Mapper `xweb:"/center/luamove/codeinfo"` //爬虫列表
|
|
|
- luaMoveBySite xweb.Mapper `xweb:"/center/luamove/luamovebysite"` //按站点迁移、关闭
|
|
|
- luaMoveByCode xweb.Mapper `xweb:"/center/luamove/luamovebycode"` //按爬虫迁移、关闭
|
|
|
- updateEventBySite xweb.Mapper `xweb:"/center/spider/updateventbysite"` //更新某站点下爬虫的节点
|
|
|
+ luaMove xweb.Mapper `xweb:"/center/luamove"` //站点列表
|
|
|
+ luaMoveByCode xweb.Mapper `xweb:"/center/luamove/luamovebycode"` //
|
|
|
}
|
|
|
|
|
|
-type OtherBase struct {
|
|
|
- IsFlow int //爬虫所采集数据是否参与数据流程标识
|
|
|
- SpiderType string //爬虫类型:increment增量;history历史
|
|
|
- SpiderHistoryMaxPage int //采集历史数据时的采集最大页
|
|
|
- SpiderMoveEvent string //爬虫采集完历史后要转移到的节点 comm:队列模式、bid:高性能模式、7700
|
|
|
-}
|
|
|
-
|
|
|
-func (lm *LuaMove) LuaMoveManager() {
|
|
|
+func (lm *LuaMove) LuaMove() {
|
|
|
+ defer qu.Catch()
|
|
|
auth := qu.IntAll(lm.GetSession("auth"))
|
|
|
if auth == role_admin {
|
|
|
if lm.Method() == "GET" {
|
|
@@ -43,228 +33,149 @@ func (lm *LuaMove) LuaMoveManager() {
|
|
|
}
|
|
|
sort.Strings(events)
|
|
|
lm.T["events"] = events
|
|
|
- lm.Render("luamovesite.html")
|
|
|
+ lm.Render("luamovelist.html")
|
|
|
} else {
|
|
|
- query := map[string]interface{}{"lm_ismove": true}
|
|
|
- //lualist := *mgdb.Find("luaconfig", query, nil, nil, false, -1, -1)
|
|
|
- lualist, _ := u.MgoEB.Find("luaconfig", query, nil, nil, false, -1, -1)
|
|
|
- data := []map[string]interface{}{}
|
|
|
- siteMap := map[string][]map[string]interface{}{}
|
|
|
+ state, _ := lm.GetInteger("state")
|
|
|
+ fromevent, _ := lm.GetInteger("fromevent")
|
|
|
+ toevent, _ := lm.GetInteger("toevent")
|
|
|
+ ismove, _ := lm.GetInteger("ismove")
|
|
|
+ start, _ := lm.GetInteger("start")
|
|
|
+ limit, _ := lm.GetInteger("length")
|
|
|
+ draw, _ := lm.GetInteger("draw")
|
|
|
+ searchStr := lm.GetString("search[value]")
|
|
|
+ search := strings.TrimSpace(searchStr)
|
|
|
+ query := map[string]interface{}{}
|
|
|
+ if state != -1 { //是否已处理筛选
|
|
|
+ query["state"] = state
|
|
|
+ }
|
|
|
+ if fromevent != -1 { //历史节点筛选
|
|
|
+ query["fromevent"] = fromevent
|
|
|
+ }
|
|
|
+ if toevent != -1 { //目标节点筛选
|
|
|
+ query["toevent"] = toevent
|
|
|
+ }
|
|
|
+ if ismove != -1 { //是否转移筛选
|
|
|
+ query["ismove"] = ismove == 1
|
|
|
|
|
|
- for _, l := range *lualist {
|
|
|
- pc := l["param_common"].([]interface{})
|
|
|
- site := qu.ObjToString(pc[1])
|
|
|
- href := ""
|
|
|
- if len(pc) >= 12 {
|
|
|
- href = qu.ObjToString(pc[11])
|
|
|
- }
|
|
|
- lm_movevent := l["lm_movevent"]
|
|
|
- event := l["event"]
|
|
|
- if tmp := siteMap[site]; tmp == nil {
|
|
|
- data = append(data, map[string]interface{}{"site": site, "lm_movevent": lm_movevent, "event": event})
|
|
|
- siteMap[site] = []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "code": l["code"],
|
|
|
- "channel": pc[2],
|
|
|
- "event": event,
|
|
|
- "lm_movevent": lm_movevent,
|
|
|
- "href": href,
|
|
|
- "lm_download": l["lm_download"],
|
|
|
- "state": l["state"],
|
|
|
- },
|
|
|
- }
|
|
|
- } else {
|
|
|
- tmp = append(tmp, map[string]interface{}{
|
|
|
- "code": l["code"],
|
|
|
- "channel": pc[2],
|
|
|
- "event": event,
|
|
|
- "lm_movevent": lm_movevent,
|
|
|
- "href": href,
|
|
|
- "lm_download": l["lm_download"],
|
|
|
- "state": l["state"],
|
|
|
- })
|
|
|
- siteMap[site] = tmp
|
|
|
+ }
|
|
|
+ if search != "" {
|
|
|
+ query["$or"] = []interface{}{
|
|
|
+ map[string]interface{}{"code": map[string]interface{}{"$regex": search}},
|
|
|
+ map[string]interface{}{"site": map[string]interface{}{"$regex": search}},
|
|
|
}
|
|
|
}
|
|
|
- lm.SetSession("sitemap", siteMap)
|
|
|
- lm.ServeJson(map[string]interface{}{"data": data})
|
|
|
+ year, month, _ := time.Now().Date()
|
|
|
+ //本月1日时间戳
|
|
|
+ firstOfMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Unix()
|
|
|
+ query["comeintime"] = map[string]interface{}{ //本月的信息
|
|
|
+ "$gte": firstOfMonth,
|
|
|
+ }
|
|
|
+ sort := `{"%s":%d}`
|
|
|
+ orderIndex := lm.GetString("order[0][column]")
|
|
|
+ orderName := lm.GetString(fmt.Sprintf("columns[%s][data]", orderIndex))
|
|
|
+ orderType := 1
|
|
|
+ if lm.GetString("order[0][dir]") != "asc" {
|
|
|
+ orderType = -1
|
|
|
+ }
|
|
|
+ sort = fmt.Sprintf(sort, orderName, orderType)
|
|
|
+ count := u.MgoEB.Count("luamovevent", query)
|
|
|
+ qu.Debug("query:", query, sort, count)
|
|
|
+ data, _ := u.MgoEB.Find("luamovevent", query, sort, nil, false, start, limit)
|
|
|
+ lm.ServeJson(map[string]interface{}{
|
|
|
+ "draw": draw,
|
|
|
+ "data": data,
|
|
|
+ "recordsFiltered": count,
|
|
|
+ "recordsTotal": count,
|
|
|
+ })
|
|
|
}
|
|
|
} else {
|
|
|
lm.Write("您没有权限")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (lm *LuaMove) LuaMoveSite(site string) {
|
|
|
- defer qu.Catch()
|
|
|
- if lm.Method() == "GET" {
|
|
|
- events := []string{}
|
|
|
- for k, _ := range util.Config.Uploadevents {
|
|
|
- events = append(events, k)
|
|
|
- }
|
|
|
- sort.Strings(events)
|
|
|
- lm.T["events"] = events
|
|
|
- lm.T["site"] = site
|
|
|
- lm.Render("luamovecode.html", &lm.T)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (lm *LuaMove) LuaMoveCodeInfo() {
|
|
|
- site := lm.GetString("site")
|
|
|
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
|
|
|
- lm.ServeJson(map[string]interface{}{"data": sitemap[site]})
|
|
|
-}
|
|
|
-
|
|
|
-func (lm *LuaMove) LuaMoveBySite() {
|
|
|
- defer qu.Catch()
|
|
|
- site := lm.GetString("site")
|
|
|
- stype := lm.GetString("stype")
|
|
|
- //movevent, _ := lm.GetInteger("movevent")
|
|
|
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
|
|
|
- codesinfo := sitemap[site]
|
|
|
- codes := []string{}
|
|
|
- for _, info := range codesinfo {
|
|
|
- code := qu.ObjToString(info["code"])
|
|
|
- codes = append(codes, code)
|
|
|
- }
|
|
|
- ok := false
|
|
|
- if stype == "move" { //迁移
|
|
|
- ok = SpiderMoveLua(codes)
|
|
|
- } else if stype == "close" { //关闭
|
|
|
- ok, _ = lm.SpiderCloseMoveLua(site, codes, "site")
|
|
|
- }
|
|
|
- lm.ServeJson(map[string]interface{}{"ok": ok})
|
|
|
-}
|
|
|
-
|
|
|
func (lm *LuaMove) LuaMoveByCode() {
|
|
|
defer qu.Catch()
|
|
|
- site := lm.GetString("site")
|
|
|
- code := lm.GetString("code")
|
|
|
stype := lm.GetString("stype")
|
|
|
- //movevent, _ := lm.GetInteger("movevent")
|
|
|
+ code := lm.GetString("code")
|
|
|
+ movevent := lm.GetString("movevent")
|
|
|
+ events := strings.Split(movevent, ",")
|
|
|
codes := strings.Split(code, ",")
|
|
|
ok := false
|
|
|
- flush := false
|
|
|
if stype == "move" { //迁移
|
|
|
- ok = SpiderMoveLua(codes)
|
|
|
- } else if stype == "close" { //关闭
|
|
|
- ok, flush = lm.SpiderCloseMoveLua(site, codes, "code")
|
|
|
- }
|
|
|
- qu.Debug(ok)
|
|
|
- lm.ServeJson(map[string]interface{}{"ok": ok, "flush": flush})
|
|
|
-}
|
|
|
-
|
|
|
-func (lm *LuaMove) UpdateEventBySite() {
|
|
|
- defer qu.Catch()
|
|
|
- site := lm.GetString("site")
|
|
|
- movevent, _ := lm.GetInteger("movevent")
|
|
|
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
|
|
|
- codes := []string{}
|
|
|
- for _, info := range sitemap[site] {
|
|
|
- code := qu.ObjToString(info["code"])
|
|
|
- codes = append(codes, code)
|
|
|
- info["lm_movevent"] = movevent
|
|
|
- }
|
|
|
- lm.SetSession("sitemap", sitemap)
|
|
|
- query := map[string]interface{}{
|
|
|
- "code": map[string]interface{}{
|
|
|
- "$in": codes,
|
|
|
- },
|
|
|
- }
|
|
|
- set := map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "lm_movevent": movevent,
|
|
|
- },
|
|
|
+ if len(codes) != len(events) {
|
|
|
+ qu.Debug("爬虫个数和节点个数不匹配:", codes, events)
|
|
|
+ } else {
|
|
|
+ ok = SpiderMoveLua(codes, events) //节点转移
|
|
|
+ }
|
|
|
+ } else if stype == "close" { //关闭,更新数据状态
|
|
|
+ arr := [][]map[string]interface{}{}
|
|
|
+ for _, code := range codes {
|
|
|
+ arr = append(arr, []map[string]interface{}{
|
|
|
+ map[string]interface{}{"code": code},
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "state": 1,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ ok = u.MgoEB.UpdateBulk("luamovevent", arr...)
|
|
|
+ arr = [][]map[string]interface{}{}
|
|
|
}
|
|
|
- //mgdb.Update("luaconfig", query, set, false, true)
|
|
|
- u.MgoEB.Update("luaconfig", query, set, false, true)
|
|
|
+ lm.ServeJson(map[string]interface{}{"ok": ok})
|
|
|
}
|
|
|
|
|
|
//爬虫迁移
|
|
|
-func SpiderMoveLua(codes []string) bool {
|
|
|
+func SpiderMoveLua(codes []string, events []string) bool {
|
|
|
defer qu.Catch()
|
|
|
- for _, code := range codes {
|
|
|
- //lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
|
|
|
+ msg := []string{}
|
|
|
+ for index, code := range codes {
|
|
|
+ resultEvent := events[index]
|
|
|
+ qu.Debug("爬虫节点转移:", code, resultEvent)
|
|
|
lua, _ := u.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
|
|
|
- movevent := qu.IntAll((*lua)["lm_movevent"])
|
|
|
- event := qu.IntAll((*lua)["event"])
|
|
|
- state := qu.IntAll((*lua)["state"])
|
|
|
- upresult := true
|
|
|
- var err interface{}
|
|
|
- if state < 7 || state == 4 { //无发布、需登录、转python、已删除状态无需下架
|
|
|
- //下架
|
|
|
- upresult, err = spider.UpdateSpiderByCodeState(code, "6", event) //脚本下架
|
|
|
- }
|
|
|
- if upresult && err == nil { //下架成功,更新节点
|
|
|
- //更新节点
|
|
|
- query := map[string]interface{}{
|
|
|
- "code": code,
|
|
|
- }
|
|
|
- set := map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "event": movevent,
|
|
|
- },
|
|
|
- }
|
|
|
- //mgdb.Update("luaconfig", query, set, false, false)
|
|
|
- u.MgoEB.Update("luaconfig", query, set, false, false)
|
|
|
- //上架
|
|
|
- if state == 5 { //只有是已上架状态的爬虫上架
|
|
|
- upresult, err = spider.UpdateSpiderByCodeState(code, "5", movevent) //脚本上架
|
|
|
- if !upresult || err != nil {
|
|
|
- SendMail("爬虫定期节点迁移" + code + "上架失败")
|
|
|
- qu.Debug("定期节点迁移", code, "上架失败")
|
|
|
- return false
|
|
|
+ if len(*lua) > 0 {
|
|
|
+ state := qu.IntAll((*lua)["state"])
|
|
|
+ //event := qu.IntAll((*lua)["event"])
|
|
|
+ if state == 5 { //查询该爬虫是否是已上架状态,是则更新节点后上架,否则只更新
|
|
|
+ upresult := true
|
|
|
+ var err interface{}
|
|
|
+ //upresult, err = spider.UpdateSpiderByCodeState(code, "6", event) //脚本下架
|
|
|
+ if upresult && err == nil { //下架成功,更新节点
|
|
|
+ re := qu.IntAll(resultEvent)
|
|
|
+ u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": re, "historyevent": re}}, false, false)
|
|
|
+ //上架
|
|
|
+ //upresult, err = spider.UpdateSpiderByCodeState(code, "5",re) //脚本上架
|
|
|
+ if !upresult || err != nil {
|
|
|
+ qu.Debug("爬虫节点转移", code, "上架失败")
|
|
|
+ msg = append(msg, "爬虫节点转移"+code+"上架失败")
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ qu.Debug("爬虫节点转移", code, "下架失败")
|
|
|
+ msg = append(msg, "爬虫节点转移"+code+"下架失败")
|
|
|
}
|
|
|
}
|
|
|
+ u.MgoEB.Update("luamovevent", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"state": 1}}, false, false)
|
|
|
} else {
|
|
|
- SendMail("爬虫定期节点迁移" + code + "下架失败")
|
|
|
- qu.Debug("定期节点迁移", code, "下架失败")
|
|
|
- return false
|
|
|
+ msg = append(msg, "爬虫节点转移未找到爬虫"+code)
|
|
|
}
|
|
|
}
|
|
|
+ if len(msg) > 0 {
|
|
|
+ SendMail(strings.Join(msg, ";"))
|
|
|
+ }
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
-//更新爬虫是否要迁移的状态
|
|
|
-func (lm *LuaMove) SpiderCloseMoveLua(site string, codes []string, by string) (bool, bool) {
|
|
|
+func SendMail(text string) {
|
|
|
defer qu.Catch()
|
|
|
- flush := false
|
|
|
- query := map[string]interface{}{
|
|
|
- "code": map[string]interface{}{
|
|
|
- "$in": codes,
|
|
|
- },
|
|
|
- }
|
|
|
- set := map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "lm_ismove": false,
|
|
|
- },
|
|
|
- }
|
|
|
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
|
|
|
- //清除session
|
|
|
- if by == "code" {
|
|
|
- tmpArr := []map[string]interface{}{}
|
|
|
- infoArr := sitemap[site]
|
|
|
- for _, info := range infoArr {
|
|
|
- flag := false
|
|
|
- for _, code := range codes {
|
|
|
- if qu.ObjToString(info["code"]) == code {
|
|
|
- flag = true
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- if !flag {
|
|
|
- tmpArr = append(tmpArr, info)
|
|
|
- }
|
|
|
- }
|
|
|
- if len(tmpArr) == 0 {
|
|
|
- delete(sitemap, site)
|
|
|
- flush = true
|
|
|
- } else {
|
|
|
- sitemap[site] = tmpArr
|
|
|
+ for i := 1; i <= 3; i++ {
|
|
|
+ res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", util.Config.JkMail["api"], util.Config.JkMail["to"], "lua-timeluamove-err", text))
|
|
|
+ if err == nil {
|
|
|
+ res.Body.Close()
|
|
|
+ read, err := ioutil.ReadAll(res.Body)
|
|
|
+ qu.Debug("邮件发送:", string(read), err)
|
|
|
+ break
|
|
|
}
|
|
|
- lm.SetSession("sitemap", sitemap)
|
|
|
}
|
|
|
- //return mgdb.Update("luaconfig", query, set, false, true), flush
|
|
|
- return u.MgoEB.Update("luaconfig", query, set, false, true), flush
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -367,16 +278,3 @@ func GetEvent(code string, lua map[string]interface{}) int {
|
|
|
}
|
|
|
return result
|
|
|
}
|
|
|
-
|
|
|
-func SendMail(text string) {
|
|
|
- defer qu.Catch()
|
|
|
- for i := 1; i <= 3; i++ {
|
|
|
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", util.Config.JkMail["api"], util.Config.JkMail["to"], "lua-timeluamove-err", text))
|
|
|
- if err == nil {
|
|
|
- res.Body.Close()
|
|
|
- read, err := ioutil.ReadAll(res.Body)
|
|
|
- qu.Debug("邮件发送:", string(read), err)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
-}
|