123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- package timetask
- import (
- "fmt"
- "github.com/cron"
- "io/ioutil"
- "luacheck"
- "net/http"
- qu "qfw/util"
- "spider"
- sp "spiderutil"
- "strings"
- "sync"
- "time"
- "util"
- )
var (
	// Mail is a package-level map that nothing in the visible part of this
	// file reads or writes.
	// NOTE(review): presumably holds mail settings filled in elsewhere — confirm.
	Mail map[string]interface{}
)
- func TimeTask() {
- defer qu.Catch()
- c := cron.New()
- c.Start()
- c.AddFunc("0 20 9 ? * MON-FRI", CheckCreateTask)
- c.AddFunc("0 30 23 * * *", UpdateSiteInfo) //定时更新站点信息
- c.AddFunc("0 0 23 * * *", UpdateCodeHeart) //定时更新爬虫心跳信息
- c.AddFunc("0 0 */1 ? * *", CheckLuaMove) //7000节点转增量爬虫失败告警
- c.AddFunc("0 */10 * * * *", SpiderMoveEvent) //7000节点转增量爬虫
- c.AddFunc("0 0 8 * * *", UpdateImportantCode) //更新重点网站爬虫信息
- c.AddFunc("0 */1 * * * *", luacheck.TimeTaskGetLua) //爬虫机检定时任务
- }
- // 检测创建任务失败的爬虫
- func CheckCreateTask() {
- defer qu.Catch()
- qu.Debug("开始检测任务创建...")
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": util.GetTime(0),
- },
- }
- codes := []string{}
- list, _ := util.MgoEB.Find("luacreatetaskerr", query, nil, nil, false, -1, -1)
- if len(*list) > 0 {
- for _, l := range *list {
- code := qu.ObjToString(l["code"])
- codes = append(codes, code)
- }
- }
- if len(codes) > 0 {
- for i := 1; i <= 3; i++ {
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-createtask-err", "爬虫:"+strings.Join(codes, ";")))
- if err == nil {
- res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- qu.Debug("邮件发送:", string(read), err)
- break
- }
- }
- }
- }
- func UpdateSiteInfo() {
- defer qu.Catch()
- qu.Debug("定时更新站点信息开始...")
- sites, _ := util.MgoEB.Find(sp.Config.SiteColl, map[string]interface{}{"delete": false}, ``, `{"site":1}`, false, -1, -1)
- for _, s := range *sites {
- site := qu.ObjToString(s["site"])
- domain, status, event, platform, infotype, specialtype, _ := util.GetLuasInfoBySite(site, "", "", "")
- set := map[string]interface{}{
- "$set": map[string]interface{}{
- "platform": platform,
- "event": event,
- "spider_status": status,
- "domain": domain,
- "infotype": infotype,
- //"area": area,
- //"city": city,
- //"district": district,
- "updatetime": time.Now().Unix(),
- "special_type": specialtype,
- },
- }
- util.MgoEB.UpdateById(sp.Config.SiteColl, s["_id"], set)
- }
- qu.Debug("定时更新站点信息完成...")
- }
- // 更新重点网站爬虫信息
- func UpdateImportantCode() {
- data, _ := util.MgoEB.Find("site_code_baseinfo", nil, nil, map[string]interface{}{"spidercode": 1}, false, -1, -1)
- for _, d := range *data {
- lua, _ := util.MgoEB.FindOneByField("luaconfig", map[string]interface{}{"code": d["spidercode"]}, map[string]interface{}{"state": 1, "modifyuser": 1, "platform": 1, "channel": 1, "_id": 0})
- if len(*lua) > 0 {
- util.MgoEB.UpdateById("site_code_baseinfo", d["_id"], map[string]interface{}{"$set": *lua})
- }
- }
- }
- func UpdateCodeHeart() {
- qu.Debug("定时更新爬虫心跳信息...")
- defer qu.Catch()
- query := map[string]interface{}{
- "$or": []interface{}{
- map[string]interface{}{
- "platform": "python",
- },
- map[string]interface{}{
- "state": map[string]interface{}{
- "$in": []int{4, 6, 7, 8, 9, 10, 11},
- },
- },
- },
- }
- //list, _ := util.MgoE.Find("luaconfig", query, nil, map[string]interface{}{"code": 1}, false, -1, -1)
- list, _ := util.MgoEB.Find("luaconfig", query, nil, map[string]interface{}{"code": 1}, false, -1, -1)
- qu.Debug("定时更新爬虫心跳信息个数:", len(*list))
- for _, l := range *list {
- util.MgoS.Update("spider_heart", map[string]interface{}{"code": l["code"]}, map[string]interface{}{
- "$set": map[string]interface{}{"del": true},
- }, false, true)
- }
- qu.Debug("定时更新爬虫心跳信息完成...")
- }
- // 监测爬虫由历史转增量时未成功的
- func CheckLuaMove() {
- defer qu.Catch()
- qu.Debug("开始检测爬虫节点移动...")
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$lte": time.Now().Add(-(time.Hour * 1)).Unix(),
- //"$lte": time.Now().Unix(),
- },
- "ok": false,
- }
- qu.Debug("query:", query)
- list, _ := util.MgoEB.Find("luamovelog", query, nil, nil, false, -1, -1)
- if len(*list) > 0 {
- codes := []string{}
- arr := [][]map[string]interface{}{}
- for _, l := range *list {
- code := qu.ObjToString(l["code"])
- codes = append(codes, code)
- update := []map[string]interface{}{}
- update = append(update, map[string]interface{}{"_id": l["_id"]})
- update = append(update, map[string]interface{}{"$set": map[string]interface{}{"ok": true, "updatetime": time.Now().Unix()}})
- arr = append(arr, update)
- }
- for i := 1; i <= 3; i++ {
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-move-fail", strings.Join(codes, ";")))
- if err == nil {
- res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- qu.Debug("邮件发送:", string(read), err)
- break
- }
- }
- util.MgoEB.UpdateBulk("luamovelog", arr...)
- arr = [][]map[string]interface{}{}
- }
- }
- func SpiderMoveEvent() {
- defer qu.Catch()
- sess := util.MgoEB.GetMgoConn()
- defer util.MgoEB.DestoryMongoConn(sess)
- ch := make(chan bool, 2)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "ok": false,
- "state": map[string]interface{}{
- "$ne": 1,
- },
- }
- count := util.MgoEB.Count("luamovelog", query)
- if count == 0 {
- return
- }
- it := sess.DB(util.MgoEB.DbName).C("luamovelog").Find(&query).Iter()
- n := 0
- arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["code"])
- //更新爬虫节点信息,上架
- lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
- spidertype := qu.ObjToString((*lua)["spidertype"])
- event := qu.IntAll((*lua)["event"])
- var upresult bool
- var err error
- set := map[string]interface{}{}
- qu.Debug("lua move:", code, event, spidertype, (*lua)["state"])
- if spidertype == "history" {
- newevent := GetEvent(code, (*lua))
- qu.Debug(code, " new event:", newevent)
- set["event"] = newevent
- set["spidertype"] = "increment"
- //type_content, _ := (*lua)["type_content"].(int)
- //iscopycontent, _ := (*lua)["iscopycontent"].(bool)
- //str_content := qu.ObjToString((*lua)["str_content"])
- //str_recontent := qu.ObjToString((*lua)["str_recontent"])
- //if type_content == 1 && iscopycontent && str_recontent != "" { //三级页是专家模式且有复制三级页代码
- // set["iscopycontent"] = false
- // set["str_content"] = str_recontent
- // set["str_recontent"] = str_content
- //}
- upset := map[string]interface{}{
- "$set": set,
- }
- if downevent := qu.IntAll((*lua)["downevent"]); downevent != 0 {
- spider.UpdateSpiderByCodeState(code, "6", downevent)
- qu.Debug(code, "下架历史节点:", downevent)
- upset["$unset"] = map[string]interface{}{"downevent": ""}
- }
- upOk := util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, upset, false, false)
- qu.Debug(code, "脚本更新成功:", upOk)
- if upOk {
- upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
- qu.Debug(code, "脚本上架", upresult, err)
- }
- }
- ok := false
- if upresult && err == nil { //上架成功
- ok = true
- qu.Debug("Code:", code, "历史迁移到增量节点成功")
- } else { //上架失败
- util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
- qu.Debug("Code:", code, "历史迁移到增量节点失败")
- }
- update := []map[string]interface{}{}
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- update = append(update, map[string]interface{}{"$set": map[string]interface{}{"ok": ok, "state": 1, "updatetime": time.Now().Unix()}})
- lock.Lock()
- arr = append(arr, update)
- lock.Unlock()
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- lock.Lock()
- if len(arr) > 0 {
- util.MgoEB.UpdateBulk("luamovelog", arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }
- func GetEvent(code string, lua map[string]interface{}) int {
- defer qu.Catch()
- //1、历史节点
- if lua["incrementevent"] != nil {
- return qu.IntAll(lua["incrementevent"])
- }
- //2、根据站点找节点
- query := map[string]interface{}{
- "code": map[string]interface{}{
- "$ne": code,
- },
- "site": lua["site"],
- "state": 5,
- }
- tmp, _ := util.MgoEB.FindOne("luaconfig", query)
- if tmp != nil && len(*tmp) > 0 {
- return qu.IntAll((*tmp)["event"])
- }
- //3、7700
- //spidermovevent := qu.ObjToString(lua["spidermovevent"])
- //if spidermovevent == "7700" {
- // return 7700
- //}
- //4、根据数量分配节点
- //num := 0
- //result := 7700
- //for k, t := range sp.Config.Uploadevents {
- // if qu.ObjToString(t) == spidermovevent { //bid、comm
- // event := qu.IntAll(k)
- // //count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
- // count := u.MgoEB.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
- // if num == 0 || count < num {
- // result = event
- // num = count
- // }
- // }
- //}
- //5、指定节点
- return 7200
- }
|