package luaerrdata

import (
	"encoding/json"
	"strings"
	"time"

	"luaweb/spider"
	"luaweb/udp"
	qu "qfw/util"
	mgdb "qfw/util/mongodb"
	mgu "qfw/util/mongodbutil"
	util "spiderutil"

	"github.com/go-xweb/xweb"
	"gopkg.in/mgo.v2/bson"
)

// Session "auth" levels: administrator, reviewer, developer.
const role_admin, role_examine, role_dev = 3, 2, 1

// Text is the reply served when a re-gather request could not be dispatched
// (payload marshalling failed or another UDP send is still flagged as running).
var Text = "重采失败,请稍后重试"

// LuaInfo is the per-spider summary shown next to the error counters.
type LuaInfo struct {
	ModifyUser string // user who last modified the spider
	ReState    int    // repair ("restate") status
	State      int    // spider status
	Event      int    // node
	StateTime  int64  // timestamp that belongs to the current ReState
	AllErrNum  int    // total number of records for this spider
	ErrNum     int    // number of gather failures
	ReErrNum   int    // number of re-gather failures
	SaveErrNum int    // number of save failures
}

// ErrorData is the xweb action that serves the "error data" pages.
type ErrorData struct {
	*xweb.Action
	errorDataIndex  xweb.Mapper `xweb:"/center/errorData"`                 // load error data
	findByCode      xweb.Mapper `xweb:"/center/errorData/findByCode"`      // list all error data of one spider
	regatherData    xweb.Mapper `xweb:"/center/errorData/regatherData"`    // re-gather data
	singleRegather  xweb.Mapper `xweb:"/center/errorData/singleRegather"`  // re-gather a single record
	confirmLua      xweb.Mapper `xweb:"/center/errorData/confirmLua"`      // confirm spider
	updateRestate   xweb.Mapper `xweb:"/center/errorData/updateRestate"`   // change the restate status
	updateOnlineLua xweb.Mapper `xweb:"/center/errorData/updateonlinelua"` // put the spider online again
	confirmRepair   xweb.Mapper `xweb:"/center/errorData/confirmrepair"`   // confirm repair
}

// loadLuaInfo reads the "luaconfig" document of the given spider code and
// converts it into a LuaInfo. StateTime is taken from the timestamp field that
// matches the current restate (4 updatetime, 3 auditime, 2 repairtime,
// 1 confirmtime); for any other restate it stays 0.
func loadLuaInfo(code string) *LuaInfo {
	conf := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
	info := &LuaInfo{
		ModifyUser: qu.ObjToString(conf["modifyuser"]),
		ReState:    qu.IntAll(conf["restate"]),
		State:      qu.IntAll(conf["state"]),
		Event:      qu.IntAll(conf["event"]),
		StateTime:  int64(0),
	}
	switch info.ReState {
	case 4:
		info.StateTime = qu.Int64All(conf["updatetime"])
	case 3:
		info.StateTime = qu.Int64All(conf["auditime"])
	case 2:
		info.StateTime = qu.Int64All(conf["repairtime"])
	case 1:
		info.StateTime = qu.Int64All(conf["confirmtime"])
	}
	return info
}

// spiderRow builds the part of a table row that is identical for every
// category of ErrorDataIndex; the caller fills in errnum/rerrnum/saverrnum.
func spiderRow(code string, num int, info *LuaInfo) map[string]interface{} {
	return map[string]interface{}{
		"spidercode": code,
		"encode":     util.Se.Encode2Hex(code),
		"num":        num,
		"modifyuser": info.ModifyUser,
		"restate":    info.ReState,
		"state":      info.State,
		"statetime":  info.StateTime,
		"event":      info.Event,
	}
}

