package main

import (
	"fmt"
	"math"
	"sort"
	"sync"
	"time"

	qu "qfw/util"
)

var (
	// YearMinCodeMap is the set of spider codes present in the
	// luayearmincode collection. It is rebuilt by GetAllLuaYearMinCode.
	YearMinCodeMap map[string]bool
	// SendFirstMap holds, keyed by spider code, the spiders flagged
	// "sendfirst" that have not been sent yet. It is rebuilt by
	// GetAllLuaYearMinCode and emptied by CreateFirstCodeTask.
	SendFirstMap map[string]*Lua

	// YearMinDownloadNum is the yearly download threshold: spiders whose
	// one-year download count is at or below it are tracked.
	YearMinDownloadNum int
	// IntervalMaxNum is the download-count limit used by TagCode when every
	// publish gap of a spider falls into the first interval.
	IntervalMaxNum int
	// PublishtimeInterval lists the upper bounds, in days, of the publish-gap
	// buckets [0,1), [1,3), [3,10), [10,20), [20,31), [31,93); anything at or
	// above the last bound belongs to the final open-ended bucket.
	PublishtimeInterval = []float64{1.0, 3.0, 10.0, 20.0, 31.0, 93.0}
	// IntervalMap maps a 1-based bucket index to its interval label.
	IntervalMap = map[int]string{
		1: "[0,1)",
		2: "[1,3)",
		3: "[3,10)",
		4: "[10,20)",
		5: "[20,31)",
		6: "[31,93)",
		7: "[93,···)",
	}
	// IntervalRotateTime is the time, in months, in which all spiders of an
	// interval should have been sent once.
	IntervalRotateTime = map[string]int{
		"[0,1)":    3,
		"[1,3)":    3,
		"[3,10)":   6,
		"[10,20)":  6,
		"[20,31)":  6,
		"[31,93)":  12,
		"[93,···)": 12,
	}
)

// Lua carries the spider attributes copied into generated tasks.
type Lua struct {
	Site     string
	Channel  string
	Modify   string
	Modifyid string
	Code     string
	Event    int
	Count    int
}

// LuaYearMinCodeCreateTask loads every spider from luayearmincode and then
// creates the tasks derived from them.
func LuaYearMinCodeCreateTask() {
	defer qu.Catch()
	GetAllLuaYearMinCode() // load all spiders from luayearmincode
	CreateTask()
}

// GetAllLuaYearMinCode rebuilds YearMinCodeMap and SendFirstMap from the
// luayearmincode collection (the publishtime field is excluded from the read).
func GetAllLuaYearMinCode() {
	defer qu.Catch()
	YearMinCodeMap = map[string]bool{}
	SendFirstMap = map[string]*Lua{}
	list, _ := MgoE.Find("luayearmincode", nil, nil, `{"publishtime":0}`, false, -1, -1)
	if list == nil {
		return
	}
	for _, l := range *list {
		code := qu.ObjToString(l["code"])
		YearMinCodeMap[code] = true
		sendFirst, _ := l["sendfirst"].(bool)
		sent, _ := l["send"].(bool)
		if !sendFirst || sent {
			continue
		}
		SendFirstMap[code] = &Lua{
			Site:     qu.ObjToString(l["site"]),
			Channel:  qu.ObjToString(l["channel"]),
			Modify:   qu.ObjToString(l["modify"]),
			Modifyid: qu.ObjToString(l["modifyid"]),
			Code:     code,
			Count:    qu.IntAll(l["count"]),
			Event:    qu.IntAll(l["event"]),
		}
	}
}

// CreateTask creates the one-off "sendfirst" tasks and then starts the
// rotating task creation for every record in luayearmincodeinterval.
func CreateTask() {
	defer qu.Catch()
	// 1. sendfirst tasks (created only once per spider).
	CreateFirstCodeTask()
	// 2. rotating tasks, one schedule per interval.
	list, _ := MgoE.Find("luayearmincodeinterval", nil, nil, nil, false, -1, -1)
	if list == nil {
		return
	}
	for _, l := range *list {
		CreateTaskByInterval(l)
	}
}

