123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- package front
import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"time"

	"luaweb/spider"
	qu "qfw/util"
	mgdb "qfw/util/mongodb"
	util "spiderutil"

	"github.com/go-xweb/xweb"
	"gopkg.in/mgo.v2/bson"
)
- type LuaMove struct {
- *xweb.Action
- luaMoveManager xweb.Mapper `xweb:"/center/luamove"` //站点列表
- luaMoveSite xweb.Mapper `xweb:"/center/luamove/site/(.*)"` //
- luaMoveCodeInfo xweb.Mapper `xweb:"/center/luamove/codeinfo"` //爬虫列表
- luaMoveBySite xweb.Mapper `xweb:"/center/luamove/luamovebysite"` //按站点迁移、关闭
- luaMoveByCode xweb.Mapper `xweb:"/center/luamove/luamovebycode"` //按爬虫迁移、关闭
- updateEventBySite xweb.Mapper `xweb:"/center/spider/updateventbysite"` //更新某站点下爬虫的节点
- }
// OtherBase holds extra per-spider settings about data flow and the
// history-to-increment migration.
type OtherBase struct {
	// IsFlow flags whether the data a spider collects takes part in the data pipeline.
	IsFlow int
	// SpiderType is the spider kind: "increment" or "history".
	SpiderType string
	// SpiderHistoryMaxPage is the last page to crawl when collecting history data.
	SpiderHistoryMaxPage int
	// SpiderMoveEvent is where a spider goes after its history crawl:
	// "comm" (queue mode), "bid" (high-performance mode) or "7700".
	SpiderMoveEvent string
}
// LuaMoveManager serves the migration-center site list. Only sessions whose
// "auth" value equals role_admin may use it; others get a plain-text refusal.
//
// GET renders luamovesite.html with the sorted upload event (node) names.
// POST loads every luaconfig record flagged lm_ismove and groups the spiders
// by site (param_common[1]). It stores the per-site spider rows in the
// "sitemap" session for the follow-up handlers and replies {"data": rows},
// one summary row per site in first-seen order.
func (lm *LuaMove) LuaMoveManager() {
	if qu.IntAll(lm.GetSession("auth")) != role_admin {
		lm.Write("您没有权限")
		return
	}
	if lm.Method() == "GET" {
		events := make([]string, 0, len(util.Config.Uploadevents))
		for k := range util.Config.Uploadevents {
			events = append(events, k)
		}
		sort.Strings(events)
		lm.T["events"] = events
		lm.Render("luamovesite.html")
		return
	}
	data := []map[string]interface{}{}
	siteMap := map[string][]map[string]interface{}{}
	if lualist := mgdb.Find("luaconfig", bson.M{"lm_ismove": true}, nil, nil, false, -1, -1); lualist != nil {
		for _, l := range *lualist {
			// param_common is positional: [1] site, [2] channel, [11] href
			// (optional). Skip malformed records: this handler has no
			// qu.Catch, so an index or type-assertion panic would abort it.
			pc, ok := l["param_common"].([]interface{})
			if !ok || len(pc) < 3 {
				continue
			}
			site := qu.ObjToString(pc[1])
			href := ""
			if len(pc) >= 12 {
				href = qu.ObjToString(pc[11])
			}
			lmMovevent := l["lm_movevent"]
			event := l["event"]
			if _, seen := siteMap[site]; !seen {
				// First spider of this site: add the site's summary row.
				data = append(data, map[string]interface{}{"site": site, "lm_movevent": lmMovevent, "event": event})
			}
			siteMap[site] = append(siteMap[site], map[string]interface{}{
				"code":        l["code"],
				"channel":     pc[2],
				"event":       event,
				"lm_movevent": lmMovevent,
				"href":        href,
				"lm_download": l["lm_download"],
				"state":       l["state"],
			})
		}
	}
	lm.SetSession("sitemap", siteMap)
	lm.ServeJson(map[string]interface{}{"data": data})
}
// LuaMoveSite renders the spider list page (luamovecode.html) for the site
// captured from the URL, passing the sorted upload event names as "events".
// Requests other than GET are ignored and nothing is rendered.
func (lm *LuaMove) LuaMoveSite(site string) {
	defer qu.Catch()
	if lm.Method() != "GET" {
		return
	}
	nodes := make([]string, 0, len(util.Config.Uploadevents))
	for name := range util.Config.Uploadevents {
		nodes = append(nodes, name)
	}
	sort.Strings(nodes)
	lm.T["events"] = nodes
	lm.T["site"] = site
	lm.Render("luamovecode.html", &lm.T)
}
- func (lm *LuaMove) LuaMoveCodeInfo() {
- site := lm.GetString("site")
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
- lm.ServeJson(map[string]interface{}{"data": sitemap[site]})
- }
- func (lm *LuaMove) LuaMoveBySite() {
- defer qu.Catch()
- site := lm.GetString("site")
- stype := lm.GetString("stype")
- //movevent, _ := lm.GetInteger("movevent")
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
- codesinfo := sitemap[site]
- codes := []string{}
- for _, info := range codesinfo {
- code := qu.ObjToString(info["code"])
- codes = append(codes, code)
- }
- ok := false
- if stype == "move" { //迁移
- ok = SpiderMoveLua(codes)
- } else if stype == "close" { //关闭
- ok, _ = lm.SpiderCloseMoveLua(site, codes, "site")
- }
- lm.ServeJson(map[string]interface{}{"ok": ok})
- }
// LuaMoveByCode acts on the comma-separated spider codes in the "code"
// parameter. With stype=move it migrates them; with stype=close it closes
// their migration.
//
// It replies {"ok": bool, "flush": bool}. flush comes from
// SpiderCloseMoveLua and is true when closing removed the last cached spider
// of the site. Blank codes are ignored; if none remain, ok is false.
func (lm *LuaMove) LuaMoveByCode() {
	defer qu.Catch()
	site := lm.GetString("site")
	stype := lm.GetString("stype")
	codes := []string{}
	for _, c := range strings.Split(lm.GetString("code"), ",") {
		// An empty parameter or a stray comma must not become a lookup or
		// update of the empty code.
		if c = strings.TrimSpace(c); c != "" {
			codes = append(codes, c)
		}
	}
	ok, flush := false, false
	if len(codes) > 0 {
		switch stype {
		case "move":
			ok = SpiderMoveLua(codes)
		case "close":
			ok, flush = lm.SpiderCloseMoveLua(site, codes, "code")
		}
	}
	qu.Debug(ok)
	lm.ServeJson(map[string]interface{}{"ok": ok, "flush": flush})
}
- func (lm *LuaMove) UpdateEventBySite() {
- defer qu.Catch()
- site := lm.GetString("site")
- movevent, _ := lm.GetInteger("movevent")
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
- codes := []string{}
- for _, info := range sitemap[site] {
- code := qu.ObjToString(info["code"])
- codes = append(codes, code)
- info["lm_movevent"] = movevent
- }
- lm.SetSession("sitemap", sitemap)
- query := bson.M{
- "code": bson.M{
- "$in": codes,
- },
- }
- set := bson.M{
- "$set": bson.M{
- "lm_movevent": movevent,
- },
- }
- mgdb.Update("luaconfig", query, set, false, true)
- }
- //爬虫迁移
- func SpiderMoveLua(codes []string) bool {
- defer qu.Catch()
- for _, code := range codes {
- lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
- movevent := qu.IntAll(lua["lm_movevent"])
- event := qu.IntAll(lua["event"])
- state := qu.IntAll(lua["state"])
- upresult := true
- var err interface{}
- if state < 7 || state == 4 { //无发布、需登录、无法处理、已删除状态无需下架
- //下架
- upresult, err = spider.UpdateSpiderByCodeState(code, "6", event) //脚本下架
- }
- if upresult && err == nil { //下架成功,更新节点
- //更新节点
- query := bson.M{
- "code": code,
- }
- set := bson.M{
- "$set": bson.M{
- "event": movevent,
- },
- }
- mgdb.Update("luaconfig", query, set, false, false)
- //上架
- if state == 5 { //只有是已上架状态的爬虫上架
- upresult, err = spider.UpdateSpiderByCodeState(code, "5", movevent) //脚本上架
- if !upresult || err != nil {
- SendMail("爬虫定期节点迁移" + code + "上架失败")
- qu.Debug("定期节点迁移", code, "上架失败")
- return false
- }
- }
- } else {
- SendMail("爬虫定期节点迁移" + code + "下架失败")
- qu.Debug("定期节点迁移", code, "下架失败")
- return false
- }
- }
- return true
- }
- //更新爬虫是否要迁移的状态
- func (lm *LuaMove) SpiderCloseMoveLua(site string, codes []string, by string) (bool, bool) {
- defer qu.Catch()
- flush := false
- query := bson.M{
- "code": bson.M{
- "$in": codes,
- },
- }
- set := bson.M{
- "$set": bson.M{
- "lm_ismove": false,
- },
- }
- sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
- //清除session
- if by == "code" {
- tmpArr := []map[string]interface{}{}
- infoArr := sitemap[site]
- for _, info := range infoArr {
- flag := false
- for _, code := range codes {
- if qu.ObjToString(info["code"]) == code {
- flag = true
- break
- }
- }
- if !flag {
- tmpArr = append(tmpArr, info)
- }
- }
- if len(tmpArr) == 0 {
- delete(sitemap, site)
- flush = true
- } else {
- sitemap[site] = tmpArr
- }
- lm.SetSession("sitemap", sitemap)
- }
- return mgdb.Update("luaconfig", query, set, false, true), flush
- }
- //
- func SpiderMoveEvent(data string) {
- //解析爬虫代码
- data = util.Se.DecodeString(data)
- infos := []interface{}{}
- err := json.Unmarshal([]byte(data), &infos)
- if err != nil {
- qu.Debug("历史迁移到增量节点失败:", data)
- return
- }
- code := qu.ObjToString(infos[0])
- //迁移节点并上架
- lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
- spidertype := qu.ObjToString(lua["spidertype"])
- event := qu.IntAll(lua["event"])
- var upresult bool
- qu.Debug("lua move:", code, event)
- if spidertype == "history" {
- newevent := GetEvent(code, lua)
- qu.Debug("new event:", newevent)
- mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": newevent, "spidertype": "increment"}}, false, false)
- upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
- }
- ok := false
- if upresult && err == nil { //上架成功
- ok = true
- qu.Debug("Code:", code, "历史迁移到增量节点成功")
- } else { //上架失败
- mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
- qu.Debug("Code:", code, "历史迁移到增量节点失败")
- }
- mgdb.Save("luamovelog", map[string]interface{}{
- "code": code,
- "comeintime": time.Now().Unix(),
- "type": "movevent",
- "ok": ok,
- })
- }
- //
- func GetEvent(code string, lua map[string]interface{}) int {
- defer qu.Catch()
- //1、历史节点
- if lua["historyevent"] != nil {
- return qu.IntAll(lua["historyevent"])
- }
- //2、根据站点找节点
- param_common := lua["param_common"].([]interface{})
- site := qu.ObjToString(param_common[1])
- query := map[string]interface{}{
- "code": map[string]interface{}{
- "$ne": code,
- },
- "param_common.1": site,
- "state": 5,
- }
- tmp := *mgdb.FindOne("luaconfig", query)
- if tmp != nil && len(tmp) > 0 {
- return qu.IntAll(tmp["event"])
- }
- //3、7700
- spidermovevent := qu.ObjToString(lua["spidermovevent"])
- if spidermovevent == "7700" {
- return 7700
- }
- //4、根据数量分配节点
- num := 0
- result := 7700
- for k, t := range util.Config.Uploadevents {
- if qu.ObjToString(t) == spidermovevent { //bid、comm
- event := qu.IntAll(k)
- count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
- if num == 0 || count < num {
- result = event
- num = count
- }
- }
- }
- return result
- }
// SendMail reports text to the mail API configured in util.Config.JkMail.
// The "api" entry is the endpoint and "to" is the recipient; the title is
// always "lua-timeluamove-err".
//
// It makes up to three attempts and stops after the first one that gets an
// HTTP response. Failures are only logged; nothing is returned.
func SendMail(text string) {
	defer qu.Catch()
	// Escape every query value: text carries spider codes and Chinese text,
	// and characters such as '&', '#' or spaces would otherwise corrupt the URL.
	endpoint := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		util.Config.JkMail["api"],
		url.QueryEscape(fmt.Sprint(util.Config.JkMail["to"])),
		url.QueryEscape("lua-timeluamove-err"),
		url.QueryEscape(text))
	// Bounded timeout so an unresponsive mail API cannot block the caller forever.
	client := &http.Client{Timeout: 10 * time.Second}
	for attempt := 1; attempt <= 3; attempt++ {
		res, err := client.Get(endpoint)
		if err != nil {
			qu.Debug("邮件发送失败:", attempt, err)
			continue
		}
		// Read the body before closing it; reading after Close always fails.
		body, readErr := ioutil.ReadAll(res.Body)
		res.Body.Close()
		qu.Debug("邮件发送:", string(body), readErr)
		return
	}
}
|