/** 爬虫,脚本接口,需要扩展 */ package spider import ( "crypto/sha1" "crypto/sha256" "fmt" "io" "log" "math/big" "math/rand" mu "mfw/util" qu "qfw/util" es "qfw/util/elastic" "regexp" util "spiderutil" "strings" "sync" "sync/atomic" "time" "github.com/donnie4w/go-logger/logger" "github.com/yuin/gopher-lua" ) //爬虫() type Spider struct { Script Code string //代码 Name string //名称 DownDetail bool //是否下载详细页 Stop bool //停止标志 Pass bool //暂停标志 LastPubshTime int64 //最后发布时间 LastHeartbeat int64 //最后心跳时间 SpiderRunRate int64 //执行频率 ExecuteOkTime int64 //任务执行成功/完成时间 Collection string //写入表名 Thread int64 //线程数 LastExecTime int64 //最后执行时间 LastDowncount int32 //最后一次下载量 TodayDowncount int32 //今日下载量 YesterdayDowncount int32 //昨日下载量 TotalDowncount int32 //总下载量 RoundCount int32 //执行轮次 StoreMode int //存储模式 StoreToMsgEvent int //消息类型 CoverAttr string //按属性判重数据 SleepBase int //基本延时 SleepRand int //随机延时 TargetChannelUrl string //栏目页地址 UpperLimit, LowerLimit int //正常值上限、下限 UserName, UserEmail, UploadTime string //开发者名称,开发者邮箱,脚本上传时间 MUserName, MUserEmail string //维护人,维护人邮箱 Index int //数组索引 //历史补漏 IsHistoricalMend bool //是否是历史补漏爬虫 IsMustDownload bool //是否强制下载 IsCompete bool //区分新老爬虫 Infoformat int //区分爬虫类型 1:招标;2:拟建/审批;3:产权 } var Es *es.Elastic var EsIndex string var EsType string var UpdataMgoCache = make(chan []map[string]interface{}, 1000) //更新要重下数据的状态 var SP = make(chan bool, 5) var TimeChan = make(chan bool, 1) var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`) var DelaySiteMap map[string]*DelaySite //延迟采集站点集合 type DelaySite struct { DelayTime int Compete bool } //高性能模式定时采集三级页信息 func DetailData() { defer qu.Catch() <-InitAllLuaOver //脚本加载完毕,执行 if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //高性能模式且不是7000节点,只有7000节点util.Config.IsHistoryEvent为true GetListDataDownloadDetail() } } func GetListDataDownloadDetail() { defer qu.Catch() logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++") Allspiders.Range(func(k, v interface{}) bool { go DownloadHighDetail(k.(string)) time.Sleep(2 * time.Second) return true }) } //高性能模式根据列表页数据下载三级页 func DownloadHighDetail(code string) { defer qu.Catch() for { //logger.Info("爬虫代码:", s.Code, "已下架:", s.Stop) //if !s.Stop { //爬虫是运行状态 /* 1、每轮开始先查询当天下载的数据 2、本次查询无数据依次向前推一天查询数据(暂定50条数据) */ o := map[string]interface{}{"_id": 1} //排序 f := map[string]interface{}{ //查询字段 "state": 0, "comeintime": 0, "event": 0, } q := map[string]interface{}{ "spidercode": code, "state": 0, //0:入库状态;-1:采集失败;1:成功 } list := &[]map[string]interface{}{} //查询数据的集合 for day := 0; day <= util.Config.DayNum; day++ { startTime := GetTime(-day) comeintime := map[string]interface{}{"$gte": startTime} //指定查询数据的时间 if day != 0 { //不是当天,指定数据范围 comeintime["$lt"] = GetTime(-day + 1) } //} else if code == "a_gcy_mcgg" { //延迟采集站点(延迟采集站点不加入多线程采集luaspecialcode库中) // endTime := time.Now().Unix() - 12*3600 // if endTime > startTime { // comeintime = map[string]interface{}{ // "$gte": startTime, // "$lt": endTime, // } // } else { // continue // } // //} q["comeintime"] = comeintime list, _ = MgoS.Find("spider_highlistdata", q, o, f, false, 0, 100) //logger.Debug("code:", code, "query:", q, "当前查询数据量:", len(*list)) if list != nil && len(*list) > 0 { break } else { time.Sleep(1 * time.Second) } } if list != nil && len(*list) > 0 { spChan := make(chan *Spider, len(AllspidersMap[code])) AllspidersMapLock.Lock() for _, sp := range AllspidersMap[code] { spChan <- sp } AllspidersMapLock.Unlock() wg := &sync.WaitGroup{} for _, l := range *list { spTmp := <-spChan wg.Add(1) go func(tmp map[string]interface{}, sp *Spider) { defer func() { spChan <- sp wg.Done() }() _id := tmp["_id"] query := map[string]interface{}{"_id": _id} href := qu.ObjToString(tmp["href"]) hashHref := HexText(href) //由于目前列表页redis判重是href+code可能导致同一条href有多条不同code采集的数据存在 //为了避免重复下载,进行全量redis判重 isExist := util.RedisClusterExists(hashHref) if isExist { set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1 MgoS.Update("spider_highlistdata", query, set, false, false) return } //if code == "a_gcy_mcgg" { //竞品数据es title判重 // title := qu.ObjToString(tmp["title"]) // eTime := time.Now().Unix() // sTime := eTime - int64(7*86400) // esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}` // count := Es.Count(EsIndex, EsType, esQuery) // if count > 0 { //es中含本title数据,不再采集,更新list表数据状态 // set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1 // MgoS.Update("spider_highlistdata", query, set, false, false) // util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365) // return // } //} //competehref := qu.ObjToString(tmp["competehref"]) //if competehref != "" { //验证三方网站数据剑鱼是否已采集 // title := qu.ObjToString(tmp["title"]) // one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title}) // if one != nil && len(*one) > 0 { //剑鱼已采集,舍弃此条信息 // set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1 // MgoS.Update("spider_highlistdata", query, set, false, false) // return // } //} times := qu.IntAll(tmp["times"]) success := true //数据是否下载成功的标志 delete(tmp, "_id") delete(tmp, "times") data := map[string]interface{}{} var err interface{} for k, v := range tmp { data[k] = v } //下载、解析、入库 data, err = sp.DownloadDetailPage(tmp, data) if err != nil || data == nil { success = false times++ if err != nil { logger.Error(sp.Code, err, tmp) if len(tmp) > 0 { SaveErrorData(sp.MUserName, tmp, err) //保存错误信息 } } /*else if data == nil && times >= 3 { //下载问题,建editor任务 DownloadErrorData(s.Code, tmp) }*/ } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同 util.RedisClusterSet(hashHref, "", -1) } if !success { //下载失败更新次数和状态 ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()} if times >= 3 { //3次下载失败今天不再下载,state置为1 ss["state"] = -1 } set := map[string]interface{}{"$set": ss} MgoS.Update("spider_highlistdata", query, set, false, false) return } else if data["delete"] != nil { //三级页过滤 util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis //更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}} MgoS.Update("spider_highlistdata", query, set, false, false) return } //正文、附件分析,下载异常数据重新下载 if AnalysisProjectInfo(data) { times++ ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()} if times >= 3 { //3次下载失败今天不再下载,state置为1 ss["state"] = -1 ss["detailfilerr"] = true } set := map[string]interface{}{"$set": ss} MgoS.Update("spider_highlistdata", query, set, false, false) return } t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) if t1 > time.Now().Unix() { //防止发布时间超前 data["publishtime"] = time.Now().Unix() } delete(data, "exit") delete(data, "checkpublishtime") data["comeintime"] = time.Now().Unix() data["spidercode"] = sp.Code data["dataging"] = 0 data["iscompete"] = sp.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断) data["infoformat"] = sp.Infoformat //爬虫类型 Store(sp.StoreMode, sp.StoreToMsgEvent, sp.Collection, sp.CoverAttr, data, true) set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} //下载成功state置为1 MgoS.Update("spider_highlistdata", query, set, false, false) }(l, spTmp) } wg.Wait() //一轮次跑完重载脚本 ReloadScript(code) } else { //没有数据 time.Sleep(2 * time.Minute) } } } //detail含“详情请访问原网页!”且附件未下成功的,不计入下载成功 func AnalysisProjectInfo(data map[string]interface{}) bool { defer qu.Catch() detail := qu.ObjToString(data["detail"]) if detail == "详情请访问原网页!" || detail == "
详情请访问原网页!" { //不判断包含关系因为有些数据为json拼接,字段不全,会加“详情请访问原网页” if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 { if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 { fileOk := false for _, data := range attachments { if d, ok := data.(map[string]interface{}); ok { fid := qu.ObjToString(d["fid"]) if fid != "" { //附件上传成功 fileOk = true break } } } return !fileOk } else { return true } } else { return true } } return false } //下载解析内容页 func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) { defer mu.Catch() s.LastHeartbeat = time.Now().Unix() util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan) tab := s.L.NewTable() for k, v := range param { if val, ok := v.(string); ok { tab.RawSet(lua.LString(k), lua.LString(val)) } else if val, ok := v.(int64); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(int32); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(float64); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(float32); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(bool); ok { tab.RawSet(lua.LString(k), lua.LBool(val)) } } var err error if err = s.L.CallByParam(lua.P{ Fn: s.L.GetGlobal("downloadDetailPage"), NRet: 1, Protect: true, }, tab); err != nil { //panic(s.Code + "," + err.Error()) log.Println(s.Code + "," + err.Error()) atomic.AddInt32(&s.Script.ErrorNum, 1) return data, err } lv := s.L.Get(-1) s.L.Pop(1) //拼map if v3, ok := lv.(*lua.LTable); ok { v3.ForEach(func(k, v lua.LValue) { if tmp, ok := k.(lua.LString); ok { key := string(tmp) if value, ok := v.(lua.LString); ok { data[key] = string(value) } else if value, ok := v.(lua.LNumber); ok { data[key] = value } else if value, ok := v.(*lua.LTable); ok { tmp := util.TableToMap(value) data[key] = tmp } } }) return data, err } else { return nil, err } } //重载脚本 func ReloadScript(code string) { scriptMap := getSpiderScriptDB(code) if codeInfo := scriptMap[code]; codeInfo != nil { AllspidersMapLock.Lock() for _, sp := range AllspidersMap[code] { sp.ScriptFile = codeInfo["script"] if codeInfo["createuser"] != "" { sp.UserName = codeInfo["createuser"] } if codeInfo["createuseremail"] != "" { sp.UserEmail = codeInfo["createuseremail"] } sp.MUserName = codeInfo["modifyuser"] sp.MUserEmail = codeInfo["modifyemail"] sp.LoadScript(&sp.Name, code, sp.ScriptFile, true) } AllspidersMapLock.Unlock() } // for k, v := range scriptMap { // if spd, ok := Allspiders.Load(k); ok { //对应脚本已存在,更新 // sp := spd.(*Spider) // sp.ScriptFile = v["script"] // if v["createuser"] != "" { // sp.UserName = v["createuser"] // } // if v["createuseremail"] != "" { // sp.UserEmail = v["createuseremail"] // } // sp.MUserName = v["modifyuser"] // sp.MUserEmail = v["modifyemail"] // //sp.LoadScript(k, sp.ScriptFile, true) //更新上架,重载脚本 // Allspiders.Store(k, sp) // logger.Info("上架重载脚本", sp.Code) // } // } } //获取随机数 func GetRandMath(num int) int { r := rand.New(rand.NewSource(time.Now().UnixNano())) return r.Intn(num) } //获取hascode func GetHas1(data string) string { t := sha1.New() io.WriteString(t, data) hf := Reg.FindString(data) if !strings.HasSuffix(hf, "/") { hf = hf + "/" } return hf + fmt.Sprintf("%x", t.Sum(nil)) } //对href哈希取模 func HexToBigIntMod(href string) int { //取哈希值 t := sha256.New() io.WriteString(t, href) hex := fmt.Sprintf("%x", t.Sum(nil)) //取模 n := new(big.Int) n, _ = n.SetString(hex[2:], 16) return int(n.Mod(n, big.NewInt(16)).Int64()) } //求hash func HexText(href string) string { h := sha256.New() h.Write([]byte(href)) return fmt.Sprintf("%x", h.Sum(nil)) }