// CreateTaskByInterval creates tasks for up to l["timesnum"] unsent spiders of
// the interval l["interval"], marks them as sent, and re-arms itself to run
// again after l["cycletime"] days.
//
// A non-positive cycletime is rejected: it would make time.AfterFunc fire
// immediately and turn the rotation into a busy loop. Once the cycletime is
// accepted the next run is scheduled through a defer, so a failure in the
// current run does not end the rotation.
func CreateTaskByInterval(l map[string]interface{}) {
	defer qu.Catch()
	interval := qu.ObjToString(l["interval"])
	qu.Debug(interval, "区间开始创建任务...")
	timesnum := qu.IntAll(l["timesnum"])
	cycletime := qu.IntAll(l["cycletime"])
	if cycletime <= 0 {
		qu.Debug(interval, "区间cycletime无效,不创建任务:", cycletime)
		return
	}
	// Registered after qu.Catch, so it runs on every exit path of this call.
	defer func() {
		time.AfterFunc(time.Duration(cycletime*24)*time.Hour, func() { CreateTaskByInterval(l) })
	}()

	query := map[string]interface{}{"interval": interval, "send": false}
	list, _ := MgoE.Find("luayearmincode", query, ``, `{"publishtime":0}`, false, 0, timesnum)
	if list == nil || len(*list) == 0 {
		return
	}

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		tasks   []map[string]interface{}
		updates [][]map[string]interface{}
	)
	sem := make(chan bool, 3) // at most three spiders are inspected at once
	for _, doc := range *list {
		wg.Add(1)
		sem <- true
		go func(doc map[string]interface{}) {
			defer func() {
				<-sem
				wg.Done()
			}()
			task := newIntervalTask(doc, cycletime)
			update := []map[string]interface{}{
				{"_id": doc["_id"]},
				{"$set": map[string]interface{}{"send": true}},
			}
			mu.Lock()
			tasks = append(tasks, task)
			updates = append(updates, update)
			mu.Unlock()
		}(doc)
	}
	wg.Wait()
	// All workers are done; the slices can be read without the mutex.
	if len(tasks) > 0 {
		MgoE.SaveBulk("task", tasks...)
	}
	if len(updates) > 0 {
		MgoE.UpdateBulk("luayearmincode", updates...)
	}
}

