task.go 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046
  1. package luatask
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. qu "qfw/util"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "util"
  11. "github.com/donnie4w/go-logger/logger"
  12. )
  13. //采集频率异常、列表页异常、404异常、下载异常、运行异常、时间异常、数据异常
  14. const TASK_RATEERR, TASK_LISTERR, TASK_404ERR, TASK_DOWNLOADERR, TASK_RUNERR, TASK_TIMEERR, TASK_DATAERR = 8, 7, 6, 5, 4, 3, 2
  15. //失败占比
  16. const FailedPercentLimit = 0.20
  17. //失败条数
  18. const FailedNumLimit = 3
  19. var CodeInfoMap map[string]*Spider
  20. var AllHref map[string]string
  21. var SameDayHref map[string]string
  22. var DataBakAllHref map[string]string
  23. var StateFeedBackErr = map[int]string{
  24. 0: "timeout",
  25. 200: "analysis",
  26. 404: "download",
  27. 500: "server",
  28. }
  29. var PythonErrTypeInfoMap = map[string]ErrTypeInfo{
  30. "download": ErrTypeInfo{
  31. ErrType: TASK_404ERR,
  32. Remark: "下载异常",
  33. },
  34. "server": ErrTypeInfo{
  35. ErrType: TASK_DOWNLOADERR,
  36. Remark: "服务异常",
  37. },
  38. "analysis": ErrTypeInfo{
  39. ErrType: TASK_RUNERR,
  40. Remark: "解析异常",
  41. },
  42. "timeout": ErrTypeInfo{
  43. ErrType: TASK_TIMEERR,
  44. Remark: "超时异常",
  45. },
  46. }
  47. var LuaErrTypeInfoMap = map[string]ErrTypeInfo{
  48. "download": ErrTypeInfo{
  49. ErrType: TASK_DOWNLOADERR,
  50. Remark: "下载异常",
  51. },
  52. "regather": ErrTypeInfo{
  53. ErrType: TASK_RUNERR,
  54. Remark: "运行异常",
  55. },
  56. "publishtime": ErrTypeInfo{
  57. ErrType: TASK_TIMEERR,
  58. Remark: "时间异常",
  59. },
  60. "text": ErrTypeInfo{
  61. ErrType: TASK_DATAERR,
  62. Remark: "数据异常",
  63. },
  64. }
