maxiaoshan 4 anni fa
parent
commit
2878e06fb2
2 ha cambiato i file con 111 aggiunte e 53 eliminazioni
  1. src/main.go: +2 −2
  2. src/summary.go: +109 −51

+ 2 - 2
src/main.go

@@ -77,10 +77,10 @@ func main() {
 	c := cron.New()
 	c.Start()
 	c.AddFunc(EveryDayDownloadTime, GetDownloadNumber) //统计下载量
-	c.AddFunc(UpdateStateCron, ResetDataState)         //更新数据状态
 	c.AddFunc(CreateTaskCron, CreateTaskProcess)       //创建任务
-	c.AddFunc(CloseTaskCron, CloseTask)                //关闭任务
 	c.AddFunc(CodeSummaryCron, SummaryCode)            //上架爬虫信息汇总
+	c.AddFunc(UpdateStateCron, ResetDataState)         //更新数据状态
+	c.AddFunc(CloseTaskCron, CloseTask)                //关闭任务
 	//统计爬虫历史下载量制定任务周期
 	// GetSpidercode()
 	// TagCode()

+ 109 - 51
src/summary.go

@@ -3,12 +3,88 @@ package main
 import (
 	qu "qfw/util"
 	"sync"
+	"time"
 )
 
 func SummaryCode() {
-	qu.Debug("上架爬虫信息汇总开始...")
 	defer qu.Catch()
+	qu.Debug("上架爬虫信息汇总开始...")
+	qu.Debug("开始统计spider_highlisthdata信息...")
+	//统计spider_highlisthdata信息
+	codeHlistDnumMap := map[string]int{} //记录爬虫昨天下载量
+	codeErrDnumMap := map[string]int{}   //记录爬虫昨天下载失败量
+	sm_ch1 := make(chan bool, 5)
+	sm_wg1 := &sync.WaitGroup{}
+	sm_lock1 := &sync.Mutex{}
+	sm_stime, sm_etime := GetTime(-1), GetTime(0)
+	sess_s := MgoS.GetMgoConn()
+	defer MgoS.DestoryMongoConn(sess_s)
+	timestr := qu.FormatDateByInt64(&sm_stime, qu.Date_Short_Layout)
 	query := map[string]interface{}{
+		"publishtime": map[string]interface{}{
+			"$regex": timestr,
+		},
+	}
+	fs := map[string]interface{}{
+		"spidercode": 1,
+		"state":      1,
+	}
+	count, _ := sess_s.DB("spider").C("spider_highlistdata").Find(&query).Count()
+	qu.Debug(timestr, "spider_highlisthdata共采集数据:", count)
+	it_sh := sess_s.DB("spider").C("spider_highlistdata").Find(&query).Select(&fs).Iter()
+	for tmp := make(map[string]interface{}); it_sh.Next(&tmp); {
+		sm_wg1.Add(1)
+		sm_ch1 <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-sm_ch1
+				sm_wg1.Done()
+			}()
+			state := qu.IntAll(tmp["state"])
+			code := qu.ObjToString(tmp["spidercode"])
+			sm_lock1.Lock()
+			if state == -1 {
+				codeErrDnumMap[code]++
+			}
+			codeHlistDnumMap[code]++
+			sm_lock1.Unlock()
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	qu.Debug("spider_highlistdata采集信息的爬虫总量:", len(codeHlistDnumMap), "	下载失败爬虫的总量:", len(codeErrDnumMap))
+	qu.Debug("开始统计data_bak信息...")
+	codeDbakDnumMap := map[string]int{} //记录爬虫昨天下载量
+	query = map[string]interface{}{
+		"l_np_publishtime": map[string]interface{}{
+			"$gte": sm_stime,
+			"$lte": sm_etime,
+		},
+	}
+	fs = map[string]interface{}{
+		"spidercode": 1,
+	}
+	count, _ = sess_s.DB("spider").C("data_bak").Find(&query).Count()
+	qu.Debug(timestr, "data_bak共采集数据:", count)
+	it_sd := sess_s.DB("spider").C("data_bak").Find(&query).Select(&fs).Iter()
+	for tmp := make(map[string]interface{}); it_sd.Next(&tmp); {
+		sm_wg1.Add(1)
+		sm_ch1 <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-sm_ch1
+				sm_wg1.Done()
+			}()
+			code := qu.ObjToString(tmp["spidercode"])
+			sm_lock1.Lock()
+			codeDbakDnumMap[code]++
+			sm_lock1.Unlock()
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	sm_wg1.Wait()
+	qu.Debug("data_bak采集信息的爬虫总量:", len(codeDbakDnumMap))
+	//统计爬虫
+	query = map[string]interface{}{
 		"$or": []interface{}{
 			map[string]interface{}{"state": 5},
 			map[string]interface{}{
@@ -21,14 +97,13 @@ func SummaryCode() {
 			},
 		},
 	}
-	sm_ch := make(chan bool, 3)
-	sm_wg := &sync.WaitGroup{}
-	sm_lock := &sync.Mutex{}
-	sm_stime, sm_etime := GetTime(-1), GetTime(0)
+	sm_ch2 := make(chan bool, 5)
+	sm_wg2 := &sync.WaitGroup{}
+	sm_lock2 := &sync.Mutex{}
 	arr := []map[string]interface{}{}
-	sess := MgoE.GetMgoConn()
-	defer MgoE.DestoryMongoConn(sess)
-	f := map[string]interface{}{
+	sess_e := MgoE.GetMgoConn()
+	defer MgoE.DestoryMongoConn(sess_e)
+	fe := map[string]interface{}{
 		"code":         1,
 		"event":        1,
 		"param_common": 1,
@@ -37,15 +112,15 @@ func SummaryCode() {
 		"createuser":   1,
 		"createuserid": 1,
 	}
-	it := sess.DB("editor").C("luaconfig").Find(&query).Select(&f).Iter()
+	it_e := sess_e.DB("editor").C("luaconfig").Find(&query).Select(&fe).Iter()
 	n := 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
-		sm_wg.Add(1)
-		sm_ch <- true
+	for tmp := make(map[string]interface{}); it_e.Next(&tmp); n++ {
+		sm_wg2.Add(1)
+		sm_ch2 <- true
 		go func(tmp map[string]interface{}) {
 			defer func() {
-				<-sm_ch
-				sm_wg.Done()
+				<-sm_ch2
+				sm_wg2.Done()
 			}()
 			result := map[string]interface{}{}
 			code := qu.ObjToString(tmp["code"])
@@ -54,53 +129,31 @@ func SummaryCode() {
 			result["modifyid"] = tmp["createuserid"]
 			result["event"] = tmp["event"]
 			result["platform"] = tmp["platform"]
+			result["comeintime"] = time.Now().Unix()
 			//1、统计data_bak下载量
-			//统计周期内下载量
-			query := map[string]interface{}{
-				"spidercode": code,
-				"l_np_publishtime": map[string]interface{}{
-					"$gte": sm_stime,
-					"$lte": sm_etime,
-				},
-			}
-			databaknum := MgoS.Count("data_bak", query)
-			result["download"] = databaknum
+			result["download"] = codeDbakDnumMap[code]
 			//2、统计spider_highlistdata下载量和下载失败量
-			timestr := qu.FormatDateByInt64(&sm_stime, qu.Date_Short_Layout)
-			query = map[string]interface{}{
-				"spidercode": code,
-				"publishtime": map[string]interface{}{
-					"$regex": timestr,
-				},
-			}
-			hl_allnum := MgoS.Count("spider_highlistdata", query)
-			result["hl_download"] = hl_allnum
-			query = map[string]interface{}{
-				"spidercode": code,
-				"publishtime": map[string]interface{}{
-					"$regex": timestr,
-				},
-				"state": -1,
-			}
-			hl_errnum := MgoS.Count("spider_highlistdata", query)
-			result["hl_downloaderr"] = hl_errnum
+			result["hl_download"] = codeHlistDnumMap[code]
+			result["hl_downloaderr"] = codeErrDnumMap[code]
 			//3、查询spider_sitecheck中url状态码
-			query = map[string]interface{}{
+			q := map[string]interface{}{
 				"code": code,
 				"comeintime": map[string]interface{}{
 					"$gte": sm_stime,
 					"$lte": sm_etime,
 				},
 			}
-			data, _ := MgoS.FindOne("spider_sitecheck", query)
+			data, _ := MgoS.FindOne("spider_sitecheck", q) //spider_sitecheck只记录了错误状态码爬虫
 			if data != nil && len(*data) > 0 {
 				result["statuscode"] = qu.Int64All((*data)["statuscode"])
+			} else {
+				result["statuscode"] = 200
 			}
 			//4、查询spider_warn爬虫的下载错误信息
 			errinfo := map[string]interface{}{}
 			fnMap_lev1 := map[string]int{}
 			fnMap_lev2 := map[string]int{}
-			warnDatas, _ := MgoS.Find("spider_warn", query, nil, `{"field":1,"level":1}`, false, -1, -1)
+			warnDatas, _ := MgoS.Find("spider_warn", q, nil, `{"field":1,"level":1}`, false, -1, -1)
 			for _, d := range *warnDatas {
 				field := qu.ObjToString(d["field"])
 				level := qu.IntAll(d["level"])
@@ -117,6 +170,7 @@ func SummaryCode() {
 				errinfo["2"] = fnMap_lev2
 			}
 			result["errinfo"] = errinfo
+			//
 			pc := tmp["param_common"].([]interface{})
 			if len(pc) > 2 {
 				result["site"] = pc[1]
@@ -126,21 +180,25 @@ func SummaryCode() {
 				result["url"] = pc[11]
 			}
 			if model, ok := tmp["model"].(map[string]interface{}); ok && model != nil {
-				result["area"] = model["area"]
-				result["city"] = model["city"]
-				result["district"] = model["district"]
+				result["area"] = qu.ObjToString(model["area"])
+				result["city"] = qu.ObjToString(model["city"])
+				result["district"] = qu.ObjToString(model["district"])
 			}
-			sm_lock.Lock()
+			sm_lock2.Lock()
 			arr = append(arr, result)
 			if len(arr) > 500 {
 				tmps := arr
 				MgoS.SaveBulk("spider_summaryinfo", tmps...)
 				arr = []map[string]interface{}{}
 			}
-			sm_lock.Unlock()
+			sm_lock2.Unlock()
 		}(tmp)
+		if n%500 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
 	}
-	sm_wg.Wait()
+	sm_wg2.Wait()
 	if len(arr) > 0 {
 		MgoS.SaveBulk("spider_summaryinfo", arr...)
 		arr = []map[string]interface{}{}