// newIntervalTask builds the task document for one luayearmincode record.
//
// Over the window [GetTime(-cycletime), GetTime(0)] it counts the spider's
// rows in data_bak and collects three kinds of findings into the description:
//  1. download failures from spider_highlistdata (state -1),
//  2. error-level rows from spider_warn (publish time / title or body),
//  3. a 404 site check from spider_sitecheck.
//
// The task state becomes 1 when ten download failures are found, when a
// publish-time error is found, or when a 404 is found and nothing was
// downloaded in the window.
func newIntervalTask(tmp map[string]interface{}, cycletime int) map[string]interface{} {
	code := qu.ObjToString(tmp["code"])
	stime, etime := GetTime(-cycletime), GetTime(0)
	// A fresh range document per query keeps the queries independent.
	window := func() map[string]interface{} {
		return map[string]interface{}{"$gte": stime, "$lte": etime}
	}
	description := ""
	state := 0 // task state

	// Download volume inside the window.
	downloadnum := MgoS.Count("data_bak", map[string]interface{}{
		"spidercode":       code,
		"l_np_publishtime": window(),
	})

	// 1. Download failures (at most ten are read).
	downloadErrs, _ := MgoS.Find("spider_highlistdata", map[string]interface{}{
		"comeintime": window(),
		"state":      -1,
		"spidercode": code,
	}, `{"_id":-1}`, `{"href":1}`, false, 0, 10)
	if downloadErrs != nil && len(*downloadErrs) > 0 {
		if len(*downloadErrs) == 10 {
			state = 1
		}
		description += "下载异常:\n"
		for _, row := range *downloadErrs {
			description += qu.ObjToString(row["href"]) + "\n"
		}
	}

	// 2. Publish-time errors and title/body errors (level 2 = error, 1 = warn).
	warns, _ := MgoS.Find("spider_warn", map[string]interface{}{
		"comeintime": window(),
		"level":      2,
		"code":       code,
	}, `{"_id":-1}`, `{"href":1,"field":1}`, false, 0, 10)
	if warns != nil && len(*warns) > 0 {
		publishtimeDesc := "发布时间异常:\n"
		contentDesc := "正文标题异常:\n"
		for _, row := range *warns {
			href := qu.ObjToString(row["href"]) + "\n"
			if qu.ObjToString(row["field"]) == "publishtime" {
				state = 1
				publishtimeDesc += href
			} else {
				contentDesc += href
			}
		}
		// Both headers are written even when one list is empty; the task
		// text is kept exactly as it has always been produced.
		description += contentDesc
		description += publishtimeDesc
	}

	// 3. 404 site check; ignored when data was downloaded in the window.
	site404, _ := MgoS.FindOne("spider_sitecheck", map[string]interface{}{
		"comeintime": window(),
		"statuscode": 404,
		"code":       code,
	})
	if site404 != nil && len(*site404) > 0 && downloadnum == 0 {
		state = 1
		description += "网站监测:404\n" + qu.ObjToString((*site404)["url"]) + "\n"
	}

	now := time.Now()
	return map[string]interface{}{
		"s_code":         code,
		"s_site":         tmp["site"],
		"s_channel":      tmp["channel"],
		"s_descript":     description,
		"l_comeintime":   now.Unix(),
		"l_complete":     now.AddDate(0, 0, cycletime).Unix(),
		"s_modifyid":     tmp["modifyid"],
		"s_modify":       tmp["modify"],
		"i_event":        tmp["event"],
		"s_source":       "程序",
		"i_num":          downloadnum,
		"i_min":          0,
		"i_state":        state,
		"s_type":         "7",
		"s_urgency":      "1",
		"i_times":        0,
		"s_downloadtime": qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout),
	}
}

// CreateFirstCodeTask creates one task for every spider in SendFirstMap
// (spiders flagged "sendfirst" that were not sent yet), marks those spiders
// as sent in luayearmincode, and finally empties SendFirstMap.
//
// Building a task is pure in-memory work, so the spiders are processed
// sequentially; documents are written in batches once more than 500 are
// pending.
func CreateFirstCodeTask() {
	defer qu.Catch()
	qu.Debug("开始创建sendfirst任务...")
	stime := time.Now().AddDate(-1, 0, 0).Unix()
	etime := GetTime(0)
	downloadtime := qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout)

	var (
		tasks   []map[string]interface{}
		updates [][]map[string]interface{}
	)
	// tasks and updates always grow together, so they are flushed together.
	flush := func() {
		if len(tasks) > 0 {
			MgoE.SaveBulk("task", tasks...)
			tasks = nil
		}
		if len(updates) > 0 {
			MgoE.UpdateBulk("luayearmincode", updates...)
			updates = nil
		}
	}

	for _, lua := range SendFirstMap {
		now := time.Now()
		tasks = append(tasks, map[string]interface{}{
			"s_code":         lua.Code,
			"s_site":         lua.Site,
			"s_channel":      lua.Channel,
			"s_descript":     "下载量异常:\n一年内数据下载量:" + fmt.Sprint(lua.Count),
			"l_comeintime":   now.Unix(),
			"l_complete":     now.AddDate(1, 0, 0).Unix(),
			"s_modifyid":     lua.Modifyid,
			"s_modify":       lua.Modify,
			"i_event":        lua.Event,
			"s_source":       "程序",
			"i_num":          lua.Count,
			"i_min":          0,
			"i_state":        0,
			"s_type":         "10",
			"s_urgency":      "1",
			"i_times":        0,
			"s_downloadtime": downloadtime,
		})
		updates = append(updates, []map[string]interface{}{
			{"code": lua.Code},
			{"$set": map[string]interface{}{"send": true}},
		})
		if len(tasks) > 500 {
			flush()
		}
	}
	flush()
	SendFirstMap = map[string]*Lua{}
	qu.Debug("sendfirst任务创建完毕...")
}

