maxiaoshan hace 4 años
padre
commit
04b9400750
Se han modificado 7 ficheros con 930 adiciones y 8 borrados
  1. 725 0
      src/code.go
  2. 6 3
      src/config.json
  3. 15 2
      src/downloadnum.go
  4. 20 0
      src/logs/task.log
  5. 11 2
      src/main.go
  6. 136 0
      src/summary.go
  7. 17 1
      src/task.go

+ 725 - 0
src/code.go

@@ -1 +1,726 @@
 package main
+
+import (
+	"fmt"
+	"math"
+	qu "qfw/util"
+	"sort"
+	"sync"
+	"time"
+)
+
+var (
+	YearMinCodeMap      map[string]bool                               //luayearmincode中,爬虫代码:循环周期
+	SendFirstMap        map[string]*Lua                               //
+	YearMinDownloadNum  int                                           //一年下载最低值
+	IntervalMaxNum      int                                           //区间最大值
+	PublishtimeInterval = []float64{1.0, 3.0, 10.0, 20.0, 31.0, 93.0} //[0,1),[1,3),[3,10),[10,20),[20,31),[31,31*3),[31*3,···)
+	IntervalMap         = map[int]string{
+		1: "[0,1)",
+		2: "[1,3)",
+		3: "[3,10)",
+		4: "[10,20)",
+		5: "[20,31)",
+		6: "[31,93)",
+		7: "[93,···)",
+	}
+	IntervalRotateTime = map[string]int{ //区间爬虫一轮次时间(月)
+		"[0,1)":    3,
+		"[1,3)":    3,
+		"[3,10)":   6,
+		"[10,20)":  6,
+		"[20,31)":  6,
+		"[31,93)":  12,
+		"[93,···)": 12,
+	}
+)
+
+type Lua struct {
+	Site     string
+	Channel  string
+	Modify   string
+	Modifyid string
+	Code     string
+	Event    int
+	Count    int
+}
+
+func LuaYearMinCodeCreateTask() {
+	defer qu.Catch()
+	GetAllLuaYearMinCode() //获取luayearmincode所有爬虫
+	CreateTask()           //
+}
+
+func GetAllLuaYearMinCode() {
+	defer qu.Catch()
+	YearMinCodeMap = map[string]bool{}
+	SendFirstMap = map[string]*Lua{}
+	list, _ := MgoE.Find("luayearmincode", nil, nil, `{"publishtime":0}`, false, -1, -1)
+	for _, l := range *list {
+		code := qu.ObjToString(l["code"])
+		YearMinCodeMap[code] = true
+		sf, _ := l["sendfirst"].(bool)
+		sd, _ := l["send"].(bool)
+		if sf && !sd {
+			lua := &Lua{
+				Site:     qu.ObjToString(l["site"]),
+				Channel:  qu.ObjToString(l["channel"]),
+				Modify:   qu.ObjToString(l["modify"]),
+				Modifyid: qu.ObjToString(l["modifyid"]),
+				Code:     code,
+				Count:    qu.IntAll(l["count"]),
+				Event:    qu.IntAll(l["event"]),
+			}
+			SendFirstMap[code] = lua
+		}
+	}
+}
+
+func CreateTask() {
+	defer qu.Catch()
+	//1.sendfirst建任务(只建一次该任务)
+	CreateFirstCodeTask()
+	//2.根据区间轮循建任务
+	list, _ := MgoE.Find("luayearmincodeinterval", nil, nil, nil, false, -1, -1)
+	for _, l := range *list {
+		CreateTaskByInterval(l)
+	}
+
+}
+
+//根据区间建任务
+func CreateTaskByInterval(l map[string]interface{}) {
+	defer qu.Catch()
+	interval := qu.ObjToString(l["interval"])
+	qu.Debug(interval, "区间开始创建任务...")
+	timesnum := qu.IntAll(l["timesnum"])
+	cycletime := qu.IntAll(l["cycletime"])
+	ct_wg := &sync.WaitGroup{}
+	ct_lock := &sync.Mutex{}
+	ct_ch := make(chan bool, 3)
+	savetaskArr := []map[string]interface{}{}
+	updateArr := [][]map[string]interface{}{}
+	list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`","send":false}`, ``, `{"publishtime":0}`, false, 0, timesnum)
+	for _, l := range *list {
+		ct_wg.Add(1)
+		ct_ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ct_ch
+				ct_wg.Done()
+			}()
+			update := []map[string]interface{}{ //更新
+				map[string]interface{}{"_id": tmp["_id"]},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"send": true,
+					},
+				},
+			}
+			code := qu.ObjToString(tmp["code"])
+			description := ""
+			state := 0 //任务状态
+			/*
+				统计是否有已下几种情况,时间定为一周内数据:
+				1、统计spider_highlistdata是否有下载异常数据
+				2、统计spider_warn异常数据(发布时间异常、乱码)
+				3、统计spider_sitecheck 站点异常爬虫(404)
+			*/
+			stime, etime := GetTime(-cycletime), GetTime(0)
+			//统计周期内下载量
+			query := map[string]interface{}{
+				"spidercode": code,
+				"l_np_publishtime": map[string]interface{}{
+					"$gte": stime,
+					"$lte": etime,
+				},
+			}
+			downloadnum := MgoS.Count("data_bak", query)
+			//1、下载异常
+			query = map[string]interface{}{
+				"comeintime": map[string]interface{}{
+					"$gte": stime,
+					"$lte": etime,
+				},
+				"state":      -1,
+				"spidercode": code,
+			}
+			data_downloaderr, _ := MgoS.Find("spider_highlistdata", query, `{"_id":-1}`, `{"href":1}`, false, 0, 10)
+			if data_downloaderr != nil && len(*data_downloaderr) > 0 {
+				if len(*data_downloaderr) == 10 {
+					state = 1
+				}
+				description += "下载异常:\n"
+				for _, derr := range *data_downloaderr {
+					description += qu.ObjToString(derr["href"]) + "\n"
+				}
+			}
+			//2、发布时间异常、乱码
+			query = map[string]interface{}{
+				"comeintime": map[string]interface{}{
+					"$gte": stime,
+					"$lte": etime,
+				},
+				"level": 2, //2:error数据 1:warn数据
+				"code":  code,
+			}
+			data_warn, _ := MgoS.Find("spider_warn", query, `{"_id":-1}`, `{"href":1,"field":1}`, false, 0, 10)
+			if data_warn != nil && len(*data_warn) > 0 {
+				destmp_publishtime := "发布时间异常:\n"
+				destmp_code := "正文标题异常:\n"
+				for _, dw := range *data_warn {
+					field := qu.ObjToString(dw["field"])
+					if field == "publishtime" {
+						state = 1
+						destmp_publishtime += qu.ObjToString(dw["href"]) + "\n"
+					} else {
+						destmp_code += qu.ObjToString(dw["href"]) + "\n"
+					}
+				}
+				description += destmp_code
+				description += destmp_publishtime
+			}
+			//3、404
+			query = map[string]interface{}{
+				"comeintime": map[string]interface{}{
+					"$gte": stime,
+					"$lte": etime,
+				},
+				"statuscode": 404,
+				"code":       code,
+			}
+			data_404, _ := MgoS.FindOne("spider_sitecheck", query)
+			if data_404 != nil && len(*data_404) > 0 {
+				if downloadnum == 0 { //有采集数据,不认为是404
+					state = 1
+					description += "网站监测:404\n" + qu.ObjToString((*data_404)["url"]) + "\n"
+				}
+			}
+			result := map[string]interface{}{}
+			result["s_code"] = code
+			result["s_site"] = tmp["site"]
+			result["s_channel"] = tmp["channel"]
+			result["s_descript"] = description
+			result["l_comeintime"] = time.Now().Unix()
+			result["l_complete"] = time.Now().AddDate(0, 0, cycletime).Unix()
+			result["s_modifyid"] = tmp["modifyid"]
+			result["s_modify"] = tmp["modify"]
+			result["i_event"] = tmp["event"]
+			result["s_source"] = "程序"
+			result["i_num"] = downloadnum
+			result["i_min"] = 0
+			result["i_state"] = state
+			result["s_type"] = "7"
+			result["s_urgency"] = "1"
+			result["i_times"] = 0
+			result["s_downloadtime"] = qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout)
+			ct_lock.Lock()
+			savetaskArr = append(savetaskArr, result)
+			updateArr = append(updateArr, update)
+			ct_lock.Unlock()
+		}(l)
+	}
+	ct_wg.Wait()
+	ct_lock.Lock()
+	if len(savetaskArr) > 0 {
+		MgoE.SaveBulk("task", savetaskArr...)
+		savetaskArr = []map[string]interface{}{}
+	}
+	if len(updateArr) > 0 {
+		MgoE.UpdateBulk("luayearmincode", updateArr...)
+		updateArr = [][]map[string]interface{}{}
+	}
+	ct_lock.Unlock()
+	//time.AfterFunc(time.Duration(cycletime)*time.Second, func() { CreateTaskByInterval(l) })
+	time.AfterFunc(time.Duration(cycletime*24)*time.Hour, func() { CreateTaskByInterval(l) })
+}
+
+//历史数据采集为0的建任务
+func CreateFirstCodeTask() {
+	defer qu.Catch()
+	qu.Debug("开始创建sendfirst任务...")
+	stime := time.Now().AddDate(-1, 0, 0).Unix()
+	etime := GetTime(0)
+	cl_wg := &sync.WaitGroup{}
+	cl_lock := &sync.Mutex{}
+	cl_ch := make(chan bool, 3)
+	savetaskArr := []map[string]interface{}{}
+	updateArr := [][]map[string]interface{}{}
+	for _, lua := range SendFirstMap {
+		cl_wg.Add(1)
+		cl_ch <- true
+		go func(l *Lua) {
+			defer func() {
+				<-cl_ch
+				cl_wg.Done()
+			}()
+			update := []map[string]interface{}{ //更新
+				map[string]interface{}{"code": l.Code},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"send": true,
+					},
+				},
+			}
+			result := map[string]interface{}{}
+			result["s_code"] = l.Code
+			result["s_site"] = l.Site
+			result["s_channel"] = l.Channel
+			result["s_descript"] = "下载量异常:\n一年内数据下载量:" + fmt.Sprint(l.Count)
+			result["l_comeintime"] = time.Now().Unix()
+			result["l_complete"] = time.Now().AddDate(1, 0, 0).Unix()
+			result["s_modifyid"] = l.Modifyid
+			result["s_modify"] = l.Modify
+			result["i_event"] = l.Event
+			result["s_source"] = "程序"
+			result["i_num"] = l.Count
+			result["i_min"] = 0
+			result["i_state"] = 0
+			result["s_type"] = "7"
+			result["s_urgency"] = "1"
+			result["i_times"] = 0
+			result["s_downloadtime"] = qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout)
+			cl_lock.Lock()
+			savetaskArr = append(savetaskArr, result)
+			updateArr = append(updateArr, update)
+			if len(savetaskArr) > 500 {
+				MgoE.SaveBulk("task", savetaskArr...)
+				savetaskArr = []map[string]interface{}{}
+			}
+			if len(updateArr) > 500 {
+				MgoE.UpdateBulk("luayearmincode", updateArr...)
+				updateArr = [][]map[string]interface{}{}
+			}
+			cl_lock.Unlock()
+		}(lua)
+	}
+	cl_wg.Wait()
+	cl_lock.Lock()
+	if len(savetaskArr) > 0 {
+		MgoE.SaveBulk("task", savetaskArr...)
+		savetaskArr = []map[string]interface{}{}
+	}
+	if len(updateArr) > 0 {
+		MgoE.UpdateBulk("luayearmincode", updateArr...)
+		updateArr = [][]map[string]interface{}{}
+	}
+	cl_lock.Unlock()
+	SendFirstMap = map[string]*Lua{}
+	qu.Debug("sendfirst任务创建完毕...")
+}
+
+//计算循环周期和每轮新建任务爬虫的个数
+func CycleTime() {
+	defer qu.Catch()
+	for k, interval := range IntervalMap {
+		cycletime := -1
+		if k == 1 { //区间在[0,1),循环周期设置为10天
+			cycletime = 10
+		} else if k == 2 || k == 3 { //confinval最大值都在x以下,可设置为x天
+			list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`"}`, `{"confinval":-1}`, `{"confinval":1}`, false, 0, 1)
+			if list != nil && len(*list) == 1 {
+				cycletime = qu.IntAll((*list)[0]["confinval"])
+			}
+		} else if k == 4 || k == 5 || k == 6 { //最大值90%都在x以下,可设置为x天
+			percent := 0.9
+			if k == 6 {
+				percent = 0.5
+			}
+			count := MgoE.Count("luayearmincode", `{"interval":"`+interval+`"}`)
+			index := int(math.Floor(float64(count) * percent))
+			list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`"}`, `{"confinval":1}`, `{"confinval":1}`, false, 0, index+1)
+			if list != nil && len(*list) == index+1 {
+				cycletime = qu.IntAll((*list)[index]["confinval"])
+			}
+		} else if k == 7 {
+			cycletime = 180
+		}
+		updata := map[string]interface{}{
+			"$set": map[string]interface{}{
+				"cycletime": cycletime,
+				"send":      false,
+			},
+		}
+		MgoE.Update("luayearmincode", `{"interval":"`+interval+`"}`, updata, false, true)
+		q := map[string]interface{}{
+			"interval": interval,
+			"sendfirst": map[string]interface{}{
+				"$exists": false,
+			},
+		}
+		count := MgoE.Count("luayearmincode", q)
+		t := float64((count * cycletime)) / float64((30 * IntervalRotateTime[interval]))
+		rotateNum := math.Ceil(t)
+		text := interval + ",总数:" + fmt.Sprint(count) + "," + fmt.Sprint(30*IntervalRotateTime[interval]) + "天发送完毕。每" + fmt.Sprint(cycletime) + "天轮循一次,一次发送" + fmt.Sprint(rotateNum) + "条"
+		qu.Debug(text)
+		MgoE.Save("luayearmincodeinterval", map[string]interface{}{"interval": interval, "timesnum": int(rotateNum), "cycletime": cycletime, "text": text})
+	}
+}
+
+//标记数据
+func TagCode() {
+	defer qu.Catch()
+	sess := MgoE.GetMgoConn()
+	defer MgoE.DestoryMongoConn(sess)
+	ch := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	arr := [][]map[string]interface{}{}
+	it := sess.DB("editor").C("luayearmincode").Find(nil).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			update := []map[string]interface{}{}
+			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+			set := map[string]interface{}{}
+			//code := qu.ObjToString(tmp["code"])
+			count := qu.IntAll(tmp["count"])
+			if count == 1 || count == 0 { //爬虫下载量为1,放入第7区间
+				set["interval"] = IntervalMap[7]
+				if count == 0 {
+					set["sendfirst"] = true
+				}
+			} else {
+				var tmpArr Int64Slice
+				for _, tp := range tmp["publishtime"].([]interface{}) {
+					tmpArr = append(tmpArr, tp.(int64))
+				}
+				sort.Sort(tmpArr) //发布时间排序
+				//
+				intervalNumArr := map[int][]float64{} //记录每个区间发布时间间隔信息
+				for i, p := range tmpArr {
+					if i == 0 {
+						continue
+					}
+					dval := float64(p-tmpArr[i-1]) / 86400
+					//计算区间
+					intervalNum := -1                        //区间
+					for j, pi := range PublishtimeInterval { //1.0, 3.0, 10.0, 20.0, 31.0, 93.0
+						if dval == pi {
+							intervalNum = j + 2
+							break
+						} else if dval < pi {
+							intervalNum = j + 1
+							break
+						}
+					}
+					if intervalNum == -1 { //如果为初始值,证明dval大于93
+						intervalNum = 7
+					}
+					intervalNumArr[intervalNum] = append(intervalNumArr[intervalNum], dval)
+				}
+				//
+				maxIn := 0    //记录最大区间
+				maxInLen := 0 //记录最大区间长度
+				flag := true  //记录是否只有第一区间有值
+				for in := 1; in <= 7; in++ {
+					lens := len(intervalNumArr[in])
+					if (in == 1 && lens == 0) || (in != 1 && lens > 0) {
+						flag = false
+					}
+					if in != 1 && lens >= maxInLen {
+						maxInLen = lens
+						maxIn = in
+					}
+				}
+				//qu.Debug(flag, "最大区间:", maxIn, "最大区间长度:", maxInLen)
+				if flag { //只有第一区间有值
+					if count < IntervalMaxNum { //划分到第七区间,直接新建任务
+						set["sendfirst"] = true
+						set["interval"] = IntervalMap[7]
+					} else {
+						set["interval"] = IntervalMap[1]
+					}
+				} else if maxIn != 0 && maxInLen != 0 {
+					sumInval := float64(0)
+					for _, inval := range intervalNumArr[maxIn] {
+						sumInval += inval
+					}
+					mean := sumInval / float64(maxInLen)
+					se := mean / math.Pow(float64(maxInLen), 0.5)
+					confInval := math.Ceil(mean + se*2.32)
+					set["confinval"] = int(confInval) //置信区间
+					set["interval"] = IntervalMap[maxIn]
+				} else {
+					qu.Debug("错误数据id:", tmp["_id"])
+				}
+			}
+			if len(set) > 0 {
+				update = append(update, map[string]interface{}{"$set": set})
+			}
+			lock.Lock()
+			if len(update) == 2 {
+				arr = append(arr, update)
+			}
+			if len(arr) >= 500 {
+				tmps := arr
+				MgoE.UpdateBulk("luayearmincode", tmps...)
+				arr = [][]map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoE.UpdateBulk("luayearmincode", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	qu.Debug("标记完成")
+}
+
+//统计爬虫下载量
+func GetSpidercode() {
+	defer qu.Catch()
+	query := map[string]interface{}{
+		"$or": []interface{}{
+			map[string]interface{}{"state": 5},
+			map[string]interface{}{
+				"state": map[string]interface{}{
+					"$in": []int{0, 1, 2},
+				},
+				"event": map[string]interface{}{
+					"$ne": 7000,
+				},
+			},
+		},
+	}
+	codeMap := map[string]*Lua{}
+	luas, _ := MgoE.Find("luaconfig", query, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
+	for _, l := range *luas {
+		pc := l["param_common"].([]interface{})
+		lua := &Lua{
+			Modify:   qu.ObjToString(l["createuser"]),
+			Modifyid: qu.ObjToString(l["createuserid"]),
+			Event:    qu.IntAll(l["event"]),
+		}
+		if len(pc) > 2 {
+			lua.Site = qu.ObjToString(pc[1])
+			lua.Channel = qu.ObjToString(pc[2])
+		}
+		code := qu.ObjToString(l["code"])
+		codeMap[code] = lua
+	}
+	qu.Debug("开始统计...", len(codeMap))
+	sess := MgoS.GetMgoConn()
+	defer MgoS.DestoryMongoConn(sess)
+	q := map[string]interface{}{
+		"publishtime": map[string]interface{}{
+			"$gte": time.Now().AddDate(-1, 0, 0).Unix(),
+			"$lte": time.Now().Unix(),
+		},
+	}
+	f := map[string]interface{}{
+		"spidercode":  1,
+		"publishtime": 1,
+	}
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	codeNum := map[string]int{}
+	codePublishtime := map[string][]int64{}
+	i := 0
+	it1 := sess.DB("spider").C("data_bak").Find(&q).Select(&f).Iter()
+	for tmp := make(map[string]interface{}); it1.Next(&tmp); i++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			publishtime := qu.Int64All(tmp["publishtime"])
+			if publishtime > 0 {
+				spidercode := qu.ObjToString(tmp["spidercode"])
+				if codeMap[spidercode] != nil {
+					lock.Lock()
+					codeNum[spidercode] += 1
+					if codeNum[spidercode] > YearMinDownloadNum {
+						lock.Unlock()
+						return
+					}
+					codePublishtime[spidercode] = append(codePublishtime[spidercode], publishtime)
+					lock.Unlock()
+				}
+			}
+		}(tmp)
+		if i%1000 == 0 {
+			qu.Debug(i)
+		}
+		tmp = map[string]interface{}{}
+	}
+	qu.Debug("data_bak查询完毕", len(codeNum))
+	i = 0
+	it2 := sess.DB("spider").C("data_bak_202011030854").Find(&q).Select(&f).Iter()
+	for tmp := make(map[string]interface{}); it2.Next(&tmp); i++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			publishtime := qu.Int64All(tmp["publishtime"])
+			if publishtime > 0 {
+				spidercode := qu.ObjToString(tmp["spidercode"])
+				if codeMap[spidercode] != nil {
+					lock.Lock()
+					codeNum[spidercode] += 1
+					if codeNum[spidercode] > YearMinDownloadNum {
+						lock.Unlock()
+						return
+					}
+					codePublishtime[spidercode] = append(codePublishtime[spidercode], publishtime)
+					lock.Unlock()
+				}
+			}
+		}(tmp)
+		if i%1000 == 0 {
+			qu.Debug(i)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	qu.Debug("data_bak_202011030854查询完毕", len(codeNum))
+	for code, num := range codeNum {
+		lua := codeMap[code]
+		delete(codeMap, code)
+		if num <= YearMinDownloadNum {
+			parr := codePublishtime[code]
+			//sort.Sort(parr)
+			MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": num, "publishtime": parr, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid})
+		}
+	}
+	for code, lua := range codeMap { //下载量为0
+		MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": 0, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid, "publishtime": []int64{}})
+	}
+	qu.Debug("统计完毕...")
+}
+
+//补充信息
+func getlua() {
+	luas, _ := MgoE.Find("luaconfig", nil, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
+	for i, l := range *luas {
+		qu.Debug(i)
+		pc := l["param_common"].([]interface{})
+		Site := ""
+		Channel := ""
+		if len(pc) > 2 {
+			Site = qu.ObjToString(pc[1])
+			Channel = qu.ObjToString(pc[2])
+		}
+		Modify := qu.ObjToString(l["createuser"])
+		Modifyid := qu.ObjToString(l["createuserid"])
+		code := qu.ObjToString(l["code"])
+		MgoE.Update("luayearmincode", `{"code":"`+code+`"}`, map[string]interface{}{"$set": map[string]interface{}{"site": Site, "channel": Channel, "modify": Modify, "modifyid": Modifyid}}, false, false)
+	}
+}
+
+//分组查询
+func GetSpidercode_back() {
+	defer qu.Catch()
+	qu.Debug("开始统计...")
+	sess := MgoS.GetMgoConn()
+	defer MgoS.DestoryMongoConn(sess)
+	q := map[string]interface{}{
+		"publishtime": map[string]interface{}{
+			"$gte": time.Now().AddDate(-1, 0, 0).Unix(),
+			"$lte": time.Now().Unix(),
+		},
+	}
+	g := map[string]interface{}{
+		"_id":   "$spidercode",
+		"count": map[string]interface{}{"$sum": 1},
+	}
+	pro := map[string]interface{}{
+		"spidercode": 1,
+	}
+	s := map[string]interface{}{
+		"count": 1,
+	}
+	p := []map[string]interface{}{
+		map[string]interface{}{"$match": q},
+		map[string]interface{}{"$project": pro},
+		map[string]interface{}{"$group": g},
+		map[string]interface{}{"$sort": s},
+	}
+	it1 := sess.DB("spider").C("data_bak").Pipe(p).Iter()
+	codeCount := map[string]int{}
+	i := 0
+	for tmp := make(map[string]interface{}); it1.Next(&tmp); i++ {
+		code := qu.ObjToString(tmp["_id"])
+		count := qu.IntAll(tmp["count"])
+		qu.Debug(code, count)
+		if count <= YearMinDownloadNum {
+			codeCount[code] = count
+		} else {
+			break
+		}
+		if i%50 == 0 {
+			qu.Debug(i)
+		}
+	}
+	i = 0
+	it2 := sess.DB("spider").C("data_bak_202011030854").Pipe(p).Iter()
+	for tmp := make(map[string]interface{}); it2.Next(&tmp); i++ {
+		code := qu.ObjToString(tmp["_id"])
+		count := qu.IntAll(tmp["count"])
+		qu.Debug(code, count)
+		if count <= YearMinDownloadNum {
+			codeCount[code] += count
+		} else {
+			break
+		}
+		if i%50 == 0 {
+			qu.Debug(i)
+		}
+	}
+	for code, count := range codeCount {
+		if count <= 100 {
+			MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": count})
+		}
+	}
+	qu.Debug("统计数量完毕...")
+	list, _ := MgoE.Find("luayearmincode", nil, nil, nil, false, -1, -1)
+	for _, l := range *list {
+		code := qu.ObjToString(l["code"])
+		count := qu.IntAll(l["count"])
+		if count > YearMinDownloadNum {
+			continue
+		}
+		d1s, _ := MgoS.Find("data_bak", `{"spidercode":"`+code+`"}`, nil, `{"publishtime":1}`, false, -1, -1)
+		d2s, _ := MgoS.Find("data_bak_202011030854", `{"spidercode":"`+code+`"}`, nil, `{"publishtime":1}`, false, -1, -1)
+		var publishtimeArr Int64Slice
+		for _, d1 := range *d1s {
+			publishtime := qu.Int64All(d1["publishtime"])
+			if publishtime > 0 {
+				publishtimeArr = append(publishtimeArr, publishtime)
+			}
+		}
+		for _, d2 := range *d2s {
+			publishtime := qu.Int64All(d2["publishtime"])
+			if publishtime > 0 {
+				publishtimeArr = append(publishtimeArr, publishtime)
+			}
+		}
+		sort.Sort(publishtimeArr)
+		MgoE.Update("luayearmincode", map[string]interface{}{"_id": l["_id"]}, map[string]interface{}{"$set": map[string]interface{}{"publishtime": publishtimeArr}}, false, false)
+	}
+	qu.Debug("统计完毕...")
+}
+
+//自定义[]int64数组排序
+type Int64Slice []int64
+
+func (p Int64Slice) Len() int           { return len(p) }
+func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p Int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

+ 6 - 3
src/config.json

@@ -2,17 +2,18 @@
 	"spider":{
     	"addr": "192.168.3.207:27092",
 		"db": "spider",
-		"size": 10
+		"size": 15
     },
     "editor": {
     	"addr": "192.168.3.207:27092",
 		"db": "editor",
-		"size": 10
+		"size": 15
     },
     "everydaydownload": "0 0 1 ? * MON-FRI",
     "updatestatecron": "0 0 6 ? * *",
 	"createtaskcron": "0 0 8 ? * MON-FRI",
 	"closetaskcron": "0 0 9 ? * MON-FRI",
+	"codesummarycron": "0 30 8 ? * *",
 	"closenum": 2,
 	"daynum": 6,
 	"downloadcheck": {
@@ -31,5 +32,7 @@
             "downratio": 40,
             "upratio": 300
         }
-    }
+    },
+    "yearmindownload": 100,
+    "intervalmaxnum": 30
 }