// Spider aggregates one spider's base configuration, heartbeat info,
// one day of download statistics and sampled error details.
type Spider struct {
	Site              string `json:"site"`              //site
	Platform          string `json:"platform"`          //platform ("golua平台" or "python")
	Code              string `json:"spidercode"`        //spider code
	Channel           string `json:"channel"`           //channel
	AuditTime         int64  `json:"audittime"`         //latest audit time
	ModifyUser        string `json:"modifyuser"`        //maintainer
	ModifyId          string `json:"modifyid"`          //maintainer id
	Event             int    `json:"event"`             //node
	State             int    `json:"state"`             //state
	PendState         int    `json:"pendstate"`         //suspend state
	Weight            int    `json:"weight"`            //spider weight
	FrequencyErrTimes int    `json:"frequencyerrtimes"` //crawl-frequency error count
	MaxPage           int    `json:"maxpage"`           //max page to crawl
	Model             int    `json:"model"`             //crawl mode (new/old) 0: old; 1: new
	Working           int    `json:"working"`           //crawl mode (performance) 0: high-performance; 1: queue
	ListIsFilter      bool   `json:"listisfilter"`      //whether lua list-page crawling includes filtering
	//download counts keyed on comeintime, hrefs NOT de-duplicated
	DownloadAllNum     int `json:"downloadallnum"`     //total downloads
	DownloadSuccessNum int `json:"downloadsuccessnum"` //successful downloads
	DownloadFailedNum  int `json:"downloadfailednum"`  //failed downloads
	NoDownloadNum      int `json:"nodownloadnum"`      //not yet downloaded
	//same-day download counts keyed on comeintime, hrefs NOT de-duplicated
	PTimeAllNum        int `json:"ptimeallnum"`        //same-day total downloads
	PTimeSuccessNum    int `json:"ptimesuccessnum"`    //same-day successful downloads
	PTimeFailedNum     int `json:"ptimefailednum"`     //same-day failed downloads
	PTimeNoDownloadNum int `json:"ptimenodownloadnum"` //same-day not yet downloaded
	//download counts keyed on comeintime, hrefs de-duplicated
	RepeatDownloadAllNum     int `json:"repeatdownloadallnum"`     //total downloads
	RepeatDownloadSuccessNum int `json:"repeatdownloadsuccessnum"` //successful downloads
	RepeatDownloadFailedNum  int `json:"repeatdownloadfailednum"`  //failed downloads
	RepeatNoDownloadNum      int `json:"repeatnodownloadnum"`      //not yet downloaded
	//same-day download counts keyed on comeintime, hrefs de-duplicated
	RepeatPTimeAllNum            int                   `json:"repeatptimeallnum"`        //same-day total downloads
	RepeatPTimeSuccessNum        int                   `json:"repeatptimesuccessnum"`    //same-day successful downloads
	RepeatPTimeSuccessDataBakNum int                   `json:"repeatptimesuccessdbnum"`  //data_bak count of data published on the stats day
	RepeatPTimeFailedNum         int                   `json:"repeatptimefailednum"`     //same-day failed downloads
	RepeatPTimeNoDownloadNum     int                   `json:"repeatptimenodownloadnum"` //same-day not yet downloaded
	ListDownloadAllTimes         int                   `json:"listdownloadalltimes"`     //list-page download attempts within one day
	ListOhPercentTimes           int                   `json:"listohpercenttimes"`       //times the list page was crawled 100%
	ListNoDataTimes              int                   `json:"listnodatatimes"`          //times list-page downloads returned no data within one day
	Comeintime                   int64                 `json:"comeintime"`               //record insert time
	ListHeart                    int64                 `json:"listheart"`                //list-page execution heartbeat
	DetailHeart                  int64                 `json:"detailheart"`              //detail-page execution heartbeat
	FindListHeart                int64                 `json:"findlistheart"`            //list-page found-data heartbeat
	DetailExecuteHeart           int64                 `json:"detailexecuteheart"`       //detail-page download-success heartbeat
	Error                        map[string]*ErrorInfo `json:"error"`                    //error category -> aggregated details
	//OhPercentTimes int `json:"ohpercentimes"` //times downloads reached 100% of the total
	//NtPercentTime int `json:"ntpercentimes"` //times downloads were 90%-100% of the total
	//EtPercentTime int `json:"etpercentimes"` //times downloads were 80%-90% of the total
}
// ErrorInfo aggregates one category of spider errors: the running
// count plus a small sample of offending records.
type ErrorInfo struct {
	Num int          //number of erroneous records
	Err []*ErrRemark //error details (sampled hrefs)
}
// ErrRemark is a single sampled error: the record's link and a short
// explanation of what went wrong.
type ErrRemark struct {
	Href   string //link
	Remark string //error description
}
// Task describes one maintenance task generated for a spider from the
// daily statistics.
type Task struct {
	Platform          string //platform
	Code              string //spider code
	Site              string //site
	Channel           string //channel
	ModifyUser        string //maintainer
	ModifyId          string //maintainer id
	ErrType           int    //error type: 8: crawl frequency; 7: list page; 5: download; 4: runtime; 3: publish time; 2: data; 1: data volume
	Description       string //description
	State             int    //state
	Event             int    //node
	Num               int    //download count
	FrequencyErrTimes int    //crawl-frequency error count
	DescribeMap       map[int]string
	//ErrInfo map[string]map[string]interface{} //error collection
}
// ErrTypeInfo pairs a task error type (one of the TASK_* constants)
// with its human-readable description.
type ErrTypeInfo struct {
	ErrType int    //task error type
	Remark  string //error type description
}
var (
	StartTime   int64                     //start of the previous workday (statistics window)
	EndTime     int64                     //end of the previous workday (statistics window)
	Publishtime string                    //publish date string (short layout) of the stats day
	TaskMap     map[string]*Task          //task collection
	UserTaskNum map[string]map[string]int //per-user, per-day count of newly created tasks
	CodeLock    = &sync.Mutex{}
)
// StartTask drives the whole daily statistics pipeline: it initializes
// the time window, loads spider base and heartbeat info, aggregates
// download counts from the bidding/list/data_bak collections, collects
// error data, creates maintenance tasks and sends the summary reports.
func StartTask() {
	InitInfo() //initialize the statistics time window and global state
	logger.Debug(StartTime, EndTime, Publishtime)
	GetCodeBaseInfo() //load spider base info
	GetBiddingCount() //count bidding-collection records per platform
	GetCodeHeart() //load spider heartbeat info
	GetSpiderHighListDownloadNum() //spider_highlistdata: list-page totals, failures, not-downloaded
	GetSpiderListDownloadNum() //spider_listdata: list-page totals, failures, not-downloaded
	GetSpiderDataBakDownloadNum() //data_bak: download totals
	GetSpiderDownloadRateDataNew() //download rate
	GetSpiderWarnErrData() //error info
	GetPythonWarnErrData() //python-specific error info
	//SaveCodeInfo()
	CreateTaskProcess()
	// GetDownloadNumber() //count downloads
	//CloseTask() //close tasks
	SendInfoToWxWork_SiteDataCount()
	SendLuaPythonAllNum()
}
  177. //初始化
  178. func InitInfo() {
  179. defer qu.Catch()
  180. CodeInfoMap = map[string]*Spider{} //初始化
  181. AllHref = map[string]string{}
  182. SameDayHref = map[string]string{}
  183. DataBakAllHref = map[string]string{}
  184. UserTaskNum = map[string]map[string]int{}
  185. StartTime, EndTime = util.GetWorkDayTimeUnix()
  186. Publishtime = qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
  187. //StartTime = util.GetTime(-1)
  188. //EndTime = util.GetTime(0)
  189. }
  190. // GetCodeBaseInfo 准备爬虫基本信息
  191. func GetCodeBaseInfo() {
  192. defer qu.Catch()
  193. sess := util.MgoE.GetMgoConn()
  194. defer util.MgoE.DestoryMongoConn(sess)
  195. lock := &sync.Mutex{}
  196. wg := &sync.WaitGroup{}
  197. ch := make(chan bool, 5)
  198. query := map[string]interface{}{
  199. "$or": []interface{}{
  200. //lua、python上线爬虫
  201. map[string]interface{}{
  202. "state": map[string]interface{}{
  203. "$in": []int{5, 11}, //上架、上线爬虫
  204. },
  205. },
  206. //lua正在被维护的爬虫和上架爬虫
  207. map[string]interface{}{
  208. "platform": "golua平台",
  209. "state": map[string]interface{}{
  210. "$in": []int{0, 1, 2}, //待完成、待审核、未通过
  211. },
  212. "event": map[string]interface{}{
  213. "$ne": 7000,
  214. },
  215. },
  216. //python正在被维护的爬虫和上线爬虫
  217. map[string]interface{}{
  218. "platform": "python",
  219. "state": map[string]interface{}{
  220. "$in": []int{1, 2, 6}, //待审核、未通过、已下架
  221. },
  222. },
  223. },
  224. }
  225. fieles := map[string]interface{}{
  226. "event": 1,
  227. "param_common": 1,
  228. "platform": 1,
  229. "modifyuser": 1,
  230. "modifyuserid": 1,
  231. "state": 1,
  232. "pendstate": 1,
  233. "weight": 1,
  234. "l_uploadtime": 1,
  235. "listisfilter": 1,
  236. "frequencyerrtimes": 1,
  237. "code": 1,
  238. }
  239. count := util.MgoE.Count("luaconfig", query)
  240. logger.Debug("共加载线上爬虫个数:", count)
  241. it := sess.DB(util.MgoE.DbName).C("luaconfig").Find(&query).Select(&fieles).Iter()
  242. n := 0
  243. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  244. wg.Add(1)
  245. ch <- true
  246. go func(tmp map[string]interface{}) {
  247. defer func() {
  248. <-ch
  249. wg.Done()
  250. }()
  251. sp := &Spider{
  252. Error: map[string]*ErrorInfo{},
  253. }
  254. if param_common, ok := tmp["param_common"].([]interface{}); ok && len(param_common) >= 6 {
  255. //sp.Code = qu.ObjToString(param_common[0])
  256. sp.Site = qu.ObjToString(param_common[1])
  257. sp.Channel = qu.ObjToString(param_common[2])
  258. sp.MaxPage = qu.IntAll(param_common[5])
  259. } else {
  260. logger.Debug("加载爬虫出错:", tmp["_id"])
  261. }
  262. sp.Code = qu.ObjToString(tmp["code"])
  263. sp.ModifyUser = qu.ObjToString(tmp["modifyuser"])
  264. sp.ModifyId = qu.ObjToString(tmp["modifyuserid"])
  265. sp.AuditTime = qu.Int64All(tmp["l_uploadtime"])
  266. sp.Platform = qu.ObjToString(tmp["platform"])
  267. sp.Event = qu.IntAll(tmp["event"])
  268. sp.State = qu.IntAll(tmp["state"])
  269. sp.PendState = qu.IntAll(tmp["pendstate"])
  270. sp.Weight = qu.IntAll(tmp["weight"])
  271. if sp.Platform == "python" {
  272. sp.ListIsFilter = false
  273. } else {
  274. sp.ListIsFilter = tmp["listisfilter"].(bool)
  275. }
  276. sp.FrequencyErrTimes = qu.IntAll(tmp["frequencyerrtimes"])
  277. sp.Model = util.CodeEventModel[sp.Event]
  278. sp.Working = util.CodeEventWorking[sp.Event]
  279. sp.Comeintime = time.Now().Unix()
  280. lock.Lock()
  281. CodeInfoMap[sp.Code] = sp
  282. lock.Unlock()
  283. }(tmp)
  284. if n%1000 == 0 {
  285. logger.Debug(n)
  286. }
  287. tmp = map[string]interface{}{}
  288. }
  289. wg.Wait()
  290. logger.Debug("爬虫基本信息准备完成...", len(CodeInfoMap))
  291. }
  292. func GetBiddingCount() {
  293. defer qu.Catch()
  294. sess := util.MgoB.GetMgoConn()
  295. defer util.MgoB.DestoryMongoConn(sess)
  296. lock := &sync.Mutex{}
  297. wg := &sync.WaitGroup{}
  298. ch := make(chan bool, 5)
  299. query := map[string]interface{}{
  300. "comeintime": map[string]interface{}{
  301. "$gte": StartTime,
  302. "$lt": EndTime,
  303. },
  304. }
  305. fieles := map[string]interface{}{
  306. "spidercode": 1,
  307. }
  308. count := util.MgoB.Count("bidding", query)
  309. logger.Debug("bidding采集数据量:", count)
  310. it := sess.DB(util.MgoB.DbName).C("bidding").Find(&query).Select(&fieles).Iter()
  311. n := 0
  312. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  313. wg.Add(1)
  314. ch <- true
  315. go func(tmp map[string]interface{}) {
  316. defer func() {
  317. <-ch
  318. wg.Done()
  319. }()
  320. code := qu.ObjToString(tmp["spidercode"])
  321. lock.Lock()
  322. if sp := CodeInfoMap[code]; sp != nil {
  323. if sp.Platform == "golua平台" {
  324. LuaBiddingDownloadAllNum++
  325. } else if sp.Platform == "python" {
  326. PythonBiddingDownloadAllNum++
  327. }
  328. }
  329. lock.Unlock()
  330. }(tmp)
  331. if n%1000 == 0 {
  332. logger.Debug(n)
  333. }
  334. tmp = map[string]interface{}{}
  335. }
  336. wg.Wait()
  337. logger.Debug("Bidding数据量统计完成...", LuaBiddingDownloadAllNum, PythonBiddingDownloadAllNum)
  338. }
  339. // GetCodeHeart 获取爬虫的心跳信息
  340. func GetCodeHeart() {
  341. defer qu.Catch()
  342. sess := util.MgoS.GetMgoConn()
  343. defer util.MgoS.DestoryMongoConn(sess)
  344. query := map[string]interface{}{
  345. "del": false,
  346. }
  347. fields := map[string]interface{}{
  348. "code": 1,
  349. "list": 1,
  350. "detail": 1,
  351. "findlist": 1,
  352. "detailexecute": 1,
  353. }
  354. lock := &sync.Mutex{}
  355. wg := &sync.WaitGroup{}
  356. ch := make(chan bool, 5)
  357. it := sess.DB(util.MgoS.DbName).C("spider_heart").Find(&query).Select(&fields).Iter()
  358. n := 0
  359. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  360. wg.Add(1)
  361. ch <- true
  362. go func(tmp map[string]interface{}) {
  363. defer func() {
  364. <-ch
  365. wg.Done()
  366. }()
  367. code := qu.ObjToString(tmp["code"])
  368. listHeart := qu.Int64All(tmp["list"])
  369. detailHeart := qu.Int64All(tmp["detail"])
  370. findListHeart := qu.Int64All(tmp["findlist"])
  371. detailExecuteHeart := qu.Int64All(tmp["detailexecute"])
  372. lock.Lock()
  373. if sp := CodeInfoMap[code]; sp != nil {
  374. sp.ListHeart = listHeart
  375. sp.DetailHeart = detailHeart
  376. sp.FindListHeart = findListHeart
  377. sp.DetailExecuteHeart = detailExecuteHeart
  378. }
  379. lock.Unlock()
  380. }(tmp)
  381. if n%100 == 0 {
  382. logger.Debug(n)
  383. }
  384. tmp = map[string]interface{}{}
  385. }
  386. wg.Wait()
  387. logger.Debug("统计采集量spider_heart完成...")
  388. }
