/** Crawler and script interface; needs further extension. */
package spider

import (
	"crypto/sha1"
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"math/big"
	"math/rand"
	mu "mfw/util"
	qu "qfw/util"
	"regexp"
	util "spiderutil"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"github.com/yuin/gopher-lua"
)

// Spider is a single crawler instance bound to one script.
type Spider struct {
	Script
	Code                            string // spider code
	Name                            string // spider name
	DownDetail                      bool   // whether to download detail pages
	Stop                            bool   // stop flag
	Pass                            bool   // pause flag
	LastPubshTime                   int64  // last publish time
	LastHeartbeat                   int64  // last heartbeat time
	SpiderRunRate                   int64  // run frequency
	ExecuteOkTime                   int64  // time the task last completed successfully
	Collection                      string // target collection name
	Thread                          int64  // thread count
	LastExecTime                    int64  // last execution time
	LastDowncount                   int32  // download count of the last run
	TodayDowncount                  int32  // today's download count
	YesterdayDowncount              int32  // yesterday's download count
	TotalDowncount                  int32  // total download count
	RoundCount                      int32  // number of completed rounds
	StoreMode                       int    // storage mode
	StoreToMsgEvent                 int    // message type
	CoverAttr                       string // attribute used for de-duplication
	SleepBase                       int    // base delay (ms)
	SleepRand                       int    // random extra delay (ms)
	TargetChannelUrl                string // channel (list) page URL
	UpperLimit, LowerLimit          int    // upper/lower bounds of the normal range
	UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
	MUserName, MUserEmail           string // maintainer name, maintainer email
	Index                           int    // array index
	// historical backfill
	IsHistoricalMend bool // whether this is a historical-backfill spider
	IsMustDownload   bool // whether download is forced
}

var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)

// In high-performance mode, periodically collect detail-page (level-3) data.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver // wait until all Lua scripts are loaded, then run
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { // high-performance mode and not the 7000 node (util.Config.IsHistoryEvent is true only on the 7000 node)
		GetListDataDownloadDetail()
	}
}

func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders.Range(func(k, v interface{}) bool {
		go DownloadHighDetail(k.(string))
		time.Sleep(2 * time.Second) // stagger the per-code workers
		return true
	})
}
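// DownloadHighDetail below throttles concurrency with a buffered channel of
// *Spider that acts as a token pool: a worker is taken from the channel before
// each download and returned when its goroutine finishes, so at most
// len(AllspidersMap[code]) detail downloads run at once per spider code.
// The function that follows is a minimal, self-contained sketch of that
// pattern; the worker type and job list are hypothetical and only illustrate
// the idiom, they are not part of the crawler.
func exampleTokenPool() {
	type worker struct{ id int }
	jobs := []string{"a", "b", "c", "d"}
	pool := make(chan *worker, 2) // two tokens -> at most two jobs in flight
	pool <- &worker{id: 1}
	pool <- &worker{id: 2}
	wg := &sync.WaitGroup{}
	for _, j := range jobs {
		w := <-pool // block until a worker token is free
		wg.Add(1)
		go func(job string, w *worker) {
			defer func() {
				pool <- w // return the token
				wg.Done()
			}()
			log.Println("worker", w.id, "handled job", job)
		}(j, w)
	}
	wg.Wait()
}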
// In high-performance mode, download detail pages based on list-page records.
func DownloadHighDetail(code string) {
	defer qu.Catch()
	for {
		//logger.Info("spider code:", s.Code, "taken offline:", s.Stop)
		//if !s.Stop { // spider is running
		/*
			1. At the start of each round, query the data downloaded today.
			2. If a query returns nothing, step back one day at a time
			   (tentatively 50 records per query; the code currently fetches up to 100).
		*/
		o := map[string]interface{}{"_id": 1} // sort order
		f := map[string]interface{}{ // projection fields
			"state":      0,
			"comeintime": 0,
			"event":      0,
		}
		q := map[string]interface{}{
			"spidercode": code,
			"state":      0, // 0: newly stored; -1: download failed; 1: success
		}
		list := &[]map[string]interface{}{} // query result set
		for day := 0; day <= util.Config.DayNum; day++ {
			comeintime := map[string]interface{}{"$gte": GetTime(-day)} // time window for the query
			if day != 0 { // not today: bound the window on both sides
				comeintime["$lt"] = GetTime(-day + 1)
			}
			q["comeintime"] = comeintime
			list, _ = MgoS.Find("spider_highlistdata", q, o, f, false, 0, 100)
			//logger.Debug("code:", code, "query:", q, "result count:", len(*list))
			if list != nil && len(*list) > 0 {
				break
			} else {
				time.Sleep(1 * time.Second)
			}
		}
		if list != nil && len(*list) > 0 {
			// token pool: at most len(AllspidersMap[code]) downloads in flight
			spChan := make(chan *Spider, len(AllspidersMap[code]))
			AllspidersMapLock.Lock()
			for _, sp := range AllspidersMap[code] {
				spChan <- sp
			}
			AllspidersMapLock.Unlock()
			wg := &sync.WaitGroup{}
			for _, l := range *list {
				spTmp := <-spChan
				wg.Add(1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp
						wg.Done()
					}()
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					competehref := qu.ObjToString(tmp["competehref"])
					if competehref != "" { // third-party site data: check whether Jianyu has already collected it
						title := qu.ObjToString(tmp["title"])
						one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title})
						if one != nil && len(*one) > 0 { // already collected by Jianyu; discard this record
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: set state to 1
							MgoS.Update("spider_highlistdata", query, set, false, false)
							return
						}
					}
					times := qu.IntAll(tmp["times"])
					success := true // whether the download succeeded
					delete(tmp, "_id")
					delete(tmp, "times")
					href := qu.ObjToString(tmp["href"])
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					// download, parse, store
					data, err = sp.DownloadDetailPage(tmp, data)
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(sp.Code, err, tmp)
							if len(tmp) > 0 {
								SaveErrorData(sp.MUserName, tmp, err) // save error details
							}
						}
						/*else if data == nil && times >= 3 { // download problem: create an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so it differs from the list-page href
						log.Println("beforeHref:", href, "afterHref:", tmphref)
						// incremental de-dup cache
						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+tmphref, tmphref, 3600*24*30)
						// full de-dup cache
						db := HexToBigIntMod(tmphref)
						isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+tmphref)
						if !isExist {
							util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+tmphref, "", -1)
						}
					}
					if !success { // download failed: update retry count and state
						ss := map[string]interface{}{"times": times}
						if times >= 3 { // after three failures, stop retrying today: state is set to -1
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						MgoS.Update("spider_highlistdata", query, set, false, false)
						return
					}
					t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
					if t1 > time.Now().Unix() { // clamp publish times that lie in the future
						data["publishtime"] = time.Now().Unix()
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					data["comeintime"] = time.Now().Unix()
					// update counters
					tmpsp1, b := Allspiders.Load(sp.Code)
					if b {
						sp1, ok := tmpsp1.(*Spider)
						if ok {
							atomic.AddInt32(&sp1.LastDowncount, 1)
							atomic.AddInt32(&sp1.TodayDowncount, 1)
							atomic.AddInt32(&sp1.TotalDowncount, 1)
						}
					}
					data["spidercode"] = sp.Code
					data["dataging"] = 0
					Store(sp.StoreMode, sp.StoreToMsgEvent, sp.Collection, sp.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // download succeeded: set state to 1
					MgoS.Update("spider_highlistdata", query, set, false, false)
				}(l, spTmp)
			}
			wg.Wait()
			// after a full round, reload the script
			ReloadScript(code)
		} else { // no data
			time.Sleep(2 * time.Minute)
		}
	}
}

// Download and parse a content (detail) page by calling the script's Lua
// function downloadDetailPage.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer mu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
	// copy the parameters into a Lua table
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	// assemble the result map from the returned Lua table
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = value
				} else if value, ok := v.(*lua.LTable); ok {
					tmp := util.TableToMap(value)
					data[key] = tmp
				}
			}
		})
		return data, err
	}
	return nil, err
}
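// DownloadDetailPage above marshals a Go map into a lua.LTable, calls the
// script's global downloadDetailPage function, and converts the returned
// table back into a map. The sketch below shows that round trip against a
// tiny inline Lua function named echo; the Lua source and field names here
// are hypothetical stand-ins for a real spider script.
func exampleLuaRoundTrip() {
	L := lua.NewState()
	defer L.Close()
	// a stand-in for downloadDetailPage: takes a table, returns a table
	if err := L.DoString(`function echo(t) t.parsed = "yes" return t end`); err != nil {
		log.Println(err)
		return
	}
	tab := L.NewTable()
	tab.RawSet(lua.LString("href"), lua.LString("http://example.com/a"))
	if err := L.CallByParam(lua.P{
		Fn:      L.GetGlobal("echo"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		log.Println(err)
		return
	}
	ret := L.Get(-1)
	L.Pop(1)
	if t, ok := ret.(*lua.LTable); ok {
		t.ForEach(func(k, v lua.LValue) {
			log.Println(k.String(), "=", v.String()) // href = ..., parsed = yes
		})
	}
}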
// Reload the script for the given spider code.
func ReloadScript(code string) {
	scriptMap := getSpiderScriptDB(code)
	if codeInfo := scriptMap[code]; codeInfo != nil {
		AllspidersMapLock.Lock()
		for _, sp := range AllspidersMap[code] {
			sp.ScriptFile = codeInfo["script"]
			if codeInfo["createuser"] != "" {
				sp.UserName = codeInfo["createuser"]
			}
			if codeInfo["createuseremail"] != "" {
				sp.UserEmail = codeInfo["createuseremail"]
			}
			sp.MUserName = codeInfo["modifyuser"]
			sp.MUserEmail = codeInfo["modifyemail"]
			sp.LoadScript(code, sp.ScriptFile, true)
		}
		AllspidersMapLock.Unlock()
	}
	// for k, v := range scriptMap {
	// 	if spd, ok := Allspiders.Load(k); ok { // the script already exists: update it
	// 		sp := spd.(*Spider)
	// 		sp.ScriptFile = v["script"]
	// 		if v["createuser"] != "" {
	// 			sp.UserName = v["createuser"]
	// 		}
	// 		if v["createuseremail"] != "" {
	// 			sp.UserEmail = v["createuseremail"]
	// 		}
	// 		sp.MUserName = v["modifyuser"]
	// 		sp.MUserEmail = v["modifyemail"]
	// 		//sp.LoadScript(k, sp.ScriptFile, true) // put back online, reload the script
	// 		Allspiders.Store(k, sp)
	// 		logger.Info("reloaded script after putting it back online", sp.Code)
	// 	}
	// }
}

// GetRandMath returns a random int in [0, num). rand.Intn panics when its
// argument is <= 0, so guard against a zero or negative bound (e.g. a spider
// configured with SleepRand == 0).
func GetRandMath(num int) int {
	if num <= 0 {
		return 0
	}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return r.Intn(num)
}

// GetHas1 returns a hash code: the host prefix of the URL followed by the
// SHA-1 hex digest of the whole string.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}

// HexToBigIntMod hashes the href and reduces it modulo 16, yielding the
// Redis DB index used by the full de-dup cache.
func HexToBigIntMod(href string) int {
	// compute the hash
	t := sha256.New()
	io.WriteString(t, href)
	hex := fmt.Sprintf("%x", t.Sum(nil))
	// take the modulus (the first two hex digits are skipped, preserving the original behavior)
	n := new(big.Int)
	n, _ = n.SetString(hex[2:], 16)
	return int(n.Mod(n, big.NewInt(16)).Int64())
}
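// A quick illustration of how HexToBigIntMod spreads hrefs across the 16
// Redis databases of the full de-dup cache ("title_repeat_fulljudgement").
// The URLs are made up; only the sharding behavior is being demonstrated.
func exampleShardIndex() {
	for _, href := range []string{
		"http://example.com/notice/1",
		"http://example.com/notice/2",
		"http://example.org/bid/3",
	} {
		// the same href always maps to the same DB index in [0, 16)
		fmt.Printf("%s -> Redis DB %d\n", href, HexToBigIntMod(href))
	}
}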