+ 15 - 2
src/downloadnum.go

@@ -37,7 +37,7 @@ func GetDownloadNumber() {
 		timeStr := time.Now().AddDate(0, 0, yesterday).Format("2006-01-02")
 		startTime := GetTime(yesterday)
 		endTime := startTime + 86400
-		spiders := getAllSpider() //获取所有爬虫
+		spiders := getAllSpider() //获取所有已运行爬虫
 		logger.Debug(timeStr, "上架的爬虫个数:", len(spiders))
 		lock := &sync.Mutex{}
 		wg := &sync.WaitGroup{}
@@ -153,7 +153,20 @@ func getAllSpider() map[string]map[string]interface{} {
 		"code":         1,
 		"param_common": 1,
 	}
-	luas, _ := MgoE.Find("luaconfig", map[string]interface{}{"state": 5}, nil, fields, false, -1, -1)
+	query := map[string]interface{}{
+		"$or": []interface{}{
+			map[string]interface{}{"state": 5},
+			map[string]interface{}{
+				"state": map[string]interface{}{
+					"$in": []int{0, 1, 2},
+				},
+				"event": map[string]interface{}{
+					"$ne": 7000,
+				},
+			},
+		},
+	}
+	luas, _ := MgoE.Find("luaconfig", query, nil, fields, false, -1, -1)
 	reps := map[string]map[string]interface{}{}
 	for _, lua := range *luas {
 		rep := map[string]interface{}{}

+ 20 - 0
src/logs/task.log

@@ -486,3 +486,23 @@
 2021/04/25 13:34:08 task.go:616: debug  共有异常爬虫数据量 2147 条
 2021/04/25 13:35:30 task.go:824: debug  ---任务创建完成---
 2021/05/21 15:58:05 task.go:895: info  -----更新数据状态-----
+2021/07/08 15:40:29 task.go:895: info  -----更新数据状态-----
+2021/07/08 15:42:35 task.go:897: info  -----更新数据状态-----
+2021/07/08 15:42:35 task.go:94: debug  ---开始统计下载失败信息---
+2021/07/08 15:42:35 task.go:199: debug  ---开始统计重采失败信息---
+2021/07/08 15:42:35 task.go:221: debug  共有重采失败数据 0 条
+2021/07/08 15:42:35 task.go:293: debug  ---统计重采失败信息完成---
+2021/07/08 15:42:35 task.go:299: debug  ---开始统计信息异常数据---
+2021/07/08 15:42:35 task.go:321: debug  共有信息异常数据 0 条
+2021/07/08 15:42:35 task.go:386: debug  ---统计信息异常数据完成---
+2021/07/08 15:42:35 task.go:392: debug  ---开始统计栏目地址404数据---
+2021/07/08 15:42:35 task.go:407: debug  共有404地址 0 条
+2021/07/08 15:42:35 task.go:437: debug  ---统计栏目地址404数据完成---
+2021/07/08 15:42:35 task.go:443: debug  ---开始统计下载量异常数据---
+2021/07/08 15:42:35 task.go:461: debug  共有下载量异常数据 0 条
+2021/07/08 15:42:35 task.go:532: debug  ---统计下载量异常数据完成---
+2021/07/08 15:42:35 task.go:538: debug  ---开始保存信息---
+2021/07/08 15:42:35 task.go:586: debug  ---保存信息完成---
+2021/07/08 15:42:35 task.go:607: debug  ---开始创建任务---
+2021/07/08 15:42:35 task.go:623: debug  共有异常爬虫数据量 0 条
+2021/07/08 15:42:35 task.go:843: debug  ---任务创建完成---

+ 11 - 2
src/main.go

@@ -63,19 +63,28 @@ func init() {
 	UpdateStateCron = qu.ObjToString(Config["updatestatecron"])
 	CreateTaskCron = qu.ObjToString(Config["createtaskcron"])
 	CloseTaskCron = qu.ObjToString(Config["closetaskcron"])
+	CodeSummaryCron = qu.ObjToString(Config["codesummarycron"])
 	CloseNum = qu.IntAll(Config["closenum"])
 	DayNum = qu.IntAll(Config["daynum"])
+	//
+	YearMinDownloadNum = qu.IntAll(Config["yearmindownload"])
+	IntervalMaxNum = qu.IntAll(Config["intervalmaxnum"])
 }
 
 func main() {
+	//低采集量爬虫新建任务
+	LuaYearMinCodeCreateTask()
 	c := cron.New()
 	c.Start()
 	c.AddFunc(EveryDayDownloadTime, GetDownloadNumber) //统计下载量
 	c.AddFunc(UpdateStateCron, ResetDataState)         //更新数据状态
 	c.AddFunc(CreateTaskCron, CreateTaskProcess)       //创建任务
 	c.AddFunc(CloseTaskCron, CloseTask)                //关闭任务
-
-	defer c.Stop()
+	c.AddFunc(CodeSummaryCron, SummaryCode)            //上架爬虫信息汇总
+	//统计爬虫历史下载量制定任务周期
+	// GetSpidercode()
+	// TagCode()
+	// CycleTime()
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 136 - 0
src/summary.go

@@ -0,0 +1,136 @@
+package main
+
+import (
+	qu "qfw/util"
+	"sync"
+)
+
+func SummaryCode() {
+	defer qu.Catch()
+	query := map[string]interface{}{
+		"$or": []interface{}{
+			map[string]interface{}{"state": 5},
+			map[string]interface{}{
+				"state": map[string]interface{}{
+					"$in": []int{0, 1, 2},
+				},
+				"event": map[string]interface{}{
+					"$ne": 7000,
+				},
+			},
+		},
+	}
+	sm_ch := make(chan bool, 1)
+	sm_wg := &sync.WaitGroup{}
+	sm_lock := &sync.Mutex{}
+	sm_stime, sm_etime := GetTime(-1), GetTime(0)
+	arr := []map[string]interface{}{}
+	luas, _ := MgoE.Find("luaconfig", query, nil, `{"code":1,"event":1,"param_common":1,"model":1,"platform":1,"createuser":1,"createuserid":1}`, false, -1, -1)
+	for _, l := range *luas {
+		sm_wg.Add(1)
+		sm_ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-sm_ch
+				sm_wg.Done()
+			}()
+			result := map[string]interface{}{}
+			code := qu.ObjToString(tmp["code"])
+			result["code"] = code
+			result["modify"] = tmp["createuser"]
+			result["modifyid"] = tmp["createuserid"]
+			result["event"] = tmp["event"]
+			result["platform"] = tmp["platform"]
+			//1、统计data_bak下载量
+			//统计周期内下载量
+			query := map[string]interface{}{
+				"spidercode": code,
+				"l_np_publishtime": map[string]interface{}{
+					"$gte": sm_stime,
+					"$lte": sm_etime,
+				},
+			}
+			databaknum := MgoS.Count("data_bak", query)
+			result["download"] = databaknum
+			//2、统计spider_highlistdata下载量和下载失败量
+			timestr := qu.FormatDateByInt64(&sm_stime, qu.Date_Short_Layout)
+			query = map[string]interface{}{
+				"spidercode": code,
+				"publishtime": map[string]interface{}{
+					"$regex": timestr,
+				},
+			}
+			hl_allnum := MgoS.Count("spider_highlistdata", query)
+			result["hl_download"] = hl_allnum
+			query = map[string]interface{}{
+				"spidercode": code,
+				"publishtime": map[string]interface{}{
+					"$regex": timestr,
+				},
+				"state": -1,
+			}
+			hl_errnum := MgoS.Count("spider_highlistdata", query)
+			result["hl_downloaderr"] = hl_errnum
+			//3、查询spider_sitecheck中url状态码
+			query = map[string]interface{}{
+				"code": code,
+				"comeintime": map[string]interface{}{
+					"$gte": sm_stime,
+					"$lte": sm_etime,
+				},
+			}
+			data, _ := MgoS.FindOne("spider_sitecheck", query)
+			if data != nil && len(*data) > 0 {
+				result["statuscode"] = qu.Int64All((*data)["statuscode"])
+			}
+			//4、查询spider_warn爬虫的下载错误信息
+			errinfo := map[string]interface{}{}
+			fnMap_lev1 := map[string]int{}
+			fnMap_lev2 := map[string]int{}
+			warnDatas, _ := MgoS.Find("spider_warn", query, nil, `{"field":1,"level":1}`, false, -1, -1)
+			for _, d := range *warnDatas {
+				field := qu.ObjToString(d["field"])
+				level := qu.IntAll(d["level"])
+				if level == 1 {
+					fnMap_lev1[field] += 1
+				} else {
+					fnMap_lev2[field] += 1
+				}
+			}
+			if len(fnMap_lev1) > 0 {
+				errinfo["1"] = fnMap_lev1
+			}
+			if len(fnMap_lev2) > 0 {
+				errinfo["2"] = fnMap_lev2
+			}
+			result["errinfo"] = errinfo
+			pc := l["param_common"].([]interface{})
+			if len(pc) > 2 {
+				result["site"] = pc[1]
+				result["channel"] = pc[2]
+			}
+			if len(pc) > 12 {
+				result["url"] = pc[11]
+			}
+			if model, ok := tmp["model"].(map[string]interface{}); ok && model != nil {
+				result["area"] = model["area"]
+				result["city"] = model["city"]
+				result["district"] = model["district"]
+			}
+			sm_lock.Lock()
+			arr = append(arr, result)
+			if len(arr) > 500 {
+				tmps := arr
+				MgoS.SaveBulk("spider_summaryinfo", tmps...)
+				arr = []map[string]interface{}{}
+			}
+			sm_lock.Unlock()
+		}(l)
+	}
+	sm_wg.Wait()
+	if len(arr) > 0 {
+		MgoS.SaveBulk("spider_summaryinfo", arr...)
+		arr = []map[string]interface{}{}
+	}
+
+}

+ 17 - 1
src/task.go

@@ -27,6 +27,7 @@ var (
 	UpdateStateCron string                    //每天关闭任务的时间
 	CreateTaskCron  string                    //每天创建任务的时间
 	CloseTaskCron   string                    //每天关闭任务的时间
+	CodeSummaryCron string                    //每天统计爬虫信息
 	CloseNum        int                       //关闭几天的任务
 	DayNum          int                       //更新数据天数
 	UserTaskNum     map[string]map[string]int //记录每人每天新建任务量
@@ -88,7 +89,7 @@ func CloseTask() {
 	logger.Debug("---清理未更新任务完毕---")
 }
 
-//1、统计三级页下载失败数据
+//1、统计三级页下载失败数据(放到ResetDataState后是因为不会影响统计)
 func GetDownloadFailedData() {
 	defer qu.Catch()
 	logger.Debug("---开始统计下载失败信息---")
@@ -405,6 +406,18 @@ func GetStatusCodeErrorData() {
 	logger.Debug("共有404地址", len(*list), "条")
 	for _, tmp := range *list {
 		code := qu.ObjToString(tmp["code"])
+		//判断3天内是否有采集数据,有则不建404任务
+		stime, etime := GetTime(-3), GetTime(0)
+		q := map[string]interface{}{
+			"spidercode": code,
+			"l_np_publishtime": map[string]interface{}{
+				"$gte": stime,
+				"$lte": etime,
+			},
+		}
+		if MgoS.Count("data_bak", q) > 0 { //有采集数据,不认为是404
+			continue
+		}
 		href := qu.ObjToString(tmp["url"])
 		site := qu.ObjToString(tmp["site"])
 		channel := qu.ObjToString(tmp["channel"])
@@ -546,6 +559,9 @@ func SaveResult() {
 				<-ch
 				wg.Done()
 			}()
+			if YearMinCodeMap[t.Code] { //luayearmincode中爬虫任务删除
+				return
+			}
 			result := map[string]interface{}{}
 			result["code"] = t.Code
 			result["site"] = t.Site