// GetSpiderHighListDownloadNum scans spider_highlistdata for the
// statistics window and accumulates, per spider code, list-page
// download totals and success/failed/not-downloaded counts — both raw
// and href-de-duplicated, both overall and restricted to records
// published on the stats day. Failed downloads are also sampled (up
// to 3 hrefs) into sp.Error["download"].
func GetSpiderHighListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	//records inserted during the previous workday
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt": EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"href": 1,
		"state": 1,
		"times": 1,
		"publishtime": 1,
		"site": 1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) //caps concurrency at 5 workers
	//1. scan spider_highlistdata
	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			state := qu.IntAll(tmp["state"])
			site := qu.ObjToString(tmp["site"])
			ptime := qu.ObjToString(tmp["publishtime"])
			sameDay := strings.Contains(ptime, Publishtime) //was the record published on the stats day?
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				//counters without href de-duplication
				success := true
				sp.DownloadAllNum++
				if sameDay {
					sp.PTimeAllNum++
				}
				if state == 1 { //download succeeded
					sp.DownloadSuccessNum++
					if sameDay {
						sp.PTimeSuccessNum++
					}
				} else if state == -1 { //download failed
					success = false
					sp.DownloadFailedNum++
					if sameDay {
						sp.PTimeFailedNum++
					}
				} else {
					if tmp["times"] == nil { //not downloaded yet
						sp.NoDownloadNum++
						if sameDay {
							sp.PTimeNoDownloadNum++
						}
					} else { //download failed and state was reset
						success = false
						sp.DownloadFailedNum++
						if sameDay {
							sp.PTimeFailedNum++
						}
					}
				}
				//de-dup hrefs of records published on the stats day
				if sameDay && SameDayHref[href] != site {
					sp.RepeatDownloadAllNum++
					sp.RepeatPTimeAllNum++
					if state == 1 {
						sp.RepeatDownloadSuccessNum++
						sp.RepeatPTimeSuccessNum++
					} else if state == -1 { //download failed
						sp.RepeatDownloadFailedNum++
						sp.RepeatPTimeFailedNum++
					} else {
						if tmp["times"] == nil { //not downloaded yet
							sp.RepeatNoDownloadNum++
							sp.RepeatPTimeNoDownloadNum++
						} else { //download failed and state was reset
							sp.RepeatPTimeFailedNum++
							sp.RepeatDownloadFailedNum++
						}
					}
					SameDayHref[href] = site
					AllHref[href] = site
				} else if AllHref[href] != site { //de-dup against the full href set
					sp.RepeatDownloadAllNum++
					if state == 1 { //download succeeded
						sp.RepeatDownloadSuccessNum++
					} else if state == -1 { //download failed
						sp.RepeatDownloadFailedNum++
					} else {
						if tmp["times"] == nil { //not downloaded yet
							sp.RepeatNoDownloadNum++
						} else { //download failed and state was reset
							sp.RepeatDownloadFailedNum++
						}
					}
					AllHref[href] = site
				}
				//per-site href de-duplication (kept for reference)
				//if AllHref[href] != site {
				//	sp.RepeatDownloadAllNum++
				//	if sameDay {
				//		sp.RepeatPTimeAllNum++
				//	}
				//	if state == 1 { //download succeeded
				//		sp.RepeatDownloadSuccessNum++
				//		if sameDay {
				//			sp.RepeatPTimeSuccessNum++
				//		}
				//	} else if state == -1 { //download failed
				//		sp.RepeatDownloadFailedNum++
				//		if sameDay {
				//			sp.RepeatPTimeFailedNum++
				//		}
				//	} else {
				//		if tmp["times"] == nil { //not downloaded yet
				//			sp.RepeatNoDownloadNum++
				//			if sameDay {
				//				sp.RepeatPTimeNoDownloadNum++
				//			}
				//		} else { //download failed and state was reset
				//			sp.RepeatDownloadFailedNum++
				//			if sameDay {
				//				sp.RepeatPTimeFailedNum++
				//			}
				//		}
				//	}
				//	AllHref[href] = site
				//}
				if !success { //record the href of the failed download
					if errorInfo := sp.Error["download"]; errorInfo == nil {
						sp.Error["download"] = &ErrorInfo{
							Num: sp.DownloadFailedNum,
							Err: []*ErrRemark{
								&ErrRemark{
									Href: href,
									Remark: "Download Failed",
								},
							},
						}
					} else {
						errorInfo.Num = sp.DownloadFailedNum
						if len(errorInfo.Err) < 3 { //keep at most 3 sample hrefs
							errorInfo.Err = append(errorInfo.Err, &ErrRemark{
								Href: qu.ObjToString(tmp["href"]),
								Remark: "Download Failed",
							})
						}
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("统计采集量spider_highlistdata完成...")
}
// GetSpiderListDownloadNum does for spider_listdata what
// GetSpiderHighListDownloadNum does for spider_highlistdata:
// per-spider list-page download totals plus success/failed/
// not-downloaded counts, raw and href-de-duplicated, overall and for
// records published on the stats day. It runs after the
// spider_highlistdata pass and shares its de-dup maps; both AllHref
// and SameDayHref are cleared once this collection is done.
func GetSpiderListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	//records inserted during the previous workday
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt": EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"href": 1,
		"state": 1,
		"site": 1,
		"times": 1,
		"publishtime": 1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) //caps concurrency at 5 workers
	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			state := qu.IntAll(tmp["state"])
			site := qu.ObjToString(tmp["site"])
			ptime := qu.ObjToString(tmp["publishtime"])
			sameDay := strings.Contains(ptime, Publishtime) //was the record published on the stats day?
			lock.Lock()
			if sp := CodeInfoMap[code]; sp != nil {
				//counters without href de-duplication
				success := true
				sp.DownloadAllNum++
				if sameDay {
					sp.PTimeAllNum++
				}
				if state == 1 { //download succeeded
					sp.DownloadSuccessNum++
					if sameDay {
						sp.PTimeSuccessNum++
					}
				} else if state == -1 { //download failed
					success = false
					sp.DownloadFailedNum++
					if sameDay {
						sp.PTimeFailedNum++
					}
				} else { //not downloaded yet
					sp.NoDownloadNum++
					if sameDay {
						sp.PTimeNoDownloadNum++
					}
				}
				//de-dup hrefs of records published on the stats day
				if sameDay && SameDayHref[href] != site {
					sp.RepeatDownloadAllNum++
					sp.RepeatPTimeAllNum++
					if state == 1 {
						sp.RepeatDownloadSuccessNum++
						sp.RepeatPTimeSuccessNum++
					} else if state == -1 { //download failed
						sp.RepeatDownloadFailedNum++
						sp.RepeatPTimeFailedNum++
					} else {
						if tmp["times"] == nil { //not downloaded yet
							sp.RepeatNoDownloadNum++
							sp.RepeatPTimeNoDownloadNum++
						} else { //download failed and state was reset
							sp.RepeatPTimeFailedNum++
							sp.RepeatDownloadFailedNum++
						}
					}
					SameDayHref[href] = site
					AllHref[href] = site
				} else if AllHref[href] != site { //de-dup against the full href set
					sp.RepeatDownloadAllNum++
					if state == 1 { //download succeeded
						sp.RepeatDownloadSuccessNum++
					} else if state == -1 { //download failed
						sp.RepeatDownloadFailedNum++
					} else {
						if tmp["times"] == nil { //not downloaded yet
							sp.RepeatNoDownloadNum++
						} else { //download failed and state was reset
							sp.RepeatDownloadFailedNum++
						}
					}
					AllHref[href] = site
				}
				//per-site href de-duplication (kept for reference)
				//if AllHref[href] != site {
				//	sp.RepeatDownloadAllNum++
				//	if samaDay {
				//		sp.RepeatPTimeAllNum++
				//	}
				//	if state == 1 { //download succeeded
				//		sp.RepeatDownloadSuccessNum++
				//		if samaDay {
				//			sp.RepeatPTimeSuccessNum++
				//		}
				//	} else if state == -1 { //download failed
				//		sp.RepeatDownloadFailedNum++
				//		if samaDay {
				//			sp.RepeatPTimeFailedNum++
				//		}
				//	} else { //not downloaded yet
				//		sp.RepeatNoDownloadNum++
				//		if samaDay {
				//			sp.RepeatPTimeNoDownloadNum++
				//		}
				//	}
				//	AllHref[href] = site
				//}
				if !success { //record the href of the failed download
					if errorInfo := sp.Error["download"]; errorInfo == nil {
						sp.Error["download"] = &ErrorInfo{
							Num: sp.DownloadFailedNum,
							Err: []*ErrRemark{
								&ErrRemark{
									Href: href,
									Remark: "Download Failed",
								},
							},
						}
					} else {
						errorInfo.Num = sp.DownloadFailedNum
						if len(errorInfo.Err) < 3 { //keep at most 3 sample hrefs
							errorInfo.Err = append(errorInfo.Err, &ErrRemark{
								Href: href,
								Remark: "Download Failed",
							})
						}
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	//both list-page passes are finished: release the de-dup maps
	AllHref = map[string]string{}
	SameDayHref = map[string]string{}
	logger.Debug("统计spider_listdata采集量完成...")
}
// GetSpiderDataBakDownloadNum scans data_bak for records whose insert
// time AND publish time fall inside the statistics window. For every
// spider it counts the de-duplicated same-day published volume
// (RepeatPTimeSuccessDataBakNum); additionally, for spiders whose
// list-page passes found downloads but no same-day data
// (DownloadAllNum > 0 && PTimeAllNum == 0), it backfills the same-day
// counters from data_bak.
func GetSpiderDataBakDownloadNum() {
	defer qu.Catch()
	logger.Debug("统计采集量data_bak开始...")
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	//inserted during the window AND published during the window
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt": EndTime,
		},
		"l_np_publishtime": map[string]interface{}{
			"$gte": StartTime,
			"$lt": EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"href": 1,
		"site": 1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) //caps concurrency at 5 workers
	it := sess.DB(util.MgoS.DbName).C("data_bak").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			site := qu.ObjToString(tmp["site"])
			lock.Lock()
			defer lock.Unlock()
			if sp := CodeInfoMap[code]; sp != nil {
				//count each spider's same-day published data_bak volume (href de-dup)
				if DataBakAllHref[href] != site {
					sp.RepeatPTimeSuccessDataBakNum++
					DataBakAllHref[href] = site
				}
				//backfill only spiders with downloads but no same-day data
				if sp.DownloadAllNum == 0 || sp.PTimeAllNum != 0 {
					return
				}
				sp.PTimeAllNum++
				sp.RepeatPTimeAllNum++
				sp.PTimeSuccessNum++
				sp.RepeatPTimeSuccessNum++
			}
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	DataBakAllHref = map[string]string{} //release the de-dup map
	//previous per-spider Count()-based implementation (kept for reference)
	//wg := &sync.WaitGroup{}
	//ch := make(chan bool, 5)
	//n := 0
	//for _, sp := range CodeInfoMap {
	//	n++
	//	if n%100 == 0 {
	//		logger.Debug("current:", n)
	//	}
	//	if sp.Platform != "golua平台" || sp.DownloadAllNum == 0 || sp.PTimeAllNum != 0 { //no same-day data by publish time: consult data_bak
	//		continue
	//	}
	//	//logger.Info("spiders with no same-day published data on the list page:", sp.Code)
	//	wg.Add(1)
	//	ch <- true
	//	go func(tmpSp *Spider) {
	//		defer func() {
	//			<-ch
	//			wg.Done()
	//		}()
	//		query := map[string]interface{}{
	//			"comeintime": map[string]interface{}{
	//				"$gte": StartTime,
	//				"$lt": EndTime,
	//			},
	//			"l_np_publishtime": map[string]interface{}{
	//				"$gte": StartTime,
	//				"$lt": EndTime,
	//			},
	//			"spidercode": tmpSp.Code,
	//		}
	//		count := util.MgoS.Count("data_bak", query)
	//		tmpSp.PTimeAllNum = count
	//		tmpSp.RepeatPTimeAllNum = count
	//		tmpSp.PTimeSuccessNum = count
	//		tmpSp.RepeatPTimeSuccessNum = count
	//	}(sp)
	//}
	//wg.Wait()
	logger.Debug("统计采集量data_bak完成...")
}
  816. // GetSpiderListDownloadNum 统计爬虫列表页下载量和下载失败量
  817. func GetSpiderListDownloadNum_Back() {
  818. defer qu.Catch()
  819. sess := util.MgoS.GetMgoConn()
  820. defer util.MgoS.DestoryMongoConn(sess)
  821. lock := &sync.Mutex{}
  822. wg := &sync.WaitGroup{}
  823. ch := make(chan bool, 5)
  824. //2、统计spider_listdata
  825. match := map[string]interface{}{
  826. "comeintime": map[string]interface{}{
  827. "$gte": StartTime,
  828. "$lt": EndTime,
  829. },
  830. "event": map[string]interface{}{
  831. "$ne": 7000,
  832. },
  833. }
  834. group1 := map[string]interface{}{
  835. "_id": map[string]interface{}{
  836. "spidercode": "$spidercode",
  837. "state": "$state",
  838. },
  839. "datacount": map[string]interface{}{
  840. "$sum": 1,
  841. },
  842. }
  843. group2 := map[string]interface{}{
  844. "_id": "$_id.spidercode",
  845. "stateinfo": map[string]interface{}{
  846. "$push": map[string]interface{}{
  847. "state": "$_id.state",
  848. "count": "$datacount",
  849. },
  850. },
  851. "count": map[string]interface{}{
  852. "$sum": "$datacount",
  853. },
  854. }
  855. project := map[string]interface{}{
  856. "statearr": "$stateinfo",
  857. "count": 1,
  858. }
  859. p := []map[string]interface{}{
  860. map[string]interface{}{"$match": match},
  861. map[string]interface{}{"$group": group1},
  862. map[string]interface{}{"$group": group2},
  863. map[string]interface{}{"$project": project},
  864. }
  865. it := sess.DB(util.MgoS.DbName).C("spider_listdata").Pipe(p).Iter()
  866. n := 0
  867. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  868. wg.Add(1)
  869. ch <- true
  870. go func(tmp map[string]interface{}) {
  871. defer func() {
  872. <-ch
  873. wg.Done()
  874. }()
  875. code := qu.ObjToString(tmp["_id"])
  876. count := qu.IntAll(tmp["count"]) //下载总量(不准确,含重复数据)
  877. successCount := 0 //下载成功量(不准确,含重复数据)
  878. failedCount := 0 //下载失败量(不准确,含重复数据)
  879. noCount := 0 //未下载量
  880. if stateArr, ok := tmp["statearr"].([]interface{}); ok {
  881. for _, stateInfo := range stateArr {
  882. infoMap := stateInfo.(map[string]interface{})
  883. state := qu.IntAll(infoMap["state"])
  884. if state == 1 { //state:1,下载成功量
  885. successCount = qu.IntAll(infoMap["count"])
  886. } else if state == -1 { //state:-1,下载失败量
  887. failedCount = qu.IntAll(infoMap["count"])
  888. } else if state == 0 { //state:0,未下载量
  889. noCount = qu.IntAll(infoMap["count"])
  890. }
  891. }
  892. }
  893. errArr := []*ErrRemark{}
  894. if failedCount > 0 { //有采集失败的数据,查询失败链接
  895. query := map[string]interface{}{
  896. "comeintime": map[string]interface{}{
  897. "$gte": StartTime,
  898. "$lt": EndTime,
  899. },
  900. "event": map[string]interface{}{
  901. "$ne": 7000,
  902. },
  903. "spidercode": code,
  904. "state": -1,
  905. }
  906. logger.Debug("采集失败数据query:", query)
  907. data, _ := util.MgoS.FindOne("spider_listdata", query)
  908. if data != nil {
  909. errArr = append(errArr, &ErrRemark{
  910. Href: qu.ObjToString((*data)["href"]),
  911. Remark: "Download Failed",
  912. })
  913. }
  914. }
  915. lock.Lock()
  916. if spider := CodeInfoMap[code]; spider != nil {
  917. spider.DownloadAllNum = count
  918. spider.DownloadSuccessNum = successCount
  919. spider.DownloadFailedNum = failedCount
  920. spider.NoDownloadNum = noCount
  921. if len(errArr) > 0 {
  922. spider.Error["download"] = &ErrorInfo{
  923. Num: failedCount,
  924. Err: errArr,
  925. }
  926. }
  927. } else {
  928. logger.Debug("-------------", code)
  929. }
  930. lock.Unlock()
  931. }(tmp)
  932. if n%100 == 0 {
  933. logger.Debug(n)
  934. }
  935. tmp = map[string]interface{}{}
  936. }
  937. wg.Wait()
  938. logger.Debug("统计spider_listdata采集量完成...")
  939. }
  940. // GetSpiderDownloadRateDataNew 汇总列表页采集频率情况
  941. func GetSpiderDownloadRateDataNew() {
  942. defer qu.Catch()
  943. sess := util.MgoS.GetMgoConn()
  944. defer util.MgoS.DestoryMongoConn(sess)
  945. ch := make(chan bool, 5)
  946. wg := &sync.WaitGroup{}
  947. lock := &sync.Mutex{}
  948. date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
  949. query := map[string]interface{}{
  950. "date": date,
  951. "event": map[string]interface{}{
  952. "$ne": 7000,
  953. },
  954. }
  955. fields := map[string]interface{}{
  956. "spidercode": 1,
  957. "alltimes": 1,
  958. "zero": 1,
  959. "oh_percent": 1,
  960. }
  961. logger.Debug("query:", query)
  962. it := sess.DB(util.MgoS.DbName).C("spider_downloadrate").Find(&query).Select(&fields).Iter()
  963. n := 0
  964. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  965. ch <- true
  966. wg.Add(1)
  967. go func(tmp map[string]interface{}) {
  968. defer func() {
  969. <-ch
  970. wg.Done()
  971. }()
  972. code := qu.ObjToString(tmp["spidercode"])
  973. alltimes := qu.IntAll(tmp["alltimes"])
  974. zero := qu.IntAll(tmp["zero"])
  975. oh_percent := qu.IntAll(tmp["oh_percent"])
  976. lock.Lock()
  977. if spider := CodeInfoMap[code]; spider != nil {
  978. spider.ListDownloadAllTimes = alltimes
  979. spider.ListNoDataTimes = zero
  980. //含有100%采集,及为采集频率异常(由于7410、7500、7510、7700队列模式节点,不建采集频率异常任务)
  981. //上轮数据下载不成功,下轮采集会被任务是新数据(应该建下载异常任务)
  982. if oh_percent > 0 && spider.Model != 0 {
  983. spider.FrequencyErrTimes++
  984. spider.ListOhPercentTimes = oh_percent
  985. }
  986. } else {
  987. logger.Debug("-------------", code)
  988. }
  989. lock.Unlock()
  990. }(tmp)
  991. if n%1000 == 0 {
  992. logger.Debug("current:", n)
  993. }
  994. tmp = map[string]interface{}{}
  995. }
  996. wg.Wait()
  997. logger.Debug("列表页采集统计完成...")
  998. }
  999. //汇总lua错误信息数据
  1000. func GetSpiderWarnErrData() {
  1001. defer qu.Catch()
  1002. logger.Debug("错误信息数据统计...")
  1003. sess := util.MgoS.GetMgoConn()
  1004. defer util.MgoS.DestoryMongoConn(sess)
  1005. match := map[string]interface{}{
  1006. "level": 2,
  1007. "comeintime": map[string]interface{}{
  1008. "$gte": StartTime,
  1009. "$lt": EndTime,
  1010. },
  1011. }
  1012. group1 := map[string]interface{}{
  1013. "_id": map[string]interface{}{
  1014. "code": "$code",
  1015. "info": "$info",
  1016. },
  1017. "datacount": map[string]interface{}{
  1018. "$sum": 1,
  1019. },
  1020. }
  1021. group2 := map[string]interface{}{
  1022. "_id": "$_id.code",
  1023. "infotext": map[string]interface{}{
  1024. "$push": map[string]interface{}{
  1025. "info": "$_id.info",
  1026. "count": "$datacount",
  1027. },
  1028. },
  1029. "count": map[string]interface{}{
  1030. "$sum": "$datacount",
  1031. },
  1032. }
  1033. project := map[string]interface{}{
  1034. "infoarr": "$infotext",
  1035. "count": 1,
  1036. }
  1037. p := []map[string]interface{}{
  1038. map[string]interface{}{"$match": match},
  1039. map[string]interface{}{"$group": group1},
  1040. map[string]interface{}{"$group": group2},
  1041. map[string]interface{}{"$project": project},
  1042. }
  1043. logger.Debug("spider_warn:", match)
  1044. //1、统计spider_warn
  1045. it1 := sess.DB(util.MgoS.DbName).C("spider_warn").Pipe(p).Iter()
  1046. n1 := 0
  1047. ch := make(chan bool, 5)
  1048. wg := &sync.WaitGroup{}
  1049. lock := &sync.Mutex{}
  1050. for tmp := make(map[string]interface{}); it1.Next(&tmp); n1++ {
  1051. wg.Add(1)
  1052. ch <- true
  1053. go func(tmp map[string]interface{}) {
  1054. defer func() {
  1055. <-ch
  1056. wg.Done()
  1057. }()
  1058. code := qu.ObjToString(tmp["_id"])
  1059. //spider.Error = map[string]*ErrorInfo{} //初始化
  1060. if infoArr, ok := tmp["infoarr"].([]interface{}); ok {
  1061. for _, info := range infoArr {
  1062. stype := ""
  1063. query := map[string]interface{}{
  1064. "level": 2,
  1065. "comeintime": map[string]interface{}{
  1066. "$gte": StartTime,
  1067. "$lt": EndTime,
  1068. },
  1069. }
  1070. infoMap := info.(map[string]interface{})
  1071. infoText := qu.ObjToString(infoMap["info"]) //错误信息
  1072. errCount := qu.IntAll(infoMap["count"]) //错误数量
  1073. if infoText == "Publishtime Is Too Late" { //发布时间超前
  1074. query["info"] = infoText
  1075. stype = "publishtime"
  1076. } else if infoText == "Publishtime Is Less Than Zero" { //发布时间小于0
  1077. query["info"] = infoText
  1078. stype = "publishtime"
  1079. } else if infoText == "Publishtime Is Too Early" { //发布时间过小
  1080. query["info"] = infoText
  1081. stype = "publishtime"
  1082. } else if infoText == "Field Value Not Contains Chinese" { //title、detail不含中文
  1083. query["info"] = infoText
  1084. stype = "text"
  1085. } else if infoText == "Field Value Contains Random Code" { //title、detail含乱码
  1086. query["info"] = infoText
  1087. stype = "text"
  1088. } else {
  1089. continue
  1090. }
  1091. query["code"] = code
  1092. //logger.Debug(query)
  1093. //errArr := []*ErrRemark{}
  1094. //list, _ := util.MgoS.Find("spider_warn", query, nil, map[string]interface{}{"href": 1}, false, 0, 3)
  1095. //for _, l := range *list {
  1096. // errArr = append(errArr, &ErrRemark{
  1097. // Href: qu.ObjToString(l["href"]),
  1098. // Remark: infoText,
  1099. // })
  1100. //}
  1101. one, _ := util.MgoS.FindOne("spider_warn", query) //查询该错误信息类型的一条href
  1102. oneErrInfo := &ErrRemark{
  1103. Href: qu.ObjToString((*one)["href"]),
  1104. Remark: infoText,
  1105. }
  1106. lock.Lock()
  1107. if spider := CodeInfoMap[code]; spider != nil {
  1108. if errMap := spider.Error[stype]; errMap != nil {
  1109. errMap.Num += errCount
  1110. errMap.Err = append(errMap.Err, oneErrInfo)
  1111. } else {
  1112. spider.Error[stype] = &ErrorInfo{
  1113. Num: errCount,
  1114. Err: []*ErrRemark{
  1115. oneErrInfo,
  1116. },
  1117. }
  1118. }
  1119. }
  1120. lock.Unlock()
  1121. }
  1122. }
  1123. }(tmp)
  1124. if n1%10 == 0 {
  1125. logger.Debug(n1)
  1126. }
  1127. tmp = map[string]interface{}{}
  1128. }
  1129. //2、统计regatherdata
  1130. //match = map[string]interface{}{
  1131. // "state": map[string]interface{}{
  1132. // "$lte": 1,
  1133. // },
  1134. // "from": "lua",
  1135. // "comeintime": map[string]interface{}{
  1136. // "$gte": StartTime,
  1137. // "$lt": EndTime,
  1138. // },
  1139. //}
  1140. //group1 = map[string]interface{}{
  1141. // "_id": "$spidercode",
  1142. // "count": map[string]interface{}{
  1143. // "$sum": 1,
  1144. // },
  1145. //}
  1146. //p = []map[string]interface{}{
  1147. // map[string]interface{}{"$match": match},
  1148. // map[string]interface{}{"$group": group1},
  1149. //}
  1150. //logger.Debug("regather query:", match)
  1151. //it2 := sess.DB(util.MgoS.DbName).C("regatherdata").Pipe(p).Iter()
  1152. //n2 := 0
  1153. //for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
  1154. // wg.Add(1)
  1155. // ch <- true
  1156. // go func(tmp map[string]interface{}) {
  1157. // defer func() {
  1158. // <-ch
  1159. // wg.Done()
  1160. // }()
  1161. // code := qu.ObjToString(tmp["_id"]) //爬虫代码
  1162. // count := qu.IntAll(tmp["count"]) //异常数据量
  1163. // query := map[string]interface{}{
  1164. // "state": map[string]interface{}{
  1165. // "$lte": 1,
  1166. // },
  1167. // "from": "lua",
  1168. // "comeintime": map[string]interface{}{
  1169. // "$gte": StartTime,
  1170. // "$lt": EndTime,
  1171. // },
  1172. // "spidercode": code,
  1173. // }
  1174. // //logger.Debug("query:", query)
  1175. //
  1176. // errArr := []*ErrRemark{}
  1177. // list, _ := util.MgoS.Find("regatherdata", query, nil, map[string]interface{}{"href": 1, "error": 1}, false, 0, 3)
  1178. // for _, l := range *list {
  1179. // errText := qu.ObjToString(l["error"])
  1180. // errText = strings.Replace(errText, "<string>:", "", 1)
  1181. // errArr = append(errArr, &ErrRemark{
  1182. // Href: qu.ObjToString(l["href"]),
  1183. // Remark: errText,
  1184. // })
  1185. // }
  1186. // //one, _ := util.MgoS.FindOne("regatherdata", query) //查询该错误信息类型的一条href
  1187. // //oneErrInfo := &ErrRemark{
  1188. // // Href: qu.ObjToString((*one)["href"]),
  1189. // // Remark: qu.ObjToString((*one)["error"]),
  1190. // //}
  1191. // if spider := CodeInfoMap[code]; spider != nil {
  1192. // spider.Error["regather"] = &ErrorInfo{
  1193. // Num: count,
  1194. // Err: errArr,
  1195. // }
  1196. // // if spider_err := spider.Error; spider_err != nil {
  1197. // // spider_err["regather"] = &ErrorInfo{
  1198. // // Num: count,
  1199. // // Err: []map[string]interface{}{
  1200. // // oneErrInfo,
  1201. // // },
  1202. // // }
  1203. // // } else {
  1204. // // spider.Error = map[string]*ErrorInfo{
  1205. // // "regather": &ErrorInfo{
  1206. // // Num: count,
  1207. // // Err: []map[string]interface{}{
  1208. // // oneErrInfo,
  1209. // // },
  1210. // // },
  1211. // // }
  1212. // // }
  1213. // }
  1214. // }(tmp)
  1215. // if n2%10 == 0 {
  1216. // logger.Debug(n2)
  1217. // }
  1218. // tmp = map[string]interface{}{}
  1219. //}
  1220. wg.Wait()
  1221. logger.Debug("错误信息数据统计完成...")
  1222. }