// CycleTime computes, for every interval in IntervalMap, the rotation period
// in days (cycletime) and how many spiders must be sent per rotation, then
// stores the period on the interval's luayearmincode records (resetting
// "send" to false) and saves a summary record to luayearmincodeinterval.
//
// Intervals are processed in ascending index order. An interval for which no
// positive cycletime can be derived (for example because it has no records)
// is skipped instead of being stored with an invalid period.
func CycleTime() {
	defer qu.Catch()
	keys := make([]int, 0, len(IntervalMap))
	for k := range IntervalMap {
		keys = append(keys, k)
	}
	sort.Ints(keys)

	for _, k := range keys {
		interval := IntervalMap[k]
		byInterval := func() map[string]interface{} {
			return map[string]interface{}{"interval": interval}
		}

		cycletime := -1
		switch k {
		case 1:
			// Interval [0,1): fixed period of 10 days.
			cycletime = 10
		case 2, 3:
			// Use the largest confinval of the interval.
			list, _ := MgoE.Find("luayearmincode", byInterval(), `{"confinval":-1}`, `{"confinval":1}`, false, 0, 1)
			if list != nil && len(*list) == 1 {
				cycletime = qu.IntAll((*list)[0]["confinval"])
			}
		case 4, 5, 6:
			// Use the confinval below which 90% (50% for bucket 6) of the
			// interval's records fall.
			percent := 0.9
			if k == 6 {
				percent = 0.5
			}
			total := MgoE.Count("luayearmincode", byInterval())
			index := int(math.Floor(float64(total) * percent))
			list, _ := MgoE.Find("luayearmincode", byInterval(), `{"confinval":1}`, `{"confinval":1}`, false, 0, index+1)
			if list != nil && len(*list) == index+1 {
				cycletime = qu.IntAll((*list)[index]["confinval"])
			}
		case 7:
			cycletime = 180
		}
		if cycletime <= 0 {
			qu.Debug(interval, "区间无法计算cycletime,跳过")
			continue
		}

		updata := map[string]interface{}{
			"$set": map[string]interface{}{
				"cycletime": cycletime,
				"send":      false,
			},
		}
		MgoE.Update("luayearmincode", byInterval(), updata, false, true)

		// Spiders carrying a sendfirst flag are not part of the rotation.
		count := MgoE.Count("luayearmincode", map[string]interface{}{
			"interval":  interval,
			"sendfirst": map[string]interface{}{"$exists": false},
		})
		rotateDays := 30 * IntervalRotateTime[interval]
		rotateNum := math.Ceil(float64(count*cycletime) / float64(rotateDays))
		text := interval + ",总数:" + fmt.Sprint(count) + "," + fmt.Sprint(rotateDays) + "天发送完毕。每" + fmt.Sprint(cycletime) + "天轮循一次,一次发送" + fmt.Sprint(rotateNum) + "条"
		qu.Debug(text)
		MgoE.Save("luayearmincodeinterval", map[string]interface{}{"interval": interval, "timesnum": int(rotateNum), "cycletime": cycletime, "text": text})
	}
}