// ErrorDataIndex renders the error-data page (non-POST) or, on POST, returns
// one row per spider with its gather / re-gather / save failure counts for the
// requested day. Rows are ordered: spiders with gather errors (walking the
// MapValueSort result from its last index to its first), then spiders with
// only re-gather errors, then save-only errors, then spiders whose records
// are all in the remaining (repaired) state.
func (ed *ErrorData) ErrorDataIndex() {
	defer qu.Catch()
	auth := qu.IntAll(ed.GetSession("auth"))
	search := strings.TrimSpace(ed.GetString("search[value]"))
	date := ed.GetString("date")
	user := qu.ObjToString(ed.GetSession("loginuser"))

	if ed.Method() != "POST" {
		// Page request: default the date picker to yesterday.
		yesterday := time.Now().AddDate(0, 0, -1)
		ed.T["date"] = qu.FormatDate(&yesterday, qu.Date_Short_Layout)
		ed.Render("errdata.html", &ed.T)
		return
	}

	startTime, endTime := GetStartAndEndTime(date)
	if startTime == 0 || endTime == 0 {
		return
	}
	// state==3 (repaired) rows are included on purpose so the spider still
	// shows up on the page for "update online" / "confirm repair".
	query := map[string]interface{}{
		"state":      bson.M{"$lte": 3},
		"from":       "lua",
		"comeintime": bson.M{"$gte": startTime, "$lte": endTime},
	}
	if auth == role_dev {
		// Developers only see the spiders they maintain.
		query["modifyuser"] = user
	}
	if search != "" {
		// NOTE(review): this replaces the whole query, so the date/state/from
		// filters and the developer restriction above are dropped whenever a
		// search term is given. Kept as-is; confirm whether that is intended.
		query = bson.M{"spidercode": bson.M{"$regex": search}}
	}
	qu.Debug("query:", query)

	rows := []map[string]interface{}{}
	list := *mgu.Find("regatherdata", "spider", "spider", query, nil, nil, false, -1, -1)
	if len(list) > 0 {
		gatherErr := map[string]int{}   // state 0: gather error
		regatherErr := map[string]int{} // state 1: re-download failed
		saveErr := map[string]int{}     // state 2: sending to the save service failed
		repaired := map[string]int{}    // any other state: already repaired
		infos := map[string]*LuaInfo{}  // one luaconfig lookup per spider code

		for _, rec := range list {
			code := qu.ObjToString(rec["spidercode"])
			if infos[code] == nil {
				infos[code] = loadLuaInfo(code)
			}
			switch qu.IntAll(rec["state"]) {
			case 0:
				gatherErr[code]++
			case 1:
				regatherErr[code]++
			case 2:
				saveErr[code]++
			default:
				repaired[code]++
			}
		}

		// 1) Spiders with gather errors, ordered by the sorted error count.
		byErr := MapValueSort(gatherErr)
		for i := byErr.Len() - 1; i >= 0; i-- {
			code := byErr.Keys[i]
			row := spiderRow(code, byErr.Len()-i, infos[code])
			row["errnum"] = byErr.Vals[i]
			row["rerrnum"] = regatherErr[code]
			row["saverrnum"] = saveErr[code]
			rows = append(rows, row)
			// The spider is listed now; keep it out of the later categories.
			delete(regatherErr, code)
			delete(saveErr, code)
			delete(repaired, code)
		}

		// 2) Remaining spiders with re-gather errors.
		count := byErr.Len()
		byReErr := MapValueSort(regatherErr)
		for j := byReErr.Len() - 1; j >= 0; j-- {
			count++
			code := byReErr.Keys[j]
			row := spiderRow(code, count, infos[code])
			row["errnum"] = 0
			row["rerrnum"] = regatherErr[code]
			row["saverrnum"] = saveErr[code]
			rows = append(rows, row)
			delete(saveErr, code)
			delete(repaired, code)
		}

		// 3) Remaining spiders with save errors only.
		for code := range saveErr {
			count++
			row := spiderRow(code, count, infos[code])
			row["errnum"] = 0
			row["rerrnum"] = 0
			row["saverrnum"] = saveErr[code]
			rows = append(rows, row)
			delete(repaired, code)
		}

		// 4) Spiders whose records are all repaired.
		for code := range repaired {
			count++
			row := spiderRow(code, count, infos[code])
			row["errnum"] = 0
			row["rerrnum"] = 0
			row["saverrnum"] = 0
			rows = append(rows, row)
		}
	}
	ed.ServeJson(map[string]interface{}{"data": rows})
}

