package timetask import ( "fmt" "github.com/cron" "io/ioutil" "luacheck" "net/http" qu "qfw/util" "spider" sp "spiderutil" "strings" "sync" "time" "util" ) var Mail map[string]interface{} func TimeTask() { defer qu.Catch() c := cron.New() c.Start() c.AddFunc("0 20 9 ? * MON-FRI", CheckCreateTask) c.AddFunc("0 30 23 * * *", UpdateSiteInfo) //定时更新站点信息 c.AddFunc("0 0 23 * * *", UpdateCodeHeart) //定时更新爬虫心跳信息 c.AddFunc("0 0 */1 ? * *", CheckLuaMove) //7000节点转增量爬虫失败告警 c.AddFunc("0 */10 * * * *", SpiderMoveEvent) //7000节点转增量爬虫 c.AddFunc("0 0 8 * * *", UpdateImportantCode) //更新重点网站爬虫信息 c.AddFunc("0 */1 * * * *", luacheck.TimeTaskGetLua) //爬虫机检定时任务 } // 检测创建任务失败的爬虫 func CheckCreateTask() { defer qu.Catch() qu.Debug("开始检测任务创建...") query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": util.GetTime(0), }, } codes := []string{} list, _ := util.MgoEB.Find("luacreatetaskerr", query, nil, nil, false, -1, -1) if len(*list) > 0 { for _, l := range *list { code := qu.ObjToString(l["code"]) codes = append(codes, code) } } if len(codes) > 0 { for i := 1; i <= 3; i++ { res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-createtask-err", "爬虫:"+strings.Join(codes, ";"))) if err == nil { res.Body.Close() read, err := ioutil.ReadAll(res.Body) qu.Debug("邮件发送:", string(read), err) break } } } } func UpdateSiteInfo() { defer qu.Catch() qu.Debug("定时更新站点信息开始...") sites, _ := util.MgoEB.Find(sp.Config.SiteColl, map[string]interface{}{"delete": false}, ``, `{"site":1}`, false, -1, -1) for _, s := range *sites { site := qu.ObjToString(s["site"]) domain, status, event, platform, infotype, specialtype, _ := util.GetLuasInfoBySite(site, "", "", "") set := map[string]interface{}{ "$set": map[string]interface{}{ "platform": platform, "event": event, "spider_status": status, "domain": domain, "infotype": infotype, //"area": area, //"city": city, //"district": district, "updatetime": time.Now().Unix(), "special_type": specialtype, }, } util.MgoEB.UpdateById(sp.Config.SiteColl, s["_id"], set) } qu.Debug("定时更新站点信息完成...") } // 更新重点网站爬虫信息 func UpdateImportantCode() { data, _ := util.MgoEB.Find("site_code_baseinfo", nil, nil, map[string]interface{}{"spidercode": 1}, false, -1, -1) for _, d := range *data { lua, _ := util.MgoEB.FindOneByField("luaconfig", map[string]interface{}{"code": d["spidercode"]}, map[string]interface{}{"state": 1, "modifyuser": 1, "platform": 1, "channel": 1, "_id": 0}) if len(*lua) > 0 { util.MgoEB.UpdateById("site_code_baseinfo", d["_id"], map[string]interface{}{"$set": *lua}) } } } func UpdateCodeHeart() { qu.Debug("定时更新爬虫心跳信息...") defer qu.Catch() query := map[string]interface{}{ "$or": []interface{}{ map[string]interface{}{ "platform": "python", }, map[string]interface{}{ "state": map[string]interface{}{ "$in": []int{4, 6, 7, 8, 9, 10, 11}, }, }, }, } //list, _ := util.MgoE.Find("luaconfig", query, nil, map[string]interface{}{"code": 1}, false, -1, -1) list, _ := util.MgoEB.Find("luaconfig", query, nil, map[string]interface{}{"code": 1}, false, -1, -1) qu.Debug("定时更新爬虫心跳信息个数:", len(*list)) for _, l := range *list { util.MgoS.Update("spider_heart", map[string]interface{}{"code": l["code"]}, map[string]interface{}{ "$set": map[string]interface{}{"del": true}, }, false, true) } qu.Debug("定时更新爬虫心跳信息完成...") } // 监测爬虫由历史转增量时未成功的 func CheckLuaMove() { defer qu.Catch() qu.Debug("开始检测爬虫节点移动...") query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$lte": time.Now().Add(-(time.Hour * 1)).Unix(), //"$lte": time.Now().Unix(), }, "ok": false, } qu.Debug("query:", query) list, _ := util.MgoEB.Find("luamovelog", query, nil, nil, false, -1, -1) if len(*list) > 0 { codes := []string{} arr := [][]map[string]interface{}{} for _, l := range *list { code := qu.ObjToString(l["code"]) codes = append(codes, code) update := []map[string]interface{}{} update = append(update, map[string]interface{}{"_id": l["_id"]}) update = append(update, map[string]interface{}{"$set": map[string]interface{}{"ok": true, "updatetime": time.Now().Unix()}}) arr = append(arr, update) } for i := 1; i <= 3; i++ { res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-move-fail", strings.Join(codes, ";"))) if err == nil { res.Body.Close() read, err := ioutil.ReadAll(res.Body) qu.Debug("邮件发送:", string(read), err) break } } util.MgoEB.UpdateBulk("luamovelog", arr...) arr = [][]map[string]interface{}{} } } func SpiderMoveEvent() { defer qu.Catch() sess := util.MgoEB.GetMgoConn() defer util.MgoEB.DestoryMongoConn(sess) ch := make(chan bool, 2) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ "ok": false, "state": map[string]interface{}{ "$ne": 1, }, } count := util.MgoEB.Count("luamovelog", query) if count == 0 { return } it := sess.DB(util.MgoEB.DbName).C("luamovelog").Find(&query).Iter() n := 0 arr := [][]map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["code"]) //更新爬虫节点信息,上架 lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code}) spidertype := qu.ObjToString((*lua)["spidertype"]) event := qu.IntAll((*lua)["event"]) var upresult bool var err error set := map[string]interface{}{} qu.Debug("lua move:", code, event, spidertype, (*lua)["state"]) if spidertype == "history" { newevent := GetEvent(code, (*lua)) qu.Debug(code, " new event:", newevent) set["event"] = newevent set["spidertype"] = "increment" //type_content, _ := (*lua)["type_content"].(int) //iscopycontent, _ := (*lua)["iscopycontent"].(bool) //str_content := qu.ObjToString((*lua)["str_content"]) //str_recontent := qu.ObjToString((*lua)["str_recontent"]) //if type_content == 1 && iscopycontent && str_recontent != "" { //三级页是专家模式且有复制三级页代码 // set["iscopycontent"] = false // set["str_content"] = str_recontent // set["str_recontent"] = str_content //} upset := map[string]interface{}{ "$set": set, } if downevent := qu.IntAll((*lua)["downevent"]); downevent != 0 { spider.UpdateSpiderByCodeState(code, "6", downevent) qu.Debug(code, "下架历史节点:", downevent) upset["$unset"] = map[string]interface{}{"downevent": ""} } upOk := util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, upset, false, false) qu.Debug(code, "脚本更新成功:", upOk) if upOk { upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架 qu.Debug(code, "脚本上架", upresult, err) } } ok := false if upresult && err == nil { //上架成功 ok = true qu.Debug("Code:", code, "历史迁移到增量节点成功") } else { //上架失败 util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false) qu.Debug("Code:", code, "历史迁移到增量节点失败") } update := []map[string]interface{}{} update = append(update, map[string]interface{}{"_id": tmp["_id"]}) update = append(update, map[string]interface{}{"$set": map[string]interface{}{"ok": ok, "state": 1, "updatetime": time.Now().Unix()}}) lock.Lock() arr = append(arr, update) lock.Unlock() }(tmp) tmp = map[string]interface{}{} } wg.Wait() lock.Lock() if len(arr) > 0 { util.MgoEB.UpdateBulk("luamovelog", arr...) arr = [][]map[string]interface{}{} } lock.Unlock() } func GetEvent(code string, lua map[string]interface{}) int { defer qu.Catch() //1、历史节点 if lua["incrementevent"] != nil { return qu.IntAll(lua["incrementevent"]) } //2、根据站点找节点 query := map[string]interface{}{ "code": map[string]interface{}{ "$ne": code, }, "site": lua["site"], "state": 5, } tmp, _ := util.MgoEB.FindOne("luaconfig", query) if tmp != nil && len(*tmp) > 0 { return qu.IntAll((*tmp)["event"]) } //3、7700 //spidermovevent := qu.ObjToString(lua["spidermovevent"]) //if spidermovevent == "7700" { // return 7700 //} //4、根据数量分配节点 //num := 0 //result := 7700 //for k, t := range sp.Config.Uploadevents { // if qu.ObjToString(t) == spidermovevent { //bid、comm // event := qu.IntAll(k) // //count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event}) // count := u.MgoEB.Count("luaconfig", map[string]interface{}{"state": 5, "event": event}) // if num == 0 || count < num { // result = event // num = count // } // } //} //5、指定节点 return 7200 }