// TagCode walks every luayearmincode record and writes its interval label
// (plus "sendfirst" / "confinval" where applicable) back to the collection,
// in bulk batches of 500 updates.
func TagCode() {
	defer qu.Catch()
	sess := MgoE.GetMgoConn()
	defer MgoE.DestoryMongoConn(sess)
	ch := make(chan bool, 3)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	arr := [][]map[string]interface{}{}
	it := sess.DB("editor").C("luayearmincode").Find(nil).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			set := tagIntervalSet(doc)
			if len(set) == 0 {
				return // nothing could be derived for this record
			}
			update := []map[string]interface{}{
				{"_id": doc["_id"]},
				{"$set": set},
			}
			lock.Lock()
			defer lock.Unlock()
			arr = append(arr, update)
			if len(arr) >= 500 {
				MgoE.UpdateBulk("luayearmincode", arr...)
				arr = [][]map[string]interface{}{}
			}
		}(tmp)
		if n%1000 == 0 {
			qu.Debug("current:", n)
		}
		// Each worker owns its document; decode the next one into a new map.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	if len(arr) > 0 {
		MgoE.UpdateBulk("luayearmincode", arr...)
	}
	qu.Debug("标记完成")
}

// tagIntervalSet derives the fields TagCode stores for one record. The result
// is empty when no interval can be derived.
//
// Records with a count of 0 or 1 go to the last interval (count 0 is also
// flagged sendfirst). Otherwise the gaps between consecutive sorted publish
// times are bucketed by PublishtimeInterval:
//   - if only the first bucket has gaps, the record is flagged sendfirst in
//     the last interval when count < IntervalMaxNum, else put in the first;
//   - otherwise the fullest bucket among 2..7 wins (ties go to the higher
//     bucket) and confinval is set to ceil(mean + 2.32*mean/sqrt(n)) over
//     that bucket's gaps.
//
// Publish times are read defensively: a missing or non-list publishtime
// yields no gaps, and elements are converted with qu.Int64All instead of a
// hard type assertion, because a panic inside a TagCode worker goroutine is
// not covered by TagCode's own deferred qu.Catch.
func tagIntervalSet(doc map[string]interface{}) map[string]interface{} {
	set := map[string]interface{}{}
	count := qu.IntAll(doc["count"])
	if count == 0 || count == 1 {
		set["interval"] = IntervalMap[7]
		if count == 0 {
			set["sendfirst"] = true
		}
		return set
	}

	rawTimes, _ := doc["publishtime"].([]interface{})
	times := make(Int64Slice, 0, len(rawTimes))
	for _, raw := range rawTimes {
		times = append(times, qu.Int64All(raw))
	}
	sort.Sort(times)

	// Gap lengths in days, grouped by bucket index.
	gaps := map[int][]float64{}
	for i := 1; i < len(times); i++ {
		days := float64(times[i]-times[i-1]) / 86400
		idx := publishIntervalIndex(days)
		gaps[idx] = append(gaps[idx], days)
	}

	maxIn := 0        // fullest bucket among 2..7
	maxInLen := 0     // number of gaps in that bucket
	onlyFirst := true // true when bucket 1 is the only non-empty bucket
	for in := 1; in <= 7; in++ {
		lens := len(gaps[in])
		if (in == 1 && lens == 0) || (in != 1 && lens > 0) {
			onlyFirst = false
		}
		if in != 1 && lens >= maxInLen {
			maxInLen = lens
			maxIn = in
		}
	}

	switch {
	case onlyFirst:
		if count < IntervalMaxNum {
			// Too few downloads: last interval, task created directly.
			set["sendfirst"] = true
			set["interval"] = IntervalMap[7]
		} else {
			set["interval"] = IntervalMap[1]
		}
	case maxIn != 0 && maxInLen != 0:
		sum := 0.0
		for _, g := range gaps[maxIn] {
			sum += g
		}
		mean := sum / float64(maxInLen)
		se := mean / math.Sqrt(float64(maxInLen))
		set["confinval"] = int(math.Ceil(mean + se*2.32))
		set["interval"] = IntervalMap[maxIn]
	default:
		qu.Debug("错误数据id:", doc["_id"])
	}
	return set
}

// publishIntervalIndex returns the 1-based bucket index of a publish gap given
// in days: the position of the first bound in PublishtimeInterval that is
// greater than days, or one past the last bound when there is none.
func publishIntervalIndex(days float64) int {
	for j, bound := range PublishtimeInterval {
		if days < bound {
			return j + 1
		}
	}
	return len(PublishtimeInterval) + 1
}