// FindByCode renders the per-spider error list page (GET) or returns one page
// of that spider's error records for the requested day (POST). The POST reply
// carries "draw", "data", "recordsFiltered" and "recordsTotal".
func (ed *ErrorData) FindByCode() {
	defer qu.Catch()
	code := ed.GetString("code")
	date := ed.GetString("date")

	switch ed.Method() {
	case "GET":
		restate, _ := ed.GetInteger("restate")
		ed.T["date"] = date
		ed.T["code"] = code
		ed.T["restate"] = restate
		ed.Render("errhreflist.html", &ed.T)

	case "POST":
		startTime, endTime := GetStartAndEndTime(date)
		if startTime == 0 || endTime == 0 {
			return
		}
		start, _ := ed.GetInteger("start")
		limit, _ := ed.GetInteger("length")
		draw, _ := ed.GetInteger("draw")
		state, _ := ed.GetInteger("state")

		// state == -1 means "every failure state" (0, 1 and 2).
		stateCond := bson.M{"$eq": state}
		if state == -1 {
			stateCond = bson.M{"$lt": 3}
		}
		query := bson.M{
			"from":       "lua",
			"spidercode": code,
			"state":      stateCond,
			"comeintime": bson.M{"$gte": startTime, "$lte": endTime},
		}
		qu.Debug("query:", query)

		// NOTE(review): `comeintime` is not quoted in the field-selection
		// string, so it is not valid JSON; depending on how mgu.Find parses
		// it the projection may be ignored. Left unchanged - confirm.
		data := *mgu.Find("regatherdata", "spider", "spider", query, `{"state":1}`, `{"href":1,"spidercode":1,"state":1,comeintime:1}`, false, start, limit)
		count := mgu.Count("regatherdata", "spider", "spider", query)

		for k, d := range data {
			// Running row number across pages. Derived from the real offset so
			// it is correct for any page length (previously assumed 10).
			d["num"] = start + k + 1
			switch qu.IntAll(d["state"]) {
			case 0:
				d["state"] = "采集失败"
			case 1:
				d["state"] = "重采失败"
			case 2:
				d["state"] = "保存失败"
			}
			comeintime := qu.Int64All(d["comeintime"])
			d["comeintime"] = qu.FormatDateByInt64(&comeintime, qu.Date_Full_Layout)
		}
		ed.ServeJson(map[string]interface{}{"draw": draw, "data": data, "recordsFiltered": count, "recordsTotal": count})
	}
}

// dispatchRegatherUdp announces the saved "regatherudp" task `id` over UDP and
// writes exactly one JSON reply to the client:
//   - Text when the payload cannot be marshalled or udp.IsSendUdp is set,
//   - whatever arrives on udp.Ch when it arrives within 10 seconds,
//   - a timeout message otherwise; in that case the task document is also
//     marked isrun=true with a failure remark in the background.
//
// The send lock is released with defer so that a panic (recovered by the
// handlers' qu.Catch) cannot leave udp.SendUdpLock locked.
func dispatchRegatherUdp(ed *ErrorData, id string) {
	qu.Debug("发送udp", id)
	payload, err := json.Marshal(map[string]interface{}{
		"udpid": id,
	})
	if err != nil || udp.IsSendUdp {
		ed.ServeJson(Text)
		return
	}

	udp.SendUdpLock.Lock()
	defer udp.SendUdpLock.Unlock()

	udp.SendUdp(payload) // send the udp packet
	select {
	case info := <-udp.Ch:
		udp.IsSendUdp = false
		ed.ServeJson(info)
	case <-time.After(time.Second * 10):
		go func() {
			q := map[string]interface{}{"_id": qu.StringTOBsonId(id)}
			s := map[string]interface{}{"$set": map[string]interface{}{"isrun": true, "remark": "Udp发送失败"}}
			mgu.Update("regatherudp", "spider", "spider", q, s, false, false)
		}()
		udp.IsSendUdp = false
		ed.ServeJson("重新采集失败,请稍后重试")
	}
}

// RegatherData queues a re-gather task for the comma separated spider codes
// in "codes" (restricted to the day in "date" and the given "state") and
// triggers it over UDP.
func (ed *ErrorData) RegatherData() {
	defer qu.Catch()
	codes := ed.GetString("codes")
	date := ed.GetString("date")
	state, _ := ed.GetInteger("state")

	if len(codes) == 0 {
		ed.ServeJson("爬虫代码选择出错")
		return
	}
	start, end := GetStartAndEndTime(date)
	task := map[string]interface{}{
		"start":    start,
		"end":      end,
		"state":    state,
		"codeorid": strings.Split(codes, ","),
		"isrun":    false,
		"type":     "code",
	}
	id := mgu.Save("regatherudp", "spider", "spider", task)
	if id == "" {
		ed.ServeJson("Udp条件保存失败")
		return
	}
	dispatchRegatherUdp(ed, id)
}

// SingleRegather queues a re-gather task either for one record (when "id" is
// given) or for a whole spider (no id and state == -1) and triggers it over UDP.
func (ed *ErrorData) SingleRegather() {
	defer qu.Catch()
	code := ed.GetString("code")
	date := ed.GetString("date")
	state, _ := ed.GetInteger("state")
	id := ed.GetString("id")

	start, end := GetStartAndEndTime(date)
	task := map[string]interface{}{
		"start": start,
		"end":   end,
		"state": state,
		"isrun": false,
	}
	if id != "" {
		task["codeorid"] = []string{id}
		task["type"] = "id"
	} else if state == -1 { // re-gather everything of this spider
		task["codeorid"] = []string{code}
		task["type"] = "code"
	}
	// NOTE(review): with no id and state != -1 the task is saved without
	// "codeorid"/"type". Behaviour kept; confirm the consumer tolerates it.

	taskId := mgu.Save("regatherudp", "spider", "spider", task)
	if taskId == "" {
		ed.ServeJson("Udp条件保存失败")
		return
	}
	dispatchRegatherUdp(ed, taskId)
}

// ConfirmLua marks every spider in the comma separated "codes" as confirmed
// (restate=1 plus confirmtime) when its current restate is 0 (never repaired),
// 3 or 4 (already repaired). The reply's "state" is false if any update failed.
func (ed *ErrorData) ConfirmLua() {
	defer qu.Catch()
	codes := ed.GetString("codes")
	ok := true
	for _, code := range strings.Split(codes, ",") {
		if code == "" {
			// Skip blanks from an empty or trailing-comma list, as
			// ConfirmRepair does.
			continue
		}
		lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
		switch qu.IntAll(lua["restate"]) {
		case 0, 3, 4:
			updated := mgdb.Update("luaconfig",
				map[string]interface{}{"code": code},
				map[string]interface{}{"$set": map[string]interface{}{"restate": 1, "confirmtime": time.Now().Unix()}},
				false, false)
			if !updated {
				ok = false
			}
		}
	}
	ed.ServeJson(map[string]interface{}{"state": ok})
}

// UpdateRestate advances the repair workflow of one spider. Request parameter
// restate=1 submits the repair for review (stored restate becomes 2),
// restate=2 approves it (stored restate becomes 3). The reply is a text
// describing the outcome.
func (ed *ErrorData) UpdateRestate() {
	defer qu.Catch()
	code := ed.GetString("code")
	restateTmp, _ := ed.GetInteger("restate")
	lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
	restate := qu.IntAll(lua["restate"])
	if restate == 0 {
		ed.ServeJson("该爬虫未确认")
		return
	}

	set := map[string]interface{}{}
	switch restateTmp {
	case 1: // submit for review
		if restate > 1 {
			ed.ServeJson("该爬虫已修复")
			return
		}
		set["restate"] = 2
		set["repairtime"] = time.Now().Unix()
	case 2: // review passed
		if restate < 2 {
			ed.ServeJson("该爬虫未修复")
			return
		}
		set["restate"] = 3
		set["auditime"] = time.Now().Unix()
	}

	updated := mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": set}, false, false)
	text := ""
	switch {
	case restateTmp == 1 && updated:
		text = "提交修复成功"
	case restateTmp == 1:
		text = "提交修复失败"
	case restateTmp == 2 && updated:
		text = "审核成功"
	case restateTmp == 2:
		text = "审核失败"
	}
	ed.ServeJson(text)
}

// UpdateOnlineLua re-publishes a spider. It is only allowed while the spider's
// state is 5 (online); on success restate is set to 4 together with updatetime.
func (ed *ErrorData) UpdateOnlineLua() {
	defer qu.Catch()
	code := ed.GetString("code")
	lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
	if qu.IntAll(lua["state"]) != 5 { // not an online spider, must not be updated
		ed.ServeJson(map[string]interface{}{"state": false})
		return
	}
	event := qu.IntAll(lua["event"])
	ok, err := spider.UpdateSpiderByCodeState(code, "-1", event)
	if !ok || err != nil {
		ed.ServeJson(map[string]interface{}{"state": false})
		return
	}
	mgdb.Update("luaconfig",
		map[string]interface{}{"code": code},
		map[string]interface{}{"$set": map[string]interface{}{"restate": 4, "updatetime": time.Now().Unix()}},
		false, false)
	ed.ServeJson(map[string]interface{}{"state": true})
}

// ConfirmRepair sets restate=4 (plus updatetime) on every spider in the comma
// separated "codes" and replies with the codes whose update failed.
func (ed *ErrorData) ConfirmRepair() {
	defer qu.Catch()
	codes := ed.GetString("codes")
	failed := []string{}
	for _, code := range strings.Split(codes, ",") {
		if code == "" {
			continue
		}
		updated := mgdb.Update("luaconfig",
			map[string]interface{}{"code": code},
			map[string]interface{}{"$set": map[string]interface{}{"restate": 4, "updatetime": time.Now().Unix()}},
			false, false)
		if !updated {
			failed = append(failed, code)
		}
	}
	ed.ServeJson(map[string]interface{}{"data": failed})
}

// GetStartAndEndTime parses date with qu.Date_Short_Layout in the local time
// zone and returns the Unix seconds of that instant and of the same instant
// one calendar day later. Both results are 0 when the date cannot be parsed.
func GetStartAndEndTime(date string) (startTime, endTime int64) {
	day, err := time.ParseInLocation(qu.Date_Short_Layout, date, time.Local)
	if err != nil {
		qu.Debug("Time Error:", err)
		return
	}
	startTime = day.Unix()
	endTime = day.AddDate(0, 0, 1).Unix()
	return
}