Эх сурвалжийг харах

站点信息统计定时任务

maxiaoshan 2 жил өмнө
parent
commit
f10aa174cb

+ 2 - 1
src/config.json

@@ -135,6 +135,7 @@
 	"spiderweeklyreportcron": "0 50 6 ? * SUN",
 	"luamovecron": "0 0 8 1 * *",
 	"updateluausercron": "0 30 23 ? * *",
+	"updatesitecron": "0 30 9 ? * *",
 	"closenum": 2,
 	"daynum": 6,
 	"mail": {
@@ -143,6 +144,6 @@
 		"user": "public03@topnet.net.cn",
 		"pwd": "ue9Rg9Sf4CVtdm5a",
 		"retry": 3,
-		"to": "chenjiakang@topnet.net.cn,kouheyang@topnet.net.cn,wangdanting@topnet.net.cn"
+		"to": "wangdanting@topnet.net.cn"
 	}
 }

+ 6 - 6
src/main.go

@@ -41,19 +41,19 @@ func main() {
 	c := cron.New()
 	c.Start()
 	//定时任务
-	c.AddFunc(util.RandomDataPushCron, timetask.PushSpiderWarnErrData) //数据维护平台-爬虫数据维护数据数据统计
-	c.AddFunc(util.LuamoveCron, timetask.LuaMoveEvent)                 //数据维护平台-爬虫数据维护数据数据统计
+	c.AddFunc(util.RandomDataPushCron, timetask.PushSpiderWarnErrData) //数据重采平台-爬虫数据维护数据数据统计
+	c.AddFunc(util.LuamoveCron, timetask.LuaMoveEvent)                 //数据重采平台-爬虫数据维护数据数据统计
 	//
-	c.AddFunc(util.FileWarnCron, timetask.GetFileWarn) //异常附件数据警告
+	//c.AddFunc(util.FileWarnCron, timetask.GetFileWarn) //异常附件数据警告
 	//
 	c.AddFunc(util.QyworkRemindModifyuserCron, timetask.SendInfoToWxWork_Tomodifyuser) //企业微信日常警告,爬虫开发人员告警信息
 	c.AddFunc(util.QyworkRemindAuditorCron, timetask.SendInfoToWxWork_ToAuditor)       //企业微信日常警告,审核人员告警信息
 	c.AddFunc(util.UpdateLuaUserCron, timetask.UpdateLuaUser)                          //更新外包爬虫所属人
-
-	c.AddFunc(util.MoveListDataCron, timetask.MoveListData) //列表页数据迁移
+	c.AddFunc(util.MoveListDataCron, timetask.MoveListData)                            //列表页数据迁移
+	c.AddFunc(util.UpdateSiteCron, timetask.UpdateSiteInfo)                            //站点信息更新
 	//爬虫任务
 	c.AddFunc(util.ResetDataStateCron, luatask.ResetDataState) //重置数据
-	c.AddFunc(util.StartTaskCron, luatask.StartTask)           //开始任务
+	c.AddFunc(util.StartTaskCron, luatask.StartTask)           //爬虫维护任务
 	//爬虫相关周报统计
 	c.AddFunc(util.SpiderWeeklyReportCron, timetask.SpiderWeeklyReport)
 	//lua小组周报

+ 242 - 0
src/timetask/site.go

@@ -0,0 +1,242 @@
+package timetask
+
+import (
+	qu "qfw/util"
+	"sync"
+	"time"
+	"util"
+)
+
+type SiteInfo struct {
+	Lasttime    int64
+	DownloadNum int
+}
+
+var siteMap map[string]*SiteInfo
+
+func UpdateSiteInfo() {
+	qu.Debug("开始更新站点最新数据...")
+	siteMap = map[string]*SiteInfo{}
+	GetSite()            //获取现有站点信息
+	GetSiteLasttime()    //更新最新发布数据日期
+	GetSiteDownloadNum() //更新前一天数据发布量
+	UpdateSite()         //更新信息
+	siteMap = map[string]*SiteInfo{}
+	qu.Debug("更新站点最新数据结束...")
+}
+func GetSite() {
+	defer qu.Catch()
+	sess := util.MgoEB.GetMgoConn()
+	defer util.MgoEB.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	fields := map[string]interface{}{
+		"site":     1,
+		"lasttime": 1,
+	}
+	it := sess.DB(util.MgoEB.DbName).C("site").Find(nil).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"])
+			lasttime := qu.Int64All(tmp["lasttime"])
+			lock.Lock()
+			siteMap[site] = &SiteInfo{
+				Lasttime:    lasttime,
+				DownloadNum: 0,
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+}
+
+func GetSiteLasttime() {
+	GetLuaSiteLasttime()
+	GetPythonSiteLasttime()
+}
+
+func GetLuaSiteLasttime() {
+	defer qu.Catch()
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": util.GetTime(-1),
+			"$lte": time.Now().Unix(),
+		},
+	}
+	fields := map[string]interface{}{
+		"site":        1,
+		"publishtime": 1,
+	}
+	it := sess.DB(util.MgoS.DbName).C("data_bak").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"])
+			publishtime := qu.Int64All(tmp["publishtime"])
+			lock.Lock()
+			if sInfo := siteMap[site]; sInfo != nil {
+				if sInfo.Lasttime < publishtime && publishtime > 1000000000 {
+					sInfo.Lasttime = publishtime
+				}
+			} else {
+				qu.Debug("站点信息不存在:", site)
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+}
+
+func GetPythonSiteLasttime() {
+	defer qu.Catch()
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": util.GetTime(-1),
+			"$lte": time.Now().Unix(),
+		},
+	}
+	fields := map[string]interface{}{
+		"site":             1,
+		"l_np_publishtime": 1,
+	}
+	it := sess.DB(util.MgoPy.DbName).C("data_bak").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"])
+			publishtime := qu.Int64All(tmp["l_np_publishtime"])
+			lock.Lock()
+			if sInfo := siteMap[site]; sInfo != nil {
+				if sInfo.Lasttime < publishtime && publishtime > 1000000000 {
+					sInfo.Lasttime = publishtime
+				}
+			} else {
+				qu.Debug("站点信息不存在:", site)
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+}
+
+func GetSiteDownloadNum() {
+	defer qu.Catch()
+	sess := util.MgoEB.GetMgoConn()
+	defer util.MgoEB.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": util.GetTime(0),
+		},
+	}
+	fields := map[string]interface{}{
+		"site":              1,
+		"repeatptimeallnum": 1,
+	}
+	it := sess.DB(util.MgoEB.DbName).C("luacodeinfo").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"])
+			num := qu.IntAll(tmp["repeatptimeallnum"])
+			lock.Lock()
+			if sInfo := siteMap[site]; sInfo != nil {
+				sInfo.DownloadNum += num
+			} else {
+				qu.Debug("站点信息不存在:", site)
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+}
+
+func UpdateSite() {
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	arr := [][]map[string]interface{}{}
+	for site, sInfo := range siteMap {
+		ch <- true
+		wg.Add(1)
+		go func(s string, sInfo *SiteInfo) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			update := []map[string]interface{}{
+				{"site": s},
+				{"$set": map[string]interface{}{
+					"lasttime":     sInfo.Lasttime,
+					"site_datanum": sInfo.DownloadNum,
+				}},
+			}
+			lock.Lock()
+			arr = append(arr, update)
+			if len(arr) > 500 {
+				util.MgoEB.UpdateBulk("site", arr...)
+				arr = [][]map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(site, sInfo)
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		util.MgoEB.UpdateBulk("site", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+}

+ 2 - 0
src/util/config.go

@@ -31,6 +31,7 @@ var (
 	SpiderWeeklyReportCron     string        //周报统计
 	LuamoveCron                string        //每月1日统计要转移节点的爬虫
 	UpdateLuaUserCron          string        //每天更新外包爬虫到内部人员
+	UpdateSiteCron             string        //每天更新站点信息
 	CloseNum                   int           //关闭几天的任务
 	DayNum                     int           //更新数据天数
 	CodeEventModel             map[int]int   //节点对应的采集模式0:老模式;1:新模式
@@ -101,6 +102,7 @@ func InitOther() {
 	SpiderWeeklyReportCron = qu.ObjToString(Config["spiderweeklyreportcron"])
 	LuamoveCron = qu.ObjToString(Config["luamovecron"])
 	UpdateLuaUserCron = qu.ObjToString(Config["updateluausercron"])
+	UpdateSiteCron = qu.ObjToString(Config["updatesitecron"])
 
 	CloseNum = qu.IntAll(Config["closenum"])
 	DayNum = qu.IntAll(Config["daynum"])