// GetSpidercode counts, per spider code taken from luaconfig, the rows of the
// last year in data_bak and data_bak_202011030854, and saves every spider
// whose count is at most YearMinDownloadNum (including spiders with no rows
// at all) to luayearmincode together with the collected publish times.
func GetSpidercode() {
	defer qu.Catch()
	query := map[string]interface{}{
		"$or": []interface{}{
			map[string]interface{}{"state": 5},
			map[string]interface{}{
				"state": map[string]interface{}{
					"$in": []int{0, 1, 2},
				},
				"event": map[string]interface{}{
					"$ne": 7000,
				},
			},
		},
	}
	codeMap := map[string]*Lua{}
	luas, _ := MgoE.Find("luaconfig", query, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
	if luas == nil {
		return
	}
	for _, l := range *luas {
		lua := &Lua{
			Modify:   qu.ObjToString(l["createuser"]),
			Modifyid: qu.ObjToString(l["createuserid"]),
			Event:    qu.IntAll(l["event"]),
		}
		// A missing or malformed param_common leaves site/channel empty.
		if pc, _ := l["param_common"].([]interface{}); len(pc) > 2 {
			lua.Site = qu.ObjToString(pc[1])
			lua.Channel = qu.ObjToString(pc[2])
		}
		codeMap[qu.ObjToString(l["code"])] = lua
	}
	qu.Debug("开始统计...", len(codeMap))

	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	q := map[string]interface{}{
		"publishtime": map[string]interface{}{
			"$gte": time.Now().AddDate(-1, 0, 0).Unix(),
			"$lte": time.Now().Unix(),
		},
	}
	f := map[string]interface{}{
		"spidercode":  1,
		"publishtime": 1,
	}
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	codeNum := map[string]int{}
	codePublishtime := map[string][]int64{}

	// record adds one row to the per-spider statistics. Publish times are
	// kept only while the spider is within YearMinDownloadNum.
	record := func(doc map[string]interface{}) {
		publishtime := qu.Int64All(doc["publishtime"])
		if publishtime <= 0 {
			return
		}
		spidercode := qu.ObjToString(doc["spidercode"])
		if codeMap[spidercode] == nil {
			return
		}
		lock.Lock()
		defer lock.Unlock()
		codeNum[spidercode]++
		if codeNum[spidercode] > YearMinDownloadNum {
			return
		}
		codePublishtime[spidercode] = append(codePublishtime[spidercode], publishtime)
	}

	for _, coll := range []string{"data_bak", "data_bak_202011030854"} {
		it := sess.DB("spider").C(coll).Find(&q).Select(&f).Iter()
		i := 0
		for tmp := make(map[string]interface{}); it.Next(&tmp); i++ {
			wg.Add(1)
			ch <- true
			go func(doc map[string]interface{}) {
				defer func() {
					<-ch
					wg.Done()
				}()
				record(doc)
			}(tmp)
			if i%1000 == 0 {
				qu.Debug(i)
			}
			tmp = map[string]interface{}{}
		}
		// Wait before touching codeNum: workers may still be writing to it.
		wg.Wait()
		qu.Debug(coll+"查询完毕", len(codeNum))
	}

	for code, num := range codeNum {
		lua := codeMap[code]
		delete(codeMap, code)
		if num <= YearMinDownloadNum {
			parr := codePublishtime[code]
			MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": num, "publishtime": parr, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid})
		}
	}
	// Whatever is left in codeMap had no rows at all.
	for code, lua := range codeMap {
		MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": 0, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid, "publishtime": []int64{}})
	}
	qu.Debug("统计完毕...")
}

// getlua copies site, channel, modify and modifyid from every luaconfig
// record onto the luayearmincode record with the same code.
func getlua() {
	defer qu.Catch()
	luas, _ := MgoE.Find("luaconfig", nil, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
	if luas == nil {
		return
	}
	for i, l := range *luas {
		qu.Debug(i)
		site, channel := "", ""
		// A missing or malformed param_common leaves site/channel empty.
		if pc, _ := l["param_common"].([]interface{}); len(pc) > 2 {
			site = qu.ObjToString(pc[1])
			channel = qu.ObjToString(pc[2])
		}
		code := qu.ObjToString(l["code"])
		// The filter is a document rather than hand-built JSON, so a code
		// containing quotes cannot corrupt the query.
		MgoE.Update("luayearmincode", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{
			"site":     site,
			"channel":  channel,
			"modify":   qu.ObjToString(l["createuser"]),
			"modifyid": qu.ObjToString(l["createuserid"]),
		}}, false, false)
	}
}

// GetSpidercode_back is the aggregation-based variant of the statistics: it
// groups the last year's rows of data_bak and data_bak_202011030854 by
// spidercode, saves the low-volume spiders to luayearmincode, and then fills
// in the sorted publish times of every saved spider.
func GetSpidercode_back() {
	defer qu.Catch()
	qu.Debug("开始统计...")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	q := map[string]interface{}{
		"publishtime": map[string]interface{}{
			"$gte": time.Now().AddDate(-1, 0, 0).Unix(),
			"$lte": time.Now().Unix(),
		},
	}
	g := map[string]interface{}{
		"_id":   "$spidercode",
		"count": map[string]interface{}{"$sum": 1},
	}
	pro := map[string]interface{}{
		"spidercode": 1,
	}
	s := map[string]interface{}{
		"count": 1,
	}
	p := []map[string]interface{}{
		{"$match": q},
		{"$project": pro},
		{"$group": g},
		{"$sort": s},
	}
	collections := []string{"data_bak", "data_bak_202011030854"}

	codeCount := map[string]int{}
	for _, coll := range collections {
		it := sess.DB("spider").C(coll).Pipe(p).Iter()
		i := 0
		for tmp := make(map[string]interface{}); it.Next(&tmp); i++ {
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"])
			qu.Debug(code, count)
			// Results are sorted by ascending count, so the first group
			// above the threshold ends the scan of this collection.
			if count > YearMinDownloadNum {
				break
			}
			codeCount[code] += count
			if i%50 == 0 {
				qu.Debug(i)
			}
		}
	}
	for code, count := range codeCount {
		// NOTE(review): the literal 100 is kept as-is; it looks like it is
		// meant to equal YearMinDownloadNum — confirm before unifying.
		if count <= 100 {
			MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": count})
		}
	}
	qu.Debug("统计数量完毕...")

	list, _ := MgoE.Find("luayearmincode", nil, nil, nil, false, -1, -1)
	if list == nil {
		return
	}
	for _, l := range *list {
		code := qu.ObjToString(l["code"])
		if qu.IntAll(l["count"]) > YearMinDownloadNum {
			continue
		}
		var publishtimeArr Int64Slice
		for _, coll := range collections {
			rows, _ := MgoS.Find(coll, map[string]interface{}{"spidercode": code}, nil, `{"publishtime":1}`, false, -1, -1)
			if rows == nil {
				continue
			}
			for _, row := range *rows {
				if publishtime := qu.Int64All(row["publishtime"]); publishtime > 0 {
					publishtimeArr = append(publishtimeArr, publishtime)
				}
			}
		}
		sort.Sort(publishtimeArr)
		MgoE.Update("luayearmincode", map[string]interface{}{"_id": l["_id"]}, map[string]interface{}{"$set": map[string]interface{}{"publishtime": publishtimeArr}}, false, false)
	}
	qu.Debug("统计完毕...")
}

// Int64Slice implements sort.Interface for a []int64 in ascending order.
type Int64Slice []int64

// Len returns the number of elements.
func (p Int64Slice) Len() int { return len(p) }

// Less reports whether element i sorts before element j.
func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] }

// Swap exchanges elements i and j.
func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }