12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046 |
- package luatask
- import (
- "encoding/json"
- "fmt"
- qu "qfw/util"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "util"
- "github.com/donnie4w/go-logger/logger"
- )
- //采集频率异常、列表页异常、404异常、下载异常、运行异常、时间异常、数据异常
- const TASK_RATEERR, TASK_LISTERR, TASK_404ERR, TASK_DOWNLOADERR, TASK_RUNERR, TASK_TIMEERR, TASK_DATAERR = 8, 7, 6, 5, 4, 3, 2
- //失败占比
- const FailedPercentLimit = 0.20
- //失败条数
- const FailedNumLimit = 3
- var CodeInfoMap map[string]*Spider
- var AllHref map[string]string
- var SameDayHref map[string]string
- var DataBakAllHref map[string]string
- var StateFeedBackErr = map[int]string{
- 0: "timeout",
- 200: "analysis",
- 404: "download",
- 500: "server",
- }
- var PythonErrTypeInfoMap = map[string]ErrTypeInfo{
- "download": ErrTypeInfo{
- ErrType: TASK_404ERR,
- Remark: "下载异常",
- },
- "server": ErrTypeInfo{
- ErrType: TASK_DOWNLOADERR,
- Remark: "服务异常",
- },
- "analysis": ErrTypeInfo{
- ErrType: TASK_RUNERR,
- Remark: "解析异常",
- },
- "timeout": ErrTypeInfo{
- ErrType: TASK_TIMEERR,
- Remark: "超时异常",
- },
- }
- var LuaErrTypeInfoMap = map[string]ErrTypeInfo{
- "download": ErrTypeInfo{
- ErrType: TASK_DOWNLOADERR,
- Remark: "下载异常",
- },
- "regather": ErrTypeInfo{
- ErrType: TASK_RUNERR,
- Remark: "运行异常",
- },
- "publishtime": ErrTypeInfo{
- ErrType: TASK_TIMEERR,
- Remark: "时间异常",
- },
- "text": ErrTypeInfo{
- ErrType: TASK_DATAERR,
- Remark: "数据异常",
- },
- }
// Spider aggregates per-spider base information and the download statistics
// collected during one task run.
type Spider struct {
	Site              string `json:"site"`              // site name
	Platform          string `json:"platform"`          // platform ("golua平台" / "python")
	Code              string `json:"spidercode"`        // spider code
	Channel           string `json:"channel"`           // channel / column
	AuditTime         int64  `json:"audittime"`         // latest audit (upload) time
	ModifyUser        string `json:"modifyuser"`        // maintainer name
	ModifyId          string `json:"modifyid"`          // maintainer id
	Event             int    `json:"event"`             // node / event
	State             int    `json:"state"`             // spider state
	PendState         int    `json:"pendstate"`         // pending state
	Weight            int    `json:"weight"`            // spider weight
	FrequencyErrTimes int    `json:"frequencyerrtimes"` // crawl-frequency anomaly count
	MaxPage           int    `json:"maxpage"`           // maximum list page crawled
	Model             int    `json:"model"`             // crawl model: 0 old, 1 new
	Working           int    `json:"working"`           // crawl mode: 0 high-performance, 1 queue
	ListIsFilter      bool   `json:"listisfilter"`      // whether lua list crawling includes filtering
	// Download counts keyed on comeintime, WITHOUT href dedup.
	DownloadAllNum     int `json:"downloadallnum"`     // total downloads
	DownloadSuccessNum int `json:"downloadsuccessnum"` // successful downloads
	DownloadFailedNum  int `json:"downloadfailednum"`  // failed downloads
	NoDownloadNum      int `json:"nodownloadnum"`      // not yet downloaded
	// Same-day download counts keyed on comeintime, WITHOUT href dedup.
	PTimeAllNum        int `json:"ptimeallnum"`        // same-day total downloads
	PTimeSuccessNum    int `json:"ptimesuccessnum"`    // same-day successful downloads
	PTimeFailedNum     int `json:"ptimefailednum"`     // same-day failed downloads
	PTimeNoDownloadNum int `json:"ptimenodownloadnum"` // same-day not yet downloaded
	// Download counts keyed on comeintime, WITH href dedup.
	RepeatDownloadAllNum     int `json:"repeatdownloadallnum"`     // total downloads (dedup)
	RepeatDownloadSuccessNum int `json:"repeatdownloadsuccessnum"` // successful downloads (dedup)
	RepeatDownloadFailedNum  int `json:"repeatdownloadfailednum"`  // failed downloads (dedup)
	RepeatNoDownloadNum      int `json:"repeatnodownloadnum"`      // not yet downloaded (dedup)
	// Same-day download counts keyed on comeintime, WITH href dedup.
	RepeatPTimeAllNum            int `json:"repeatptimeallnum"`       // same-day total downloads (dedup)
	RepeatPTimeSuccessNum        int `json:"repeatptimesuccessnum"`   // same-day successful downloads (dedup)
	RepeatPTimeSuccessDataBakNum int `json:"repeatptimesuccessdbnum"` // data_bak records published same day (dedup)
	RepeatPTimeFailedNum         int `json:"repeatptimefailednum"`    // same-day failed downloads (dedup)
	RepeatPTimeNoDownloadNum     int `json:"repeatptimenodownloadnum"` // same-day not yet downloaded (dedup)
	ListDownloadAllTimes         int `json:"listdownloadalltimes"`    // list-page download attempts within one day
	ListOhPercentTimes           int `json:"listohpercenttimes"`      // times list crawl hit 100%
	ListNoDataTimes              int `json:"listnodatatimes"`         // list-page downloads with no data within one day
	Comeintime                   int64 `json:"comeintime"`            // record insertion time
	ListHeart                    int64 `json:"listheart"`             // list-page execution heartbeat
	DetailHeart                  int64 `json:"detailheart"`           // detail-page execution heartbeat
	FindListHeart                int64 `json:"findlistheart"`         // list-page data-found heartbeat
	DetailExecuteHeart           int64 `json:"detailexecuteheart"`    // detail-page download-success heartbeat
	Error map[string]*ErrorInfo `json:"error"` // error category -> details
	//OhPercentTimes int `json:"ohpercentimes"` //采集量占总下载量100%的次数
	//NtPercentTime int `json:"ntpercentimes"` //采集量占总下载量90%-100%的次数
	//EtPercentTime int `json:"etpercentimes"` //采集量占总下载量80%-90%的次数
}
// ErrorInfo records one error category for a spider: how many records
// failed and a small sample of the failing links.
type ErrorInfo struct {
	Num int          // number of failing records
	Err []*ErrRemark // sampled error details
}

// ErrRemark is one failing link with a short explanation.
type ErrRemark struct {
	Href   string // link
	Remark string // anomaly description
}
// Task describes one maintenance task to be created for a spider.
type Task struct {
	Platform          string // platform
	Code              string // spider code
	Site              string // site
	Channel           string // channel / column
	ModifyUser        string // maintainer
	ModifyId          string // maintainer id
	ErrType           int    // anomaly type: 8 crawl frequency; 7 list page; 5 download; 4 runtime; 3 publish time; 2 data; 1 data volume
	Description       string // description
	State             int    // state
	Event             int    // node
	Num               int    // download count
	FrequencyErrTimes int    // crawl-frequency anomaly count
	DescribeMap       map[int]string
	//ErrInfo map[string]map[string]interface{} //异常集合
}

// ErrTypeInfo describes a task anomaly type.
type ErrTypeInfo struct {
	ErrType int    // task anomaly type code
	Remark  string // anomaly type description
}
var (
	StartTime   int64                     // start of the previous working day
	EndTime     int64                     // end of the previous working day
	Publishtime string                    // publish date string derived from StartTime
	TaskMap     map[string]*Task          // task set
	UserTaskNum map[string]map[string]int // new-task count per user per day
	CodeLock    = &sync.Mutex{}
)
// StartTask runs one full statistics-and-task-creation cycle: it initializes
// the time window, loads spider base info, aggregates download counts from
// several collections, collects error data, creates tasks and pushes reports.
func StartTask() {
	InitInfo() // initialize the time window
	logger.Debug(StartTime, EndTime, Publishtime)
	GetCodeBaseInfo()              // load spider base info
	GetBiddingCount()              // count bidding-collection downloads per platform
	GetCodeHeart()                 // load spider heartbeat info
	GetSpiderHighListDownloadNum() // spider_highlistdata: list-page download/failed/pending counts
	GetSpiderListDownloadNum()     // spider_listdata: list-page download/failed/pending counts
	GetSpiderDataBakDownloadNum()  // data_bak: download counts
	GetSpiderDownloadRateDataNew() // download rate
	GetSpiderWarnErrData()         // error info
	GetPythonWarnErrData()         // python-specific error info
	//SaveCodeInfo()
	CreateTaskProcess()
	// GetDownloadNumber() //统计下载量
	//CloseTask() //关闭任务
	SendInfoToWxWork_SiteDataCount()
	SendLuaPythonAllNum()
}
- //初始化
- func InitInfo() {
- defer qu.Catch()
- CodeInfoMap = map[string]*Spider{} //初始化
- AllHref = map[string]string{}
- SameDayHref = map[string]string{}
- DataBakAllHref = map[string]string{}
- UserTaskNum = map[string]map[string]int{}
- StartTime, EndTime = util.GetWorkDayTimeUnix()
- Publishtime = qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
- //StartTime = util.GetTime(-1)
- //EndTime = util.GetTime(0)
- }
- // GetCodeBaseInfo 准备爬虫基本信息
- func GetCodeBaseInfo() {
- defer qu.Catch()
- sess := util.MgoE.GetMgoConn()
- defer util.MgoE.DestoryMongoConn(sess)
- lock := &sync.Mutex{}
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 5)
- query := map[string]interface{}{
- "$or": []interface{}{
- //lua、python上线爬虫
- map[string]interface{}{
- "state": map[string]interface{}{
- "$in": []int{5, 11}, //上架、上线爬虫
- },
- },
- //lua正在被维护的爬虫和上架爬虫
- map[string]interface{}{
- "platform": "golua平台",
- "state": map[string]interface{}{
- "$in": []int{0, 1, 2}, //待完成、待审核、未通过
- },
- "event": map[string]interface{}{
- "$ne": 7000,
- },
- },
- //python正在被维护的爬虫和上线爬虫
- map[string]interface{}{
- "platform": "python",
- "state": map[string]interface{}{
- "$in": []int{1, 2, 6}, //待审核、未通过、已下架
- },
- },
- },
- }
- fieles := map[string]interface{}{
- "event": 1,
- "param_common": 1,
- "platform": 1,
- "modifyuser": 1,
- "modifyuserid": 1,
- "state": 1,
- "pendstate": 1,
- "weight": 1,
- "l_uploadtime": 1,
- "listisfilter": 1,
- "frequencyerrtimes": 1,
- "code": 1,
- }
- count := util.MgoE.Count("luaconfig", query)
- logger.Debug("共加载线上爬虫个数:", count)
- it := sess.DB(util.MgoE.DbName).C("luaconfig").Find(&query).Select(&fieles).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- sp := &Spider{
- Error: map[string]*ErrorInfo{},
- }
- if param_common, ok := tmp["param_common"].([]interface{}); ok && len(param_common) >= 6 {
- //sp.Code = qu.ObjToString(param_common[0])
- sp.Site = qu.ObjToString(param_common[1])
- sp.Channel = qu.ObjToString(param_common[2])
- sp.MaxPage = qu.IntAll(param_common[5])
- } else {
- logger.Debug("加载爬虫出错:", tmp["_id"])
- }
- sp.Code = qu.ObjToString(tmp["code"])
- sp.ModifyUser = qu.ObjToString(tmp["modifyuser"])
- sp.ModifyId = qu.ObjToString(tmp["modifyuserid"])
- sp.AuditTime = qu.Int64All(tmp["l_uploadtime"])
- sp.Platform = qu.ObjToString(tmp["platform"])
- sp.Event = qu.IntAll(tmp["event"])
- sp.State = qu.IntAll(tmp["state"])
- sp.PendState = qu.IntAll(tmp["pendstate"])
- sp.Weight = qu.IntAll(tmp["weight"])
- if sp.Platform == "python" {
- sp.ListIsFilter = false
- } else {
- sp.ListIsFilter = tmp["listisfilter"].(bool)
- }
- sp.FrequencyErrTimes = qu.IntAll(tmp["frequencyerrtimes"])
- sp.Model = util.CodeEventModel[sp.Event]
- sp.Working = util.CodeEventWorking[sp.Event]
- sp.Comeintime = time.Now().Unix()
- lock.Lock()
- CodeInfoMap[sp.Code] = sp
- lock.Unlock()
- }(tmp)
- if n%1000 == 0 {
- logger.Debug(n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("爬虫基本信息准备完成...", len(CodeInfoMap))
- }
// GetBiddingCount counts the records inserted into the "bidding" collection
// during [StartTime, EndTime) and attributes them to the lua or python
// platform totals (LuaBiddingDownloadAllNum / PythonBiddingDownloadAllNum,
// declared elsewhere) based on each record's spider code.
func GetBiddingCount() {
	defer qu.Catch()
	sess := util.MgoB.GetMgoConn()
	defer util.MgoB.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // bounds concurrent workers
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fieles := map[string]interface{}{
		"spidercode": 1,
	}
	count := util.MgoB.Count("bidding", query)
	logger.Debug("bidding采集数据量:", count)
	it := sess.DB(util.MgoB.DbName).C("bidding").Find(&query).Select(&fieles).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			// The lock also guards the package-level platform counters.
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				if sp.Platform == "golua平台" {
					LuaBiddingDownloadAllNum++
				} else if sp.Platform == "python" {
					PythonBiddingDownloadAllNum++
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		// Fresh map: the launched goroutine still owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("Bidding数据量统计完成...", LuaBiddingDownloadAllNum, PythonBiddingDownloadAllNum)
}
// GetCodeHeart loads each spider's heartbeat timestamps (list, detail,
// findlist, detailexecute) from the "spider_heart" collection into the
// matching CodeInfoMap entry. Entries with no matching spider are ignored.
func GetCodeHeart() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"del": false,
	}
	fields := map[string]interface{}{
		"code":          1,
		"list":          1,
		"detail":        1,
		"findlist":      1,
		"detailexecute": 1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // bounds concurrent workers
	it := sess.DB(util.MgoS.DbName).C("spider_heart").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["code"])
			listHeart := qu.Int64All(tmp["list"])
			detailHeart := qu.Int64All(tmp["detail"])
			findListHeart := qu.Int64All(tmp["findlist"])
			detailExecuteHeart := qu.Int64All(tmp["detailexecute"])
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				sp.ListHeart = listHeart
				sp.DetailHeart = detailHeart
				sp.FindListHeart = findListHeart
				sp.DetailExecuteHeart = detailExecuteHeart
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			logger.Debug(n)
		}
		// Fresh map: the launched goroutine still owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("统计采集量spider_heart完成...")
}
// GetSpiderHighListDownloadNum scans "spider_highlistdata" records inserted
// during [StartTime, EndTime) and accumulates per-spider list-page download
// counts: raw (no href dedup) totals plus same-day totals, and href-deduped
// totals keyed by the package-level SameDayHref/AllHref maps. Failed
// downloads also record up to 3 sample hrefs under Error["download"].
// state semantics: 1 = downloaded OK, -1 = failed; anything else is
// "not downloaded" unless "times" is set (meaning failed and reset).
func GetSpiderHighListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode":  1,
		"href":        1,
		"state":       1,
		"times":       1,
		"publishtime": 1,
		"site":        1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // bounds concurrent workers
	// 1. scan spider_highlistdata
	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			state := qu.IntAll(tmp["state"])
			site := qu.ObjToString(tmp["site"])
			ptime := qu.ObjToString(tmp["publishtime"])
			sameDay := strings.Contains(ptime, Publishtime) // published on the stats day?
			// The lock guards both the Spider fields and the shared dedup maps.
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				// Raw counts, no href dedup.
				success := true
				sp.DownloadAllNum++
				if sameDay {
					sp.PTimeAllNum++
				}
				if state == 1 { // downloaded successfully
					sp.DownloadSuccessNum++
					if sameDay {
						sp.PTimeSuccessNum++
					}
				} else if state == -1 { // download failed
					success = false
					sp.DownloadFailedNum++
					if sameDay {
						sp.PTimeFailedNum++
					}
				} else {
					if tmp["times"] == nil { // not downloaded yet
						sp.NoDownloadNum++
						if sameDay {
							sp.PTimeNoDownloadNum++
						}
					} else { // failed, then state was reset
						success = false
						sp.DownloadFailedNum++
						if sameDay {
							sp.PTimeFailedNum++
						}
					}
				}
				// Dedup by href among same-day published records.
				if sameDay && SameDayHref[href] != site {
					sp.RepeatDownloadAllNum++
					sp.RepeatPTimeAllNum++
					if state == 1 {
						sp.RepeatDownloadSuccessNum++
						sp.RepeatPTimeSuccessNum++
					} else if state == -1 { // download failed
						sp.RepeatDownloadFailedNum++
						sp.RepeatPTimeFailedNum++
					} else {
						if tmp["times"] == nil { // not downloaded yet
							sp.RepeatNoDownloadNum++
							sp.RepeatPTimeNoDownloadNum++
						} else { // failed, then state was reset
							sp.RepeatPTimeFailedNum++
							sp.RepeatDownloadFailedNum++
						}
					}
					SameDayHref[href] = site
					AllHref[href] = site
				} else if AllHref[href] != site { // dedup by href over the full window
					sp.RepeatDownloadAllNum++
					if state == 1 { // downloaded successfully
						sp.RepeatDownloadSuccessNum++
					} else if state == -1 { // download failed
						sp.RepeatDownloadFailedNum++
					} else {
						if tmp["times"] == nil { // not downloaded yet
							sp.RepeatNoDownloadNum++
						} else { // failed, then state was reset
							sp.RepeatDownloadFailedNum++
						}
					}
					AllHref[href] = site
				}
				//href站点内去重统计
				//if AllHref[href] != site {
				//	sp.RepeatDownloadAllNum++
				//	if sameDay {
				//		sp.RepeatPTimeAllNum++
				//	}
				//	if state == 1 { //下载成功
				//		sp.RepeatDownloadSuccessNum++
				//		if sameDay {
				//			sp.RepeatPTimeSuccessNum++
				//		}
				//	} else if state == -1 { //下载失败
				//		sp.RepeatDownloadFailedNum++
				//		if sameDay {
				//			sp.RepeatPTimeFailedNum++
				//		}
				//	} else {
				//		if tmp["times"] == nil { //未下载
				//			sp.RepeatNoDownloadNum++
				//			if sameDay {
				//				sp.RepeatPTimeNoDownloadNum++
				//			}
				//		} else { //下载失败,状态被重置
				//			sp.RepeatDownloadFailedNum++
				//			if sameDay {
				//				sp.RepeatPTimeFailedNum++
				//			}
				//		}
				//	}
				//	AllHref[href] = site
				//}
				if !success { // record the failing href (max 3 samples)
					if errorInfo := sp.Error["download"]; errorInfo == nil {
						sp.Error["download"] = &ErrorInfo{
							Num: sp.DownloadFailedNum,
							Err: []*ErrRemark{
								&ErrRemark{
									Href:   href,
									Remark: "Download Failed",
								},
							},
						}
					} else {
						errorInfo.Num = sp.DownloadFailedNum
						if len(errorInfo.Err) < 3 {
							errorInfo.Err = append(errorInfo.Err, &ErrRemark{
								Href:   qu.ObjToString(tmp["href"]),
								Remark: "Download Failed",
							})
						}
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		// Fresh map: the launched goroutine still owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("统计采集量spider_highlistdata完成...")
}
// GetSpiderListDownloadNum scans "spider_listdata" records inserted during
// [StartTime, EndTime) and accumulates the same per-spider counts as
// GetSpiderHighListDownloadNum (raw and href-deduped, whole-window and
// same-day). Unlike the highlist variant, a record that is neither state 1
// nor -1 always counts as "not downloaded" in the raw tallies. The shared
// dedup maps AllHref/SameDayHref are cleared when this function finishes.
func GetSpiderListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode":  1,
		"href":        1,
		"state":       1,
		"site":        1,
		"times":       1,
		"publishtime": 1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // bounds concurrent workers
	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			state := qu.IntAll(tmp["state"])
			site := qu.ObjToString(tmp["site"])
			ptime := qu.ObjToString(tmp["publishtime"])
			sameDay := strings.Contains(ptime, Publishtime) // published on the stats day?
			// The lock guards both the Spider fields and the shared dedup maps.
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				// Raw counts, no href dedup.
				success := true
				sp.DownloadAllNum++
				if sameDay {
					sp.PTimeAllNum++
				}
				if state == 1 { // downloaded successfully
					sp.DownloadSuccessNum++
					if sameDay {
						sp.PTimeSuccessNum++
					}
				} else if state == -1 { // download failed
					success = false
					sp.DownloadFailedNum++
					if sameDay {
						sp.PTimeFailedNum++
					}
				} else { // not downloaded yet
					sp.NoDownloadNum++
					if sameDay {
						sp.PTimeNoDownloadNum++
					}
				}
				// Dedup by href among same-day published records.
				if sameDay && SameDayHref[href] != site {
					sp.RepeatDownloadAllNum++
					sp.RepeatPTimeAllNum++
					if state == 1 {
						sp.RepeatDownloadSuccessNum++
						sp.RepeatPTimeSuccessNum++
					} else if state == -1 { // download failed
						sp.RepeatDownloadFailedNum++
						sp.RepeatPTimeFailedNum++
					} else {
						if tmp["times"] == nil { // not downloaded yet
							sp.RepeatNoDownloadNum++
							sp.RepeatPTimeNoDownloadNum++
						} else { // failed, then state was reset
							sp.RepeatPTimeFailedNum++
							sp.RepeatDownloadFailedNum++
						}
					}
					SameDayHref[href] = site
					AllHref[href] = site
				} else if AllHref[href] != site { // dedup by href over the full window
					sp.RepeatDownloadAllNum++
					if state == 1 { // downloaded successfully
						sp.RepeatDownloadSuccessNum++
					} else if state == -1 { // download failed
						sp.RepeatDownloadFailedNum++
					} else {
						if tmp["times"] == nil { // not downloaded yet
							sp.RepeatNoDownloadNum++
						} else { // failed, then state was reset
							sp.RepeatDownloadFailedNum++
						}
					}
					AllHref[href] = site
				}
				//href站点内去重统计
				//if AllHref[href] != site {
				//	sp.RepeatDownloadAllNum++
				//	if samaDay {
				//		sp.RepeatPTimeAllNum++
				//	}
				//	if state == 1 { //下载成功
				//		sp.RepeatDownloadSuccessNum++
				//		if samaDay {
				//			sp.RepeatPTimeSuccessNum++
				//		}
				//	} else if state == -1 { //下载失败
				//		sp.RepeatDownloadFailedNum++
				//		if samaDay {
				//			sp.RepeatPTimeFailedNum++
				//		}
				//	} else { //未下载
				//		sp.RepeatNoDownloadNum++
				//		if samaDay {
				//			sp.RepeatPTimeNoDownloadNum++
				//		}
				//	}
				//	AllHref[href] = site
				//}
				if !success { // record the failing href (max 3 samples)
					if errorInfo := sp.Error["download"]; errorInfo == nil {
						sp.Error["download"] = &ErrorInfo{
							Num: sp.DownloadFailedNum,
							Err: []*ErrRemark{
								&ErrRemark{
									Href:   href,
									Remark: "Download Failed",
								},
							},
						}
					} else {
						errorInfo.Num = sp.DownloadFailedNum
						if len(errorInfo.Err) < 3 {
							errorInfo.Err = append(errorInfo.Err, &ErrRemark{
								Href:   href,
								Remark: "Download Failed",
							})
						}
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		// Fresh map: the launched goroutine still owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	// Release the dedup maps once both list collections have been counted.
	AllHref = map[string]string{}
	SameDayHref = map[string]string{}
	logger.Debug("统计spider_listdata采集量完成...")
}
// GetSpiderDataBakDownloadNum scans "data_bak" records whose comeintime AND
// l_np_publishtime both fall in [StartTime, EndTime). For every spider it
// counts the deduped same-day published volume; additionally, for a spider
// that downloaded data but matched no same-day record in the list scans
// (PTimeAllNum == 0), the first data_bak hit seeds its same-day counters.
func GetSpiderDataBakDownloadNum() {
	defer qu.Catch()
	logger.Debug("统计采集量data_bak开始...")
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
		"l_np_publishtime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"href":       1,
		"site":       1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // bounds concurrent workers
	it := sess.DB(util.MgoS.DbName).C("data_bak").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			site := qu.ObjToString(tmp["site"])
			lock.Lock()
			defer lock.Unlock()
			if sp := CodeInfoMap[code]; sp != nil {
				// Deduped count of each spider's same-day published data_bak volume.
				if DataBakAllHref[href] != site {
					sp.RepeatPTimeSuccessDataBakNum++
					DataBakAllHref[href] = site
				}
				// Only seed same-day counters for spiders that downloaded
				// something but matched no same-day record in the list scans.
				if sp.DownloadAllNum == 0 || sp.PTimeAllNum != 0 {
					return
				}
				sp.PTimeAllNum++
				sp.RepeatPTimeAllNum++
				sp.PTimeSuccessNum++
				sp.RepeatPTimeSuccessNum++
			}
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		// Fresh map: the launched goroutine still owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	// Release the dedup map.
	DataBakAllHref = map[string]string{}
	//wg := &sync.WaitGroup{}
	//ch := make(chan bool, 5)
	//n := 0
	//for _, sp := range CodeInfoMap {
	//	n++
	//	if n%100 == 0 {
	//		logger.Debug("current:", n)
	//	}
	//	if sp.Platform != "golua平台" || sp.DownloadAllNum == 0 || sp.PTimeAllNum != 0 { //根据发布时间统计无数据,统计data_bak
	//		continue
	//	}
	//	//logger.Info("列表页未匹配到当天发布数据的爬虫:", sp.Code)
	//	wg.Add(1)
	//	ch <- true
	//	go func(tmpSp *Spider) {
	//		defer func() {
	//			<-ch
	//			wg.Done()
	//		}()
	//		query := map[string]interface{}{
	//			"comeintime": map[string]interface{}{
	//				"$gte": StartTime,
	//				"$lt":  EndTime,
	//			},
	//			"l_np_publishtime": map[string]interface{}{
	//				"$gte": StartTime,
	//				"$lt":  EndTime,
	//			},
	//			"spidercode": tmpSp.Code,
	//		}
	//		count := util.MgoS.Count("data_bak", query)
	//		tmpSp.PTimeAllNum = count
	//		tmpSp.RepeatPTimeAllNum = count
	//		tmpSp.PTimeSuccessNum = count
	//		tmpSp.RepeatPTimeSuccessNum = count
	//	}(sp)
	//}
	//wg.Wait()
	logger.Debug("统计采集量data_bak完成...")
}
// GetSpiderListDownloadNum_Back is the retired aggregation-pipeline variant
// of GetSpiderListDownloadNum: it groups "spider_listdata" by spidercode and
// state to derive total/success/failed/pending counts (not href-deduped, so
// duplicates are included), and fetches one sample failing href per spider.
func GetSpiderListDownloadNum_Back() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // bounds concurrent workers
	// 2. aggregate spider_listdata
	match := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
		"event": map[string]interface{}{
			"$ne": 7000,
		},
	}
	// group1: count records per (spidercode, state) pair
	group1 := map[string]interface{}{
		"_id": map[string]interface{}{
			"spidercode": "$spidercode",
			"state":      "$state",
		},
		"datacount": map[string]interface{}{
			"$sum": 1,
		},
	}
	// group2: regroup per spidercode, collecting per-state counts and the total
	group2 := map[string]interface{}{
		"_id": "$_id.spidercode",
		"stateinfo": map[string]interface{}{
			"$push": map[string]interface{}{
				"state": "$_id.state",
				"count": "$datacount",
			},
		},
		"count": map[string]interface{}{
			"$sum": "$datacount",
		},
	}
	project := map[string]interface{}{
		"statearr": "$stateinfo",
		"count":    1,
	}
	p := []map[string]interface{}{
		map[string]interface{}{"$match": match},
		map[string]interface{}{"$group": group1},
		map[string]interface{}{"$group": group2},
		map[string]interface{}{"$project": project},
	}
	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Pipe(p).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"]) // total downloads (approximate, includes duplicates)
			successCount := 0                // successful downloads (approximate, includes duplicates)
			failedCount := 0                 // failed downloads (approximate, includes duplicates)
			noCount := 0                     // not yet downloaded
			if stateArr, ok := tmp["statearr"].([]interface{}); ok {
				for _, stateInfo := range stateArr {
					infoMap := stateInfo.(map[string]interface{})
					state := qu.IntAll(infoMap["state"])
					if state == 1 { // state 1: successful downloads
						successCount = qu.IntAll(infoMap["count"])
					} else if state == -1 { // state -1: failed downloads
						failedCount = qu.IntAll(infoMap["count"])
					} else if state == 0 { // state 0: not yet downloaded
						noCount = qu.IntAll(infoMap["count"])
					}
				}
			}
			errArr := []*ErrRemark{}
			if failedCount > 0 { // there were failures: fetch one sample failing link
				query := map[string]interface{}{
					"comeintime": map[string]interface{}{
						"$gte": StartTime,
						"$lt":  EndTime,
					},
					"event": map[string]interface{}{
						"$ne": 7000,
					},
					"spidercode": code,
					"state":      -1,
				}
				logger.Debug("采集失败数据query:", query)
				data, _ := util.MgoS.FindOne("spider_listdata", query)
				if data != nil {
					errArr = append(errArr, &ErrRemark{
						Href:   qu.ObjToString((*data)["href"]),
						Remark: "Download Failed",
					})
				}
			}
			lock.Lock()
			if spider := CodeInfoMap[code]; spider != nil {
				spider.DownloadAllNum = count
				spider.DownloadSuccessNum = successCount
				spider.DownloadFailedNum = failedCount
				spider.NoDownloadNum = noCount
				if len(errArr) > 0 {
					spider.Error["download"] = &ErrorInfo{
						Num: failedCount,
						Err: errArr,
					}
				}
			} else {
				logger.Debug("-------------", code)
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			logger.Debug(n)
		}
		// Fresh map: the launched goroutine still owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("统计spider_listdata采集量完成...")
}
// GetSpiderDownloadRateDataNew summarizes list-page crawl-frequency data
// from "spider_downloadrate" for the stats date: total attempt count,
// zero-result count, and — for non-old-model spiders only — 100%-harvest
// occurrences, which also bump the frequency-anomaly counter.
func GetSpiderDownloadRateDataNew() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5) // bounds concurrent workers
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
	query := map[string]interface{}{
		"date": date,
		"event": map[string]interface{}{
			"$ne": 7000,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"alltimes":   1,
		"zero":       1,
		"oh_percent": 1,
	}
	logger.Debug("query:", query)
	it := sess.DB(util.MgoS.DbName).C("spider_downloadrate").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			alltimes := qu.IntAll(tmp["alltimes"])
			zero := qu.IntAll(tmp["zero"])
			oh_percent := qu.IntAll(tmp["oh_percent"])
			lock.Lock()
			if spider := CodeInfoMap[code]; spider != nil {
				spider.ListDownloadAllTimes = alltimes
				spider.ListNoDataTimes = zero
				// A 100% harvest marks a crawl-frequency anomaly, except for
				// old-model (Model == 0) spiders — queue-mode nodes (7410,
				// 7500, 7510, 7700) do not get frequency-anomaly tasks.
				// (If the previous round failed to download, the next round
				// treats the data as new — that case should produce a
				// download-anomaly task instead.)
				if oh_percent > 0 && spider.Model != 0 {
					spider.FrequencyErrTimes++
					spider.ListOhPercentTimes = oh_percent
				}
			} else {
				logger.Debug("-------------", code)
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug("current:", n)
		}
		// Fresh map: the launched goroutine still owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("列表页采集统计完成...")
}
- //汇总lua错误信息数据
- func GetSpiderWarnErrData() {
- defer qu.Catch()
- logger.Debug("错误信息数据统计...")
- sess := util.MgoS.GetMgoConn()
- defer util.MgoS.DestoryMongoConn(sess)
- match := map[string]interface{}{
- "level": 2,
- "comeintime": map[string]interface{}{
- "$gte": StartTime,
- "$lt": EndTime,
- },
- }
- group1 := map[string]interface{}{
- "_id": map[string]interface{}{
- "code": "$code",
- "info": "$info",
- },
- "datacount": map[string]interface{}{
- "$sum": 1,
- },
- }
- group2 := map[string]interface{}{
- "_id": "$_id.code",
- "infotext": map[string]interface{}{
- "$push": map[string]interface{}{
- "info": "$_id.info",
- "count": "$datacount",
- },
- },
- "count": map[string]interface{}{
- "$sum": "$datacount",
- },
- }
- project := map[string]interface{}{
- "infoarr": "$infotext",
- "count": 1,
- }
- p := []map[string]interface{}{
- map[string]interface{}{"$match": match},
- map[string]interface{}{"$group": group1},
- map[string]interface{}{"$group": group2},
- map[string]interface{}{"$project": project},
- }
- logger.Debug("spider_warn:", match)
- //1、统计spider_warn
- it1 := sess.DB(util.MgoS.DbName).C("spider_warn").Pipe(p).Iter()
- n1 := 0
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- for tmp := make(map[string]interface{}); it1.Next(&tmp); n1++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["_id"])
- //spider.Error = map[string]*ErrorInfo{} //初始化
- if infoArr, ok := tmp["infoarr"].([]interface{}); ok {
- for _, info := range infoArr {
- stype := ""
- query := map[string]interface{}{
- "level": 2,
- "comeintime": map[string]interface{}{
- "$gte": StartTime,
- "$lt": EndTime,
- },
- }
- infoMap := info.(map[string]interface{})
- infoText := qu.ObjToString(infoMap["info"]) //错误信息
- errCount := qu.IntAll(infoMap["count"]) //错误数量
- if infoText == "Publishtime Is Too Late" { //发布时间超前
- query["info"] = infoText
- stype = "publishtime"
- } else if infoText == "Publishtime Is Less Than Zero" { //发布时间小于0
- query["info"] = infoText
- stype = "publishtime"
- } else if infoText == "Publishtime Is Too Early" { //发布时间过小
- query["info"] = infoText
- stype = "publishtime"
- } else if infoText == "Field Value Not Contains Chinese" { //title、detail不含中文
- query["info"] = infoText
- stype = "text"
- } else if infoText == "Field Value Contains Random Code" { //title、detail含乱码
- query["info"] = infoText
- stype = "text"
- } else {
- continue
- }
- query["code"] = code
- //logger.Debug(query)
- //errArr := []*ErrRemark{}
- //list, _ := util.MgoS.Find("spider_warn", query, nil, map[string]interface{}{"href": 1}, false, 0, 3)
- //for _, l := range *list {
- // errArr = append(errArr, &ErrRemark{
- // Href: qu.ObjToString(l["href"]),
- // Remark: infoText,
- // })
- //}
- one, _ := util.MgoS.FindOne("spider_warn", query) //查询该错误信息类型的一条href
- oneErrInfo := &ErrRemark{
- Href: qu.ObjToString((*one)["href"]),
- Remark: infoText,
- }
- lock.Lock()
- if spider := CodeInfoMap[code]; spider != nil {
- if errMap := spider.Error[stype]; errMap != nil {
- errMap.Num += errCount
- errMap.Err = append(errMap.Err, oneErrInfo)
- } else {
- spider.Error[stype] = &ErrorInfo{
- Num: errCount,
- Err: []*ErrRemark{
- oneErrInfo,
- },
- }
- }
- }
- lock.Unlock()
- }
- }
- }(tmp)
- if n1%10 == 0 {
- logger.Debug(n1)
- }
- tmp = map[string]interface{}{}
- }
- //2、统计regatherdata
- //match = map[string]interface{}{
- // "state": map[string]interface{}{
- // "$lte": 1,
- // },
- // "from": "lua",
- // "comeintime": map[string]interface{}{
- // "$gte": StartTime,
- // "$lt": EndTime,
- // },
- //}
- //group1 = map[string]interface{}{
- // "_id": "$spidercode",
- // "count": map[string]interface{}{
- // "$sum": 1,
- // },
- //}
- //p = []map[string]interface{}{
- // map[string]interface{}{"$match": match},
- // map[string]interface{}{"$group": group1},
- //}
- //logger.Debug("regather query:", match)
- //it2 := sess.DB(util.MgoS.DbName).C("regatherdata").Pipe(p).Iter()
- //n2 := 0
- //for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
- // wg.Add(1)
- // ch <- true
- // go func(tmp map[string]interface{}) {
- // defer func() {
- // <-ch
- // wg.Done()
- // }()
- // code := qu.ObjToString(tmp["_id"]) //爬虫代码
- // count := qu.IntAll(tmp["count"]) //异常数据量
- // query := map[string]interface{}{
- // "state": map[string]interface{}{
- // "$lte": 1,
- // },
- // "from": "lua",
- // "comeintime": map[string]interface{}{
- // "$gte": StartTime,
- // "$lt": EndTime,
- // },
- // "spidercode": code,
- // }
- // //logger.Debug("query:", query)
- //
- // errArr := []*ErrRemark{}
- // list, _ := util.MgoS.Find("regatherdata", query, nil, map[string]interface{}{"href": 1, "error": 1}, false, 0, 3)
- // for _, l := range *list {
- // errText := qu.ObjToString(l["error"])
- // errText = strings.Replace(errText, "<string>:", "", 1)
- // errArr = append(errArr, &ErrRemark{
- // Href: qu.ObjToString(l["href"]),
- // Remark: errText,
- // })
- // }
- // //one, _ := util.MgoS.FindOne("regatherdata", query) //查询该错误信息类型的一条href
- // //oneErrInfo := &ErrRemark{
- // // Href: qu.ObjToString((*one)["href"]),
- // // Remark: qu.ObjToString((*one)["error"]),
- // //}
- // if spider := CodeInfoMap[code]; spider != nil {
- // spider.Error["regather"] = &ErrorInfo{
- // Num: count,
- // Err: errArr,
- // }
- // // if spider_err := spider.Error; spider_err != nil {
- // // spider_err["regather"] = &ErrorInfo{
- // // Num: count,
- // // Err: []map[string]interface{}{
- // // oneErrInfo,
- // // },
- // // }
- // // } else {
- // // spider.Error = map[string]*ErrorInfo{
- // // "regather": &ErrorInfo{
- // // Num: count,
- // // Err: []map[string]interface{}{
- // // oneErrInfo,
- // // },
- // // },
- // // }
- // // }
- // }
- // }(tmp)
- // if n2%10 == 0 {
- // logger.Debug(n2)
- // }
- // tmp = map[string]interface{}{}
- //}
- wg.Wait()
- logger.Debug("错误信息数据统计完成...")
- }
- //汇总python错误信息数据
- func GetPythonWarnErrData() {
- GetPythonListDownloadNum() //统计列表页采集量
- GetPythonDetailDownloadNum() //统计data_bak总下载量
- GetPythonErrData() //统计异常信息
- }
- //python统计列表页采集量
- func GetPythonListDownloadNum() {
- defer qu.Catch()
- logger.Debug("python列表页数据下载量统计开始...")
- sess := util.MgoPy.GetMgoConn()
- defer util.MgoPy.DestoryMongoConn(sess)
- query := map[string]interface{}{
- "runtime": Publishtime,
- "rel_count": map[string]interface{}{
- "$gt": 0,
- },
- }
- fields := map[string]interface{}{
- "spidercode": 1,
- "rel_count": 1,
- }
- lock := &sync.Mutex{}
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 5)
- it := sess.DB(util.MgoPy.DbName).C("list").Find(&query).Select(&fields).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["spidercode"])
- count := qu.IntAll(tmp["rel_count"])
- lock.Lock()
- if sp := CodeInfoMap[code]; sp != nil {
- //href不去重统计
- sp.DownloadAllNum += count
- sp.RepeatDownloadAllNum += count
- }
- lock.Unlock()
- }(tmp)
- if n%1000 == 0 {
- logger.Debug(n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("python数据下载量统计完成...")
- }
// GetPythonDetailDownloadNum python三级页统计总下载量
// It scans python "data_bak" documents inside [StartTime, EndTime) and, per
// spider, counts successfully pushed records (sendflag == "true") and records
// whose publish time falls on the stat day.
func GetPythonDetailDownloadNum() {
	defer qu.Catch()
	logger.Debug("python三级页数据下载量统计开始...")
	sess := util.MgoPy.GetMgoConn()
	defer util.MgoPy.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode":  1,
		"publishtime": 1,
		"sendflag":    1,
	}
	lock := &sync.Mutex{} // guards CodeInfoMap entries
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // bounds concurrency at 5 workers
	it := sess.DB(util.MgoPy.DbName).C("data_bak").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			ptime := qu.ObjToString(tmp["publishtime"])
			sendflag := qu.ObjToString(tmp["sendflag"])
			// whether the record was published on the stat day
			// (variable name "samaDay" is a typo for sameDay, kept as-is)
			samaDay := strings.Contains(ptime, Publishtime)
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				//sp.DownloadAllNum++
				//sp.RepeatDownloadAllNum++
				if sendflag == "true" { // record was pushed successfully by the save service
					sp.DownloadSuccessNum++
					sp.RepeatDownloadSuccessNum++
				}
				if samaDay {
					sp.PTimeAllNum++
					sp.RepeatPTimeAllNum++
					if sendflag == "true" {
						sp.PTimeSuccessNum++
						sp.RepeatPTimeSuccessNum++
						sp.RepeatPTimeSuccessDataBakNum++
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{} // fresh map so the goroutine keeps its own copy
	}
	wg.Wait()
	logger.Debug("python数据下载量统计完成...")
}
// GetPythonDownloadNum_back python统计总下载量
// Aggregation-pipeline variant that groups "data_bak" documents in
// [StartTime, EndTime) by (spidercode, sendflag), then writes each spider's
// total count and success count (sendflag == "true") into CodeInfoMap.
// NOTE(review): the "_back" suffix suggests this is a superseded fallback of
// GetPythonDetailDownloadNum — confirm before reusing.
func GetPythonDownloadNum_back() {
	defer qu.Catch()
	sess := util.MgoPy.GetMgoConn()
	defer util.MgoPy.DestoryMongoConn(sess)
	match := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	// stage 1: count documents per (spidercode, sendflag)
	group1 := map[string]interface{}{
		"_id": map[string]interface{}{
			"spidercode": "$spidercode",
			"sendflag":   "$sendflag",
		},
		"datacount": map[string]interface{}{
			"$sum": 1,
		},
	}
	// stage 2: regroup per spidercode, collecting per-sendflag counts and the total
	group2 := map[string]interface{}{
		"_id": "$_id.spidercode",
		"sendflagarr": map[string]interface{}{
			"$push": map[string]interface{}{
				"sendflag": "$_id.sendflag",
				"count":    "$datacount",
			},
		},
		"count": map[string]interface{}{
			"$sum": "$datacount",
		},
	}
	project := map[string]interface{}{
		"infoarr": "$sendflagarr",
		"count":   1,
	}
	p := []map[string]interface{}{
		map[string]interface{}{"$match": match},
		map[string]interface{}{"$group": group1},
		map[string]interface{}{"$group": group2},
		map[string]interface{}{"$project": project},
	}
	ch := make(chan bool, 5) // bounds concurrency at 5 workers
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards CodeInfoMap entries
	it1 := sess.DB(util.MgoPy.DbName).C("data_bak").Pipe(p).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it1.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"]) // 下载总量 (total downloads)
			successCount := 0                // 下载成功总量 (successful downloads)
			if infoArr, ok := tmp["infoarr"].([]interface{}); ok {
				for _, info := range infoArr {
					infoMap := info.(map[string]interface{})
					if sendflag := qu.ObjToString(infoMap["sendflag"]); sendflag == "true" {
						successCount = qu.IntAll(infoMap["count"])
					}
				}
			}
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				sp.DownloadAllNum = count
				sp.DownloadSuccessNum = successCount // count of records the save service pushed
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{} // fresh map so the goroutine keeps its own copy
	}
	wg.Wait()
	logger.Debug("python数据下载量统计完成...")
}
- //python统计异常信息
- func GetPythonErrData() {
- defer qu.Catch()
- sess := util.MgoPy.GetMgoConn()
- defer util.MgoPy.DestoryMongoConn(sess)
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": StartTime,
- "$lt": EndTime,
- },
- }
- fieles := map[string]interface{}{
- "spidercode": 1,
- "parser_name": 1,
- "parse_url": 1,
- "failed": 1,
- "code": 1,
- }
- it := sess.DB(util.MgoPy.DbName).C("mgp_list").Find(&query).Select(&fieles).Iter()
- n := 0
- lock := &sync.Mutex{}
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 5)
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- state := qu.IntAll(tmp["code"])
- if state == -1 { //状态码为-1表示详情页未执行下载操作,不统计
- return
- }
- spidercode := qu.ObjToString(tmp["spidercode"])
- remark := qu.ObjToString(tmp["parser_name"])
- href := qu.ObjToString(tmp["parse_url"])
- failed := qu.IntAll(tmp["failed"])
- errType := StateFeedBackErr[state]
- oneErrInfo := &ErrRemark{
- Href: href,
- Remark: remark,
- }
- lock.Lock()
- if spider := CodeInfoMap[spidercode]; spider != nil {
- if failed == 0 { //未采集
- spider.NoDownloadNum++
- } else { //下载失败
- spider.DownloadFailedNum++
- if spider_err := spider.Error; spider_err != nil {
- if errInfo := spider_err[errType]; errInfo != nil {
- errInfo.Num++
- if len(errInfo.Err) < 3 { //最多存放三个错误数据连接
- errInfo.Err = append(errInfo.Err, oneErrInfo)
- }
- } else {
- spider.Error[errType] = &ErrorInfo{
- Num: 1,
- Err: []*ErrRemark{
- oneErrInfo,
- },
- }
- }
- } else {
- spider.Error = map[string]*ErrorInfo{
- errType: &ErrorInfo{
- Num: 1,
- Err: []*ErrRemark{
- oneErrInfo,
- },
- },
- }
- }
- }
- }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- logger.Debug(n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("python下载异常数据统计完成...")
- }
// CreateTaskProcess 根据爬虫监控信息创建任务流程
// It walks the global CodeInfoMap, derives a maintenance Task for every spider
// showing anomalies, persists each day's spider snapshot into "luacodeinfo"
// and upserts tasks into "task" (both flushed in >500-element batches and once
// at the end), and accumulates the platform-wide list download counters.
// CodeInfoMap is reset when the run completes.
func CreateTaskProcess() {
	defer qu.Catch()
	logger.Debug("开始生成爬虫任务...")
	//arr := []map[string]interface{}{}
	upsertBulk := [][]map[string]interface{}{} // 任务更新集 (pending task upserts)
	arr := []map[string]interface{}{}          // 当天爬虫信息集 (pending spider snapshots)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr and upsertBulk
	ch := make(chan bool, 10) // bounds concurrency at 10 workers
	logger.Debug("CodeInfoMap:", len(CodeInfoMap))
	for code, spider := range CodeInfoMap {
		wg.Add(1)
		ch <- true
		go func(code string, spider *Spider) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// assemble the prospective task for this spider
			task := &Task{
				DescribeMap: map[int]string{},
			}
			//task.Platform = spider.Platform
			//task.Site = spider.Site
			//task.Code = spider.Code
			//task.Channel = spider.Channel
			//task.ModifyUser = spider.ModifyUser
			//task.ModifyId = spider.ModifyId
			//task.FrequencyErrTimes = spider.FrequencyErrTimes
			// Categories shared by lua and python: publishtime, text.
			// Error-type ranks: download=5, regather=4, publishtime=3, text=2.
			if len(spider.Error) > 0 {
				for stype, info := range LuaErrTypeInfoMap {
					if err := spider.Error[stype]; err != nil {
						taskStateOk := false
						if stype == "download" {
							if spider.Model == 1 { // 新模式(7100、7110、7200、7210、7300、7310、7400): task only above count/ratio thresholds
								moreThanLimit := false
								// threshold 1: absolute failure count; threshold 2: failure ratio
								if spider.DownloadFailedNum > FailedNumLimit {
									moreThanLimit = true
								} else if spider.DownloadAllNum > 0 && (float64(spider.DownloadFailedNum)/float64(spider.DownloadAllNum)) > FailedPercentLimit {
									moreThanLimit = true
								}
								if !moreThanLimit { // below thresholds: skip this category
									continue
								}
							} else if spider.Model == 0 && spider.Working == 1 { // 老模式,队列模式(7500,7700): any failure creates a pending task
								if spider.DownloadFailedNum > 0 {
									task.State = 1 // 待处理 (pending)
									taskStateOk = true
								} else {
									continue
								}
							} else if spider.Model == 0 && spider.Working == 0 { // 老模式,高性能模式(7410): never a download task
								continue
							}
						}
						// keep the highest-ranked error type seen so far
						if task.ErrType < info.ErrType {
							task.ErrType = info.ErrType
						}
						// any category exceeding 10 errors makes the task pending
						if !taskStateOk && err.Num > 10 {
							task.State = 1 // 待处理
						}
						// build the human-readable description for this category
						descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n"
						for _, errRemark := range err.Err {
							if stype == "regather" { // regather descriptions include the error remark
								descript += errRemark.Remark + ":" + errRemark.Href + "\n"
							} else {
								descript += errRemark.Href + "\n"
							}
						}
						task.DescribeMap[info.ErrType] = descript
					}
				}
			}
			if spider.Platform == "golua平台" { // lua-specific anomalies (frequency last, it is special)
				// 5. list-page anomaly (errtype 7): every crawl round returned no data
				if spider.ListNoDataTimes > 0 && spider.ListNoDataTimes == spider.ListDownloadAllTimes {
					// either the list page has no filter code, or it does but the heartbeat is stale
					if !spider.ListIsFilter || (spider.FindListHeart < util.GetTime(0) && spider.ListIsFilter) {
						task.State = 1 // 待处理
						task.ErrType = TASK_LISTERR
						task.DescribeMap[TASK_LISTERR] = "列表页异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListNoDataTimes) + "轮无数据\n"
					}
					// if !spider.ListIsFilter { //列表页不含过滤代码
					// 	task.State = 1 //待处理
					// 	task.ErrType = TASK_LISTERR
					// } else if len(task.DescribeMap) == 0 { //只有列表页异常且有过滤代码
					// 	task.State = 0 //待确认
					// 	task.ErrType = TASK_LISTERR
					// }
				}
				// 6. crawl-frequency anomaly (errtype 8)
				if spider.ListOhPercentTimes > 0 {
					//UpdateLuaInfo(spider) //出现采集频率异常,便更新爬虫的frequencyerrtimes、最大页自动加1、重新上架
					// Only FrequencyErrTimes > 3 escalates to pending; otherwise other
					// anomaly types take priority (to-confirm frequency tasks are
					// handled automatically, humans rarely intervene).
					if spider.FrequencyErrTimes > 3 {
						task.State = 1 // 待处理
						task.ErrType = TASK_RATEERR
					} else if len(task.DescribeMap) == 0 { // frequency is the only anomaly and times <= 3
						task.State = 0 // 待确认 (to confirm)
						task.ErrType = TASK_RATEERR
					}
					task.DescribeMap[TASK_RATEERR] = "采集频率异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListOhPercentTimes) + "轮数据全采\n"
				}
			} else if spider.Platform == "python" { // python-specific anomalies
				for stype, info := range PythonErrTypeInfoMap {
					if err := spider.Error[stype]; err != nil {
						// keep the highest-ranked error type
						if task.ErrType < info.ErrType {
							task.ErrType = info.ErrType
						}
						if info.ErrType > 3 { // python 404 / download / runtime anomalies are always pending
							task.State = 1
						}
						descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n"
						for _, errRemark := range err.Err {
							descript += errRemark.Remark + ":" + errRemark.Href + "\n"
						}
						// lua and python may both emit ErrType 3/4: concatenate descriptions
						task.DescribeMap[info.ErrType] = descript + task.DescribeMap[info.ErrType]
					}
				}
			}
			// persist the spider snapshot (marshalled to a generic map for Mongo)
			byteText, err := json.Marshal(spider)
			if err != nil {
				logger.Debug("Json Marshal Error", code)
				return
			}
			tmp := map[string]interface{}{}
			if json.Unmarshal(byteText, &tmp) == nil {
				lock.Lock()
				arr = append(arr, tmp)
				lock.Unlock()
			} else {
				logger.Debug("Json UnMarshal Error", code)
				return
			}
			// merge the new task with any existing open task for this code
			CreateTask(task, spider, &upsertBulk, lock)
			if spider.Platform == "golua平台" {
				// platform-wide list download totals (lua)
				atomic.AddInt64(&LuaListDownloadAllNum, int64(spider.RepeatDownloadAllNum))
				atomic.AddInt64(&LuaListDownloadSuccessAllNum, int64(spider.RepeatDownloadSuccessNum))
			} else {
				// platform-wide list download totals (python)
				atomic.AddInt64(&PythonListDownloadAllNum, int64(spider.RepeatDownloadAllNum))
				atomic.AddInt64(&PythonListDownloadSuccessAllNum, int64(spider.RepeatDownloadSuccessNum))
			}
			lock.Lock()
			if len(arr) > 500 { // flush snapshot batch
				util.MgoE.SaveBulk("luacodeinfo", arr...)
				arr = []map[string]interface{}{}
			}
			if len(upsertBulk) > 500 { // flush task batch
				util.MgoE.UpSertBulk("task", upsertBulk...)
				upsertBulk = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(code, spider)
	}
	wg.Wait()
	// final flush of any remaining batches
	lock.Lock()
	if len(arr) > 0 {
		util.MgoE.SaveBulk("luacodeinfo", arr...)
		arr = []map[string]interface{}{}
	}
	if len(upsertBulk) > 0 {
		util.MgoE.UpSertBulk("task", upsertBulk...)
		upsertBulk = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Debug("生成任务完成...")
	CodeInfoMap = map[string]*Spider{} // reset for the next run
}
// CreateTask 新任务与历史任务整合
// It reconciles a freshly derived task t for spider sp with any task already
// open for the same code and appends an upsert pair ([selector, change]) to
// *upsertBulk under lock. Skips: non-anomaly tasks (ErrType 0), suspended
// spiders with zero downloads, and spiders audited within the last 24h.
func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, lock *sync.Mutex) {
	defer qu.Catch()
	if t.ErrType == 0 { // 不是异常任务 (not an anomaly task)
		return
	}
	if sp.PendState == 1 {
		if sp.DownloadAllNum == 0 { // suspended spider with no downloads: no task
			return
		} else { // suspended but downloading again: clear the suspension flag
			sp.PendState = 0 // affects the task's i_pendstate field below
			util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": map[string]interface{}{"pendstate": 0}}, false, false)
		}
	}
	diff := time.Now().Unix() - sp.AuditTime
	if sp.State == 5 && diff <= 86400 { // online and audited within 24h: recently maintained
		logger.Debug("该爬虫近期维护无需新建任务:", sp.Code)
		return
	}
	descript_new := "" // combined description of the new task
	for _, text := range t.DescribeMap {
		descript_new += text
	}
	query := map[string]interface{}{
		"s_code": sp.Code,
		"i_state": map[string]interface{}{
			"$in": []int{0, 1, 2, 3, 5}, // states counted as "currently maintained"
		},
	}
	fields := map[string]interface{}{
		"i_state":    1,
		"s_type":     1,
		"s_descript": 1,
		"i_times":    1,
		"s_urgency":  1,
	}
	list, _ := util.MgoE.Find("task", query, nil, fields, false, -1, -1)
	update := []map[string]interface{}{}
	if list != nil && len(*list) > 0 { // an open task already exists
		if len(*list) > 1 { // more than one open task is invalid: record and bail
			logger.Error("Code:", sp.Code, "任务异常")
			util.MgoE.Save("luacreatetaskerr", map[string]interface{}{
				"code":       sp.Code,
				"comeintime": time.Now().Unix(),
				"tasknum":    len(*list),
			})
			return
		}
		task := (*list)[0]                                 // the single open task
		state_old := qu.IntAll(task["i_state"])            // previous task state
		times_old := qu.IntAll(task["i_times"])            // previous occurrence count
		type_old := qu.ObjToString(task["s_type"])         // previous error type
		urgency_old := qu.ObjToString(task["s_urgency"])   // previous urgency
		descript_old := qu.ObjToString(task["s_descript"]) // previous description
		result := map[string]interface{}{
			"i_frequencyerrtimes": sp.FrequencyErrTimes,
			"i_num":               sp.DownloadSuccessNum, // download volume (currently the success count)
			"l_updatetime":        time.Now().Unix(),
			"i_times":             times_old + 1,
			"s_descript":          descript_old + time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + descript_new,
		}
		// Only to-confirm (0) / pending (1) tasks get full updates; other
		// states just accumulate description, times and download count above.
		if state_old == 0 || state_old == 1 {
			if state_old == 1 || t.State == 1 { // either side pending => merged task is pending
				result["i_state"] = 1
				if t.State == 1 && state_old == 1 { // both pending: keep the higher-ranked type
					if t.ErrType > qu.IntAll(type_old) {
						result["s_type"] = fmt.Sprint(t.ErrType)
					}
				} else if t.State == 1 { // only the new task is pending: take its type
					result["s_type"] = fmt.Sprint(t.ErrType)
				} /*else if state_old == 1 {
				}*/
			} else if state_old == 0 && t.State == 0 && t.ErrType > qu.IntAll(type_old) { // both to-confirm: higher-ranked type wins
				result["s_type"] = fmt.Sprint(t.ErrType)
			}
			if times_old >= 3 { // the fourth occurrence forces pending
				result["i_state"] = 1
			}
			// bump urgency, capped at 4
			urgency := qu.IntAll(urgency_old)
			if urgency < 4 {
				result["s_urgency"] = fmt.Sprint(urgency + 1)
			}
			// refresh the deadline when escalating to-confirm -> pending
			if qu.IntAll(result["i_state"]) == 1 && state_old == 0 {
				result["l_complete"] = util.CompleteTime(fmt.Sprint(urgency + 1))
			}
		}
		update = append(update, map[string]interface{}{"_id": task["_id"]})
		update = append(update, map[string]interface{}{"$set": result})
		lock.Lock()
		*upsertBulk = append(*upsertBulk, update)
		lock.Unlock()
	} else { // no open task: create a brand-new one
		//times := 0
		//if t.State == 1 { //待处理times=1
		//	times = 1
		//}
		saveMap := map[string]interface{}{
			"s_modify":            sp.ModifyUser,
			"s_modifyid":          sp.ModifyId,
			"s_code":              sp.Code,
			"s_site":              sp.Site,
			"s_channel":           sp.Channel,
			"i_event":             sp.Event,
			"i_state":             t.State,
			"s_source":            "程序",
			"s_type":              fmt.Sprint(t.ErrType),
			"s_descript":          descript_new,
			"i_times":             1,
			"i_num":               sp.DownloadSuccessNum, // download volume (currently the success count)
			"l_comeintime":        time.Now().Unix(),
			//"l_updatetime": time.Now().Unix(),
			"l_complete":          util.CompleteTime("1"),
			"s_urgency":           "1",
			"i_frequencyerrtimes": sp.FrequencyErrTimes,
			"i_pendstate":         sp.PendState, // spider suspension state
		}
		update = append(update, query)
		update = append(update, saveMap)
		lock.Lock()
		*upsertBulk = append(*upsertBulk, update)
		lock.Unlock()
	}
}
- //更新爬虫最大页、爬虫上下架
- func UpdateLuaInfo(sp *Spider) {
- defer qu.Catch()
- //1、更新爬虫信息
- set := map[string]interface{}{
- "frequencyerrtimes": sp.FrequencyErrTimes, //更新次数
- }
- if sp.FrequencyErrTimes <= 3 {
- set["param_common.5"] = sp.MaxPage + 1
- }
- logger.Debug("Code:", sp.Code, " ", sp.FrequencyErrTimes)
- b := util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": set}, false, false)
- if b && sp.FrequencyErrTimes <= 3 { //FrequencyErrTimes>3时会建采集频率异常的待处理任务,不再上下架
- //爬虫下架、上加
- qu.Debug("爬虫上下架 code:", sp.Code)
- CodeLock.Lock()
- ok, err := util.UpdateSpiderByCodeState(sp.Code, "6", sp.Event) //下架
- if ok && err == nil {
- logger.Debug(sp.Code, "下架成功")
- ok, err = util.UpdateSpiderByCodeState(sp.Code, "5", sp.Event) //上架
- if ok && err == nil {
- logger.Debug(sp.Code, "上架成功")
- }
- }
- CodeLock.Unlock()
- }
- }
// ResetDataState 重置前一周内未下载成功的数据(一天3次未下成功的数据可以连续下一周)
// It finds "spider_highlistdata" documents from the last util.DayNum days with
// state -1 (download failed) and bulk-resets them to {times: 0, state: 0} so
// they get retried, flushing updates in >500-element batches.
func ResetDataState() {
	defer qu.Catch()
	logger.Info("-----更新数据状态-----")
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 3) // bounds concurrency at 3 workers
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards the bulk-update buffer
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": util.GetTime(-util.DayNum),
			"$lt":  util.GetTime(0),
		},
		"state": -1, // download failed
	}
	field := map[string]interface{}{
		"_id": 1,
	}
	it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
	logger.Info("更新数据状态数量:", count)
	n := 0
	arr := [][]map[string]interface{}{} // pending [selector, change] pairs
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			update := []map[string]interface{}{}
			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
			update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
			lock.Lock()
			arr = append(arr, update)
			if len(arr) > 500 { // flush a batch
				tmps := arr
				util.MgoS.UpdateBulk("spider_highlistdata", tmps...)
				arr = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		tmp = map[string]interface{}{} // fresh map so the goroutine keeps its own copy
	}
	wg.Wait()
	// final flush of the remaining batch
	lock.Lock()
	if len(arr) > 0 {
		util.MgoS.UpdateBulk("spider_highlistdata", arr...)
		arr = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Info("-----更新数据状态完毕-----")
}
- //关闭任务
- func CloseTask() {
- qu.Catch()
- logger.Debug("---清理未更新任务---")
- decreaseDay, day := 0, 0
- var cleanDay string
- for {
- decreaseDay--
- weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String()
- if weekDay != "Saturday" && weekDay != "Sunday" {
- day++
- }
- if day == util.CloseNum {
- cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02")
- break
- }
- }
- the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local)
- unix_time := the_time.Unix() //凌晨时间戳
- query := map[string]interface{}{
- "i_state": 0,
- "l_complete": map[string]interface{}{
- "$lt": unix_time + 86400,
- },
- "s_type": "1",
- // "s_type": map[string]interface{}{
- // "$ne": "7",
- // },
- }
- logger.Debug("query:", query)
- set := map[string]interface{}{
- "$set": map[string]interface{}{
- "i_state": 6,
- },
- }
- util.MgoE.Update("task", query, set, false, true)
- logger.Debug("---清理未更新任务完毕---")
- }
// SaveCodeInfo 保存爬虫每日监控信息
// It serializes every entry of CodeInfoMap to a generic map and bulk-saves the
// snapshots into "luacodeinfo_back" (in >500-element batches, plus a final
// flush), while accumulating the platform-wide list download counters.
func SaveCodeInfo() {
	defer qu.Catch()
	arr := []map[string]interface{}{} // pending snapshot batch
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr
	ch := make(chan bool, 10) // bounds concurrency at 10 workers
	logger.Debug("CodeInfoMap:", len(CodeInfoMap))
	for code, spider := range CodeInfoMap {
		wg.Add(1)
		ch <- true
		// note: the goroutine receives a value copy of the Spider
		go func(code string, sp Spider) {
			defer func() {
				<-ch
				wg.Done()
			}()
			byteText, err := json.Marshal(sp)
			if err != nil {
				logger.Debug("Json Marshal Error", code)
				return
			}
			tmp := map[string]interface{}{}
			if json.Unmarshal(byteText, &tmp) == nil {
				lock.Lock()
				arr = append(arr, tmp)
				lock.Unlock()
			} else {
				logger.Debug("Json UnMarshal Error", code)
				return
			}
			if sp.Platform == "golua平台" {
				// platform-wide list download totals (lua)
				atomic.AddInt64(&LuaListDownloadAllNum, int64(sp.RepeatDownloadAllNum))
				atomic.AddInt64(&LuaListDownloadSuccessAllNum, int64(sp.RepeatDownloadSuccessNum))
			} else {
				// platform-wide list download totals (python)
				atomic.AddInt64(&PythonListDownloadAllNum, int64(sp.RepeatDownloadAllNum))
				atomic.AddInt64(&PythonListDownloadSuccessAllNum, int64(sp.RepeatDownloadSuccessNum))
			}
			lock.Lock()
			if len(arr) > 500 { // flush a batch
				util.MgoE.SaveBulk("luacodeinfo_back", arr...)
				arr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(code, *spider)
	}
	wg.Wait()
	// final flush (safe without the lock: all workers have finished)
	if len(arr) > 0 {
		util.MgoE.SaveBulk("luacodeinfo_back", arr...)
		arr = []map[string]interface{}{}
	}
	logger.Debug("爬虫基本信息生成完成...")
}
- func SaveUserCreateTaskNum() {
- defer qu.Catch()
- for user, sn := range UserTaskNum {
- save := map[string]interface{}{}
- save["user"] = user
- save["comeintime"] = time.Now().Unix()
- for s, n := range sn {
- save[s] = n
- }
- util.MgoE.Save("luausertask", save)
- }
- UserTaskNum = map[string]map[string]int{}
- }
|