// GetPythonWarnErrData aggregates the python-platform statistics for the day:
// list-page download counts, detail-page (data_bak) download counts, and the
// per-spider error information.
func GetPythonWarnErrData() {
	GetPythonListDownloadNum()   // list-page download counts
	GetPythonDetailDownloadNum() // data_bak total download counts
	GetPythonErrData()           // error/exception details
}
  1229. //python统计列表页采集量
  1230. func GetPythonListDownloadNum() {
  1231. defer qu.Catch()
  1232. logger.Debug("python列表页数据下载量统计开始...")
  1233. sess := util.MgoPy.GetMgoConn()
  1234. defer util.MgoPy.DestoryMongoConn(sess)
  1235. query := map[string]interface{}{
  1236. "runtime": Publishtime,
  1237. "rel_count": map[string]interface{}{
  1238. "$gt": 0,
  1239. },
  1240. }
  1241. fields := map[string]interface{}{
  1242. "spidercode": 1,
  1243. "rel_count": 1,
  1244. }
  1245. lock := &sync.Mutex{}
  1246. wg := &sync.WaitGroup{}
  1247. ch := make(chan bool, 5)
  1248. it := sess.DB(util.MgoPy.DbName).C("list").Find(&query).Select(&fields).Iter()
  1249. n := 0
  1250. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  1251. wg.Add(1)
  1252. ch <- true
  1253. go func(tmp map[string]interface{}) {
  1254. defer func() {
  1255. <-ch
  1256. wg.Done()
  1257. }()
  1258. code := qu.ObjToString(tmp["spidercode"])
  1259. count := qu.IntAll(tmp["rel_count"])
  1260. lock.Lock()
  1261. if sp := CodeInfoMap[code]; sp != nil {
  1262. //href不去重统计
  1263. sp.DownloadAllNum += count
  1264. sp.RepeatDownloadAllNum += count
  1265. }
  1266. lock.Unlock()
  1267. }(tmp)
  1268. if n%1000 == 0 {
  1269. logger.Debug(n)
  1270. }
  1271. tmp = map[string]interface{}{}
  1272. }
  1273. wg.Wait()
  1274. logger.Debug("python数据下载量统计完成...")
  1275. }
  1276. //python三级页统计总下载量
  1277. func GetPythonDetailDownloadNum() {
  1278. defer qu.Catch()
  1279. logger.Debug("python三级页数据下载量统计开始...")
  1280. sess := util.MgoPy.GetMgoConn()
  1281. defer util.MgoPy.DestoryMongoConn(sess)
  1282. query := map[string]interface{}{
  1283. "comeintime": map[string]interface{}{
  1284. "$gte": StartTime,
  1285. "$lt": EndTime,
  1286. },
  1287. }
  1288. fields := map[string]interface{}{
  1289. "spidercode": 1,
  1290. "publishtime": 1,
  1291. "sendflag": 1,
  1292. }
  1293. lock := &sync.Mutex{}
  1294. wg := &sync.WaitGroup{}
  1295. ch := make(chan bool, 5)
  1296. it := sess.DB(util.MgoPy.DbName).C("data_bak").Find(&query).Select(&fields).Iter()
  1297. n := 0
  1298. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  1299. wg.Add(1)
  1300. ch <- true
  1301. go func(tmp map[string]interface{}) {
  1302. defer func() {
  1303. <-ch
  1304. wg.Done()
  1305. }()
  1306. code := qu.ObjToString(tmp["spidercode"])
  1307. ptime := qu.ObjToString(tmp["publishtime"])
  1308. sendflag := qu.ObjToString(tmp["sendflag"])
  1309. samaDay := strings.Contains(ptime, Publishtime) //判断是否是当天的数据
  1310. lock.Lock()
  1311. if sp := CodeInfoMap[code]; sp != nil {
  1312. //sp.DownloadAllNum++
  1313. //sp.RepeatDownloadAllNum++
  1314. if sendflag == "true" {
  1315. sp.DownloadSuccessNum++
  1316. sp.RepeatDownloadSuccessNum++
  1317. }
  1318. if samaDay {
  1319. sp.PTimeAllNum++
  1320. sp.RepeatPTimeAllNum++
  1321. if sendflag == "true" {
  1322. sp.PTimeSuccessNum++
  1323. sp.RepeatPTimeSuccessNum++
  1324. sp.RepeatPTimeSuccessDataBakNum++
  1325. }
  1326. }
  1327. }
  1328. lock.Unlock()
  1329. }(tmp)
  1330. if n%1000 == 0 {
  1331. logger.Debug(n)
  1332. }
  1333. tmp = map[string]interface{}{}
  1334. }
  1335. wg.Wait()
  1336. logger.Debug("python数据下载量统计完成...")
  1337. }
  1338. //python统计总下载量
  1339. func GetPythonDownloadNum_back() {
  1340. defer qu.Catch()
  1341. sess := util.MgoPy.GetMgoConn()
  1342. defer util.MgoPy.DestoryMongoConn(sess)
  1343. match := map[string]interface{}{
  1344. "comeintime": map[string]interface{}{
  1345. "$gte": StartTime,
  1346. "$lt": EndTime,
  1347. },
  1348. }
  1349. group1 := map[string]interface{}{
  1350. "_id": map[string]interface{}{
  1351. "spidercode": "$spidercode",
  1352. "sendflag": "$sendflag",
  1353. },
  1354. "datacount": map[string]interface{}{
  1355. "$sum": 1,
  1356. },
  1357. }
  1358. group2 := map[string]interface{}{
  1359. "_id": "$_id.spidercode",
  1360. "sendflagarr": map[string]interface{}{
  1361. "$push": map[string]interface{}{
  1362. "sendflag": "$_id.sendflag",
  1363. "count": "$datacount",
  1364. },
  1365. },
  1366. "count": map[string]interface{}{
  1367. "$sum": "$datacount",
  1368. },
  1369. }
  1370. project := map[string]interface{}{
  1371. "infoarr": "$sendflagarr",
  1372. "count": 1,
  1373. }
  1374. p := []map[string]interface{}{
  1375. map[string]interface{}{"$match": match},
  1376. map[string]interface{}{"$group": group1},
  1377. map[string]interface{}{"$group": group2},
  1378. map[string]interface{}{"$project": project},
  1379. }
  1380. ch := make(chan bool, 5)
  1381. wg := &sync.WaitGroup{}
  1382. lock := &sync.Mutex{}
  1383. it1 := sess.DB(util.MgoPy.DbName).C("data_bak").Pipe(p).Iter()
  1384. n := 0
  1385. for tmp := make(map[string]interface{}); it1.Next(&tmp); n++ {
  1386. wg.Add(1)
  1387. ch <- true
  1388. go func(tmp map[string]interface{}) {
  1389. defer func() {
  1390. <-ch
  1391. wg.Done()
  1392. }()
  1393. code := qu.ObjToString(tmp["_id"])
  1394. count := qu.IntAll(tmp["count"]) //下载总量
  1395. successCount := 0 //下载成功总量
  1396. if infoArr, ok := tmp["infoarr"].([]interface{}); ok {
  1397. for _, info := range infoArr {
  1398. infoMap := info.(map[string]interface{})
  1399. if sendflag := qu.ObjToString(infoMap["sendflag"]); sendflag == "true" {
  1400. successCount = qu.IntAll(infoMap["count"])
  1401. }
  1402. }
  1403. }
  1404. lock.Lock()
  1405. if sp := CodeInfoMap[code]; sp != nil {
  1406. sp.DownloadAllNum = count
  1407. sp.DownloadSuccessNum = successCount //保存服务发送成功数
  1408. }
  1409. lock.Unlock()
  1410. }(tmp)
  1411. if n%100 == 0 {
  1412. logger.Debug(n)
  1413. }
  1414. tmp = map[string]interface{}{}
  1415. }
  1416. wg.Wait()
  1417. logger.Debug("python数据下载量统计完成...")
  1418. }
  1419. //python统计异常信息
  1420. func GetPythonErrData() {
  1421. defer qu.Catch()
  1422. sess := util.MgoPy.GetMgoConn()
  1423. defer util.MgoPy.DestoryMongoConn(sess)
  1424. query := map[string]interface{}{
  1425. "comeintime": map[string]interface{}{
  1426. "$gte": StartTime,
  1427. "$lt": EndTime,
  1428. },
  1429. }
  1430. fieles := map[string]interface{}{
  1431. "spidercode": 1,
  1432. "parser_name": 1,
  1433. "parse_url": 1,
  1434. "failed": 1,
  1435. "code": 1,
  1436. }
  1437. it := sess.DB(util.MgoPy.DbName).C("mgp_list").Find(&query).Select(&fieles).Iter()
  1438. n := 0
  1439. lock := &sync.Mutex{}
  1440. wg := &sync.WaitGroup{}
  1441. ch := make(chan bool, 5)
  1442. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  1443. wg.Add(1)
  1444. ch <- true
  1445. go func(tmp map[string]interface{}) {
  1446. defer func() {
  1447. <-ch
  1448. wg.Done()
  1449. }()
  1450. state := qu.IntAll(tmp["code"])
  1451. if state == -1 { //状态码为-1表示详情页未执行下载操作,不统计
  1452. return
  1453. }
  1454. spidercode := qu.ObjToString(tmp["spidercode"])
  1455. remark := qu.ObjToString(tmp["parser_name"])
  1456. href := qu.ObjToString(tmp["parse_url"])
  1457. failed := qu.IntAll(tmp["failed"])
  1458. errType := StateFeedBackErr[state]
  1459. oneErrInfo := &ErrRemark{
  1460. Href: href,
  1461. Remark: remark,
  1462. }
  1463. lock.Lock()
  1464. if spider := CodeInfoMap[spidercode]; spider != nil {
  1465. if failed == 0 { //未采集
  1466. spider.NoDownloadNum++
  1467. } else { //下载失败
  1468. spider.DownloadFailedNum++
  1469. if spider_err := spider.Error; spider_err != nil {
  1470. if errInfo := spider_err[errType]; errInfo != nil {
  1471. errInfo.Num++
  1472. if len(errInfo.Err) < 3 { //最多存放三个错误数据连接
  1473. errInfo.Err = append(errInfo.Err, oneErrInfo)
  1474. }
  1475. } else {
  1476. spider.Error[errType] = &ErrorInfo{
  1477. Num: 1,
  1478. Err: []*ErrRemark{
  1479. oneErrInfo,
  1480. },
  1481. }
  1482. }
  1483. } else {
  1484. spider.Error = map[string]*ErrorInfo{
  1485. errType: &ErrorInfo{
  1486. Num: 1,
  1487. Err: []*ErrRemark{
  1488. oneErrInfo,
  1489. },
  1490. },
  1491. }
  1492. }
  1493. }
  1494. }
  1495. lock.Unlock()
  1496. }(tmp)
  1497. if n%100 == 0 {
  1498. logger.Debug(n)
  1499. }
  1500. tmp = map[string]interface{}{}
  1501. }
  1502. wg.Wait()
  1503. logger.Debug("python下载异常数据统计完成...")
  1504. }
// CreateTaskProcess walks CodeInfoMap after all statistics have been gathered,
// builds a Task per spider from its recorded errors (error-type priority,
// pending/to-confirm state rules), persists each spider's daily stats to
// "luacodeinfo", upserts tasks into "task" via CreateTask, and accumulates
// the platform-wide list download totals. CodeInfoMap is reset at the end.
func CreateTaskProcess() {
	defer qu.Catch()
	logger.Debug("开始生成爬虫任务...")
	//arr := []map[string]interface{}{}
	upsertBulk := [][]map[string]interface{}{} // pending task upserts
	arr := []map[string]interface{}{}          // per-spider daily info documents
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	ch := make(chan bool, 10) // at most 10 concurrent workers
	logger.Debug("CodeInfoMap:", len(CodeInfoMap))
	for code, spider := range CodeInfoMap {
		wg.Add(1)
		ch <- true
		go func(code string, spider *Spider) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// Assemble the new task's information.
			task := &Task{
				DescribeMap: map[int]string{},
			}
			//task.Platform = spider.Platform
			//task.Site = spider.Site
			//task.Code = spider.Code
			//task.Channel = spider.Channel
			//task.ModifyUser = spider.ModifyUser
			//task.ModifyId = spider.ModifyId
			//task.FrequencyErrTimes = spider.FrequencyErrTimes
			// Error types shared by lua and python: publishtime, text.
			if len(spider.Error) > 0 {
				// 1. download: download error, errtype 5
				// 2. regather: runtime error, errtype 4
				// 3. publishtime: publish-time error, errtype 3
				// 4. text: data error, errtype 2
				for stype, info := range LuaErrTypeInfoMap {
					if err := spider.Error[stype]; err != nil {
						taskStateOk := false
						if stype == "download" {
							if spider.Model == 1 { // new mode (7100/7110/7200/7210/7300/7310/7400): task only when failures exceed the count/ratio limits
								moreThanLimit := false
								// 1. failure count; 2. failure ratio
								if spider.DownloadFailedNum > FailedNumLimit {
									moreThanLimit = true
								} else if spider.DownloadAllNum > 0 && (float64(spider.DownloadFailedNum)/float64(spider.DownloadAllNum)) > FailedPercentLimit {
									moreThanLimit = true
								}
								if !moreThanLimit { // below the limits: do not build this task type
									continue
								}
							} else if spider.Model == 0 && spider.Working == 1 { // old mode, queue mode (7500, 7700): any failed download creates a task
								if spider.DownloadFailedNum > 0 { // a single failure on 7500/7700 makes the task pending
									task.State = 1 // pending
									taskStateOk = true
								} else {
									continue
								}
							} else if spider.Model == 0 && spider.Working == 0 { // old mode, high-performance mode (7410): no download-error tasks
								continue
							}
						}
						// Keep the highest error type seen.
						if task.ErrType < info.ErrType {
							task.ErrType = info.ErrType
						}
						// If any of download/regather/publishtime/text has more than
						// 10 errors the task becomes pending.
						if !taskStateOk && err.Num > 10 {
							task.State = 1 // pending
						}
						// Error description.
						descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n"
						for _, errRemark := range err.Err {
							if stype == "regather" { // runtime errors include their message
								descript += errRemark.Remark + ":" + errRemark.Href + "\n"
							} else {
								descript += errRemark.Href + "\n"
							}
						}
						task.DescribeMap[info.ErrType] = descript
					}
				}
			}
			if spider.Platform == "golua平台" { // lua-only errors (frequency anomaly is special, so it is handled last)
				// 5. list-page error, errtype 7
				if spider.ListNoDataTimes > 0 && spider.ListNoDataTimes == spider.ListDownloadAllTimes {
					if !spider.ListIsFilter || (spider.FindListHeart < util.GetTime(0) && spider.ListIsFilter) { // list page has no filter code, or is filtered but shows no heartbeat
						task.State = 1 // pending
						task.ErrType = TASK_LISTERR
						task.DescribeMap[TASK_LISTERR] = "列表页异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListNoDataTimes) + "轮无数据\n"
					}
					// if !spider.ListIsFilter { // list page has no filter code
					// 	task.State = 1 // pending
					// 	task.ErrType = TASK_LISTERR
					// } else if len(task.DescribeMap) == 0 { // only a list-page error and filter code exists
					// 	task.State = 0 // to confirm
					// 	task.ErrType = TASK_LISTERR
					// }
				}
				// 6. crawl-frequency error, errtype 8
				if spider.ListOhPercentTimes > 0 { // frequency anomaly
					//UpdateLuaInfo(spider) // on a frequency anomaly: bump frequencyerrtimes, max page +1, re-publish the spider
					// Only when FrequencyErrTimes > 3 does the frequency anomaly take
					// priority; otherwise other error types win (to-confirm frequency
					// anomalies are handled automatically, humans rarely step in).
					if spider.FrequencyErrTimes > 3 { // more than 3 anomalies: pending; otherwise to-confirm
						task.State = 1 // pending
						task.ErrType = TASK_RATEERR
					} else if len(task.DescribeMap) == 0 { // only a frequency anomaly and FrequencyErrTimes <= 3
						task.State = 0 // to confirm
						task.ErrType = TASK_RATEERR
					}
					task.DescribeMap[TASK_RATEERR] = "采集频率异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListOhPercentTimes) + "轮数据全采\n"
				}
			} else if spider.Platform == "python" { // python errors
				for stype, info := range PythonErrTypeInfoMap {
					if err := spider.Error[stype]; err != nil {
						// Keep the highest error type seen.
						if task.ErrType < info.ErrType {
							task.ErrType = info.ErrType
						}
						if info.ErrType > 3 { // python 404/download/runtime errors are always pending
							task.State = 1
						}
						// Error description.
						descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n"
						for _, errRemark := range err.Err {
							descript += errRemark.Remark + ":" + errRemark.Href + "\n"
						}
						// lua and python may both produce ErrType 3/4 — append descriptions.
						task.DescribeMap[info.ErrType] = descript + task.DescribeMap[info.ErrType]
					}
				}
			}
			// Persist the spider's daily statistics.
			byteText, err := json.Marshal(spider)
			if err != nil {
				logger.Debug("Json Marshal Error", code)
				return
			}
			tmp := map[string]interface{}{}
			if json.Unmarshal(byteText, &tmp) == nil {
				lock.Lock()
				arr = append(arr, tmp)
				lock.Unlock()
			} else {
				logger.Debug("Json UnMarshal Error", code)
				return
			}
			// Build a task from the spider info (merged with any open task).
			CreateTask(task, spider, &upsertBulk, lock)
			if spider.Platform == "golua平台" {
				// total list-page downloads
				atomic.AddInt64(&LuaListDownloadAllNum, int64(spider.RepeatDownloadAllNum))
				// total successful list-page downloads
				atomic.AddInt64(&LuaListDownloadSuccessAllNum, int64(spider.RepeatDownloadSuccessNum))
			} else {
				// total list-page downloads
				atomic.AddInt64(&PythonListDownloadAllNum, int64(spider.RepeatDownloadAllNum))
				// total successful list-page downloads
				atomic.AddInt64(&PythonListDownloadSuccessAllNum, int64(spider.RepeatDownloadSuccessNum))
			}
			// Flush in batches of 500 while still iterating.
			lock.Lock()
			if len(arr) > 500 {
				util.MgoE.SaveBulk("luacodeinfo", arr...)
				arr = []map[string]interface{}{}
			}
			if len(upsertBulk) > 500 {
				util.MgoE.UpSertBulk("task", upsertBulk...)
				upsertBulk = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(code, spider)
	}
	wg.Wait()
	// Flush whatever remains after all workers finished.
	lock.Lock()
	if len(arr) > 0 {
		util.MgoE.SaveBulk("luacodeinfo", arr...)
		arr = []map[string]interface{}{}
	}
	if len(upsertBulk) > 0 {
		util.MgoE.UpSertBulk("task", upsertBulk...)
		upsertBulk = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Debug("生成任务完成...")
	CodeInfoMap = map[string]*Spider{}
}
// CreateTask merges a freshly built task t for spider sp with the spider's
// existing open task (if any) and appends the resulting upsert pair to
// upsertBulk; lock guards upsertBulk. Tasks are only created for real errors
// (t.ErrType != 0), for non-suspended spiders, and for spiders not audited
// within the last 24 hours.
func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, lock *sync.Mutex) {
	defer qu.Catch()
	if t.ErrType == 0 { // not an error task
		return
	}
	if sp.PendState == 1 {
		if sp.DownloadAllNum == 0 { // suspended spider with no downloads: no task
			return
		} else { // suspended but downloading again: clear the suspended flag
			sp.PendState = 0 // affects the task's i_pendstate
			util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": map[string]interface{}{"pendstate": 0}}, false, false)
		}
	}
	diff := time.Now().Unix() - sp.AuditTime
	if sp.State == 5 && diff <= 86400 { // published spider whose last audit is < 24h old: no task
		logger.Debug("该爬虫近期维护无需新建任务:", sp.Code)
		return
	}
	descript_new := "" // description of the new task
	for _, text := range t.DescribeMap {
		descript_new += text
	}
	query := map[string]interface{}{
		"s_code": sp.Code,
		"i_state": map[string]interface{}{
			"$in": []int{0, 1, 2, 3, 5}, // tasks currently being maintained
		},
	}
	fields := map[string]interface{}{
		"i_state":    1,
		"s_type":     1,
		"s_descript": 1,
		"i_times":    1,
		"s_urgency":  1,
	}
	list, _ := util.MgoE.Find("task", query, nil, fields, false, -1, -1)
	update := []map[string]interface{}{}
	if list != nil && len(*list) > 0 { // an open task already exists
		if len(*list) > 1 { // more than one open task is an inconsistency: record it and bail
			logger.Error("Code:", sp.Code, "任务异常")
			util.MgoE.Save("luacreatetaskerr", map[string]interface{}{
				"code":       sp.Code,
				"comeintime": time.Now().Unix(),
				"tasknum":    len(*list),
			})
			return
		}
		task := (*list)[0]                                 // the single open task
		state_old := qu.IntAll(task["i_state"])            // previous task state
		times_old := qu.IntAll(task["i_times"])            // previous task count
		type_old := qu.ObjToString(task["s_type"])         // previous error type
		urgency_old := qu.ObjToString(task["s_urgency"])   // previous urgency
		descript_old := qu.ObjToString(task["s_descript"]) // previous description
		result := map[string]interface{}{
			"i_frequencyerrtimes": sp.FrequencyErrTimes,
			"i_num":               sp.DownloadSuccessNum, // download count (currently the success count)
			"l_updatetime":        time.Now().Unix(),
			"i_times":             times_old + 1,
			"s_descript":          descript_old + time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + descript_new,
		}
		if state_old == 0 || state_old == 1 { // only to-confirm/pending tasks get full updates; other states only gain description/count/downloads
			// Task state and type.
			if state_old == 1 || t.State == 1 { // if either side is pending, the merged task is pending
				result["i_state"] = 1
				if t.State == 1 && state_old == 1 { // both pending: keep the higher error type
					if t.ErrType > qu.IntAll(type_old) {
						result["s_type"] = fmt.Sprint(t.ErrType)
					}
				} else if t.State == 1 { // new pending, old to-confirm: take the new type
					result["s_type"] = fmt.Sprint(t.ErrType)
				} /*else if state_old == 1 {
				}*/
			} else if state_old == 0 && t.State == 0 && t.ErrType > qu.IntAll(type_old) { // both to-confirm: keep the higher error type
				result["s_type"] = fmt.Sprint(t.ErrType)
			}
			if times_old >= 3 { // a spider's fourth task becomes pending
				result["i_state"] = 1
			}
			// Task urgency.
			urgency := qu.IntAll(urgency_old)
			if urgency < 4 {
				result["s_urgency"] = fmt.Sprint(urgency + 1)
			}
			// Latest completion time.
			if qu.IntAll(result["i_state"]) == 1 && state_old == 0 { // escalated from to-confirm to pending: refresh the deadline
				result["l_complete"] = util.CompleteTime(fmt.Sprint(urgency + 1))
			}
		}
		update = append(update, map[string]interface{}{"_id": task["_id"]})
		update = append(update, map[string]interface{}{"$set": result})
		lock.Lock()
		*upsertBulk = append(*upsertBulk, update)
		lock.Unlock()
	} else { // no existing task: insert a new one
		//times := 0
		//if t.State == 1 { // pending => times = 1
		//	times = 1
		//}
		saveMap := map[string]interface{}{
			"s_modify":   sp.ModifyUser,
			"s_modifyid": sp.ModifyId,
			"s_code":     sp.Code,
			"s_site":     sp.Site,
			"s_channel":  sp.Channel,
			"i_event":    sp.Event,
			"i_state":    t.State,
			"s_source":   "程序",
			"s_type":     fmt.Sprint(t.ErrType),
			"s_descript": descript_new,
			"i_times":    1,
			"i_num":      sp.DownloadSuccessNum, // download count (currently the success count)
			"l_comeintime": time.Now().Unix(),
			//"l_updatetime": time.Now().Unix(),
			"l_complete":          util.CompleteTime("1"),
			"s_urgency":           "1",
			"i_frequencyerrtimes": sp.FrequencyErrTimes,
			"i_pendstate":         sp.PendState, // spider suspended state
		}
		update = append(update, query)
		update = append(update, saveMap)
		lock.Lock()
		*upsertBulk = append(*upsertBulk, update)
		lock.Unlock()
	}
}
  1817. //更新爬虫最大页、爬虫上下架
  1818. func UpdateLuaInfo(sp *Spider) {
  1819. defer qu.Catch()
  1820. //1、更新爬虫信息
  1821. set := map[string]interface{}{
  1822. "frequencyerrtimes": sp.FrequencyErrTimes, //更新次数
  1823. }
  1824. if sp.FrequencyErrTimes <= 3 {
  1825. set["param_common.5"] = sp.MaxPage + 1
  1826. }
  1827. logger.Debug("Code:", sp.Code, " ", sp.FrequencyErrTimes)
  1828. b := util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": set}, false, false)
  1829. if b && sp.FrequencyErrTimes <= 3 { //FrequencyErrTimes>3时会建采集频率异常的待处理任务,不再上下架
  1830. //爬虫下架、上加
  1831. qu.Debug("爬虫上下架 code:", sp.Code)
  1832. CodeLock.Lock()
  1833. ok, err := util.UpdateSpiderByCodeState(sp.Code, "6", sp.Event) //下架
  1834. if ok && err == nil {
  1835. logger.Debug(sp.Code, "下架成功")
  1836. ok, err = util.UpdateSpiderByCodeState(sp.Code, "5", sp.Event) //上架
  1837. if ok && err == nil {
  1838. logger.Debug(sp.Code, "上架成功")
  1839. }
  1840. }
  1841. CodeLock.Unlock()
  1842. }
  1843. }
// ResetDataState resets list data that failed to download within the recent
// util.DayNum-day window (data that failed 3 times in a day may keep being
// retried for up to a week): each matching document gets times=0 and
// state=0 so the downloader will pick it up again.
func ResetDataState() {
	defer qu.Catch()
	logger.Info("-----更新数据状态-----")
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	// Semaphore capping the number of in-flight update goroutines at 3.
	ch := make(chan bool, 3)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	// Select failed documents (state == -1) inside the time window.
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": util.GetTime(-util.DayNum),
			"$lt":  util.GetTime(0),
		},
		"state": -1,
	}
	// Only the _id is needed to build the bulk-update selectors.
	field := map[string]interface{}{
		"_id": 1,
	}
	it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
	logger.Info("更新数据状态数量:", count)
	n := 0
	// arr accumulates (selector, update) pairs for UpdateBulk.
	arr := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// Each bulk entry is a pair: selector map, then "$set" map.
			update := []map[string]interface{}{}
			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
			update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
			lock.Lock()
			arr = append(arr, update)
			if len(arr) > 500 {
				// Flush a full batch; the slice is swapped while the lock is held.
				tmps := arr
				util.MgoS.UpdateBulk("spider_highlistdata", tmps...)
				arr = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		tmp = map[string]interface{}{} // fresh map so the goroutine owns the previous one
	}
	wg.Wait()
	lock.Lock()
	if len(arr) > 0 {
		// Flush the final partial batch.
		util.MgoS.UpdateBulk("spider_highlistdata", arr...)
		arr = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Info("-----更新数据状态完毕-----")
}
  1899. //关闭任务
  1900. func CloseTask() {
  1901. qu.Catch()
  1902. logger.Debug("---清理未更新任务---")
  1903. decreaseDay, day := 0, 0
  1904. var cleanDay string
  1905. for {
  1906. decreaseDay--
  1907. weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String()
  1908. if weekDay != "Saturday" && weekDay != "Sunday" {
  1909. day++
  1910. }
  1911. if day == util.CloseNum {
  1912. cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02")
  1913. break
  1914. }
  1915. }
  1916. the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local)
  1917. unix_time := the_time.Unix() //凌晨时间戳
  1918. query := map[string]interface{}{
  1919. "i_state": 0,
  1920. "l_complete": map[string]interface{}{
  1921. "$lt": unix_time + 86400,
  1922. },
  1923. "s_type": "1",
  1924. // "s_type": map[string]interface{}{
  1925. // "$ne": "7",
  1926. // },
  1927. }
  1928. logger.Debug("query:", query)
  1929. set := map[string]interface{}{
  1930. "$set": map[string]interface{}{
  1931. "i_state": 6,
  1932. },
  1933. }
  1934. util.MgoE.Update("task", query, set, false, true)
  1935. logger.Debug("---清理未更新任务完毕---")
  1936. }
// SaveCodeInfo persists the per-spider daily monitoring records held in
// CodeInfoMap to the "luacodeinfo_back" collection, and accumulates the
// platform-wide list-page download totals (split between the golua and
// python platforms) into atomic counters.
func SaveCodeInfo() {
	defer qu.Catch()
	// arr buffers documents for batched SaveBulk calls; guarded by lock.
	arr := []map[string]interface{}{}
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	// Semaphore capping the number of in-flight goroutines at 10.
	ch := make(chan bool, 10)
	logger.Debug("CodeInfoMap:", len(CodeInfoMap))
	for code, spider := range CodeInfoMap {
		wg.Add(1)
		ch <- true
		// The Spider value is copied (*spider) so the goroutine does not
		// share the map entry with the loop.
		go func(code string, sp Spider) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// Round-trip through JSON to convert the Spider struct into a map.
			byteText, err := json.Marshal(sp)
			if err != nil {
				logger.Debug("Json Marshal Error", code)
				return
			}
			tmp := map[string]interface{}{}
			if json.Unmarshal(byteText, &tmp) == nil {
				lock.Lock()
				arr = append(arr, tmp)
				lock.Unlock()
			} else {
				logger.Debug("Json UnMarshal Error", code)
				return
			}
			if sp.Platform == "golua平台" {
				// Total list-page downloads (lua platform).
				atomic.AddInt64(&LuaListDownloadAllNum, int64(sp.RepeatDownloadAllNum))
				// Total successful list-page downloads (lua platform).
				atomic.AddInt64(&LuaListDownloadSuccessAllNum, int64(sp.RepeatDownloadSuccessNum))
			} else {
				// Total list-page downloads (python platform).
				atomic.AddInt64(&PythonListDownloadAllNum, int64(sp.RepeatDownloadAllNum))
				// Total successful list-page downloads (python platform).
				atomic.AddInt64(&PythonListDownloadSuccessAllNum, int64(sp.RepeatDownloadSuccessNum))
			}
			lock.Lock()
			if len(arr) > 500 {
				// Flush a full batch while holding the lock.
				util.MgoE.SaveBulk("luacodeinfo_back", arr...)
				arr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(code, *spider)
	}
	wg.Wait()
	// All goroutines are done; flush the remaining records without the lock.
	if len(arr) > 0 {
		util.MgoE.SaveBulk("luacodeinfo_back", arr...)
		arr = []map[string]interface{}{}
	}
	logger.Debug("爬虫基本信息生成完成...")
}
  1993. func SaveUserCreateTaskNum() {
  1994. defer qu.Catch()
  1995. for user, sn := range UserTaskNum {
  1996. save := map[string]interface{}{}
  1997. save["user"] = user
  1998. save["comeintime"] = time.Now().Unix()
  1999. for s, n := range sn {
  2000. save[s] = n
  2001. }
  2002. util.MgoE.Save("luausertask", save)
  2003. }
  2004. UserTaskNum = map[string]map[string]int{}
  2005. }