task.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088
  1. package main
  2. import (
  3. "fmt"
  4. mgo "mongodb"
  5. qu "qfw/util"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/donnie4w/go-logger/logger"
  10. )
  11. type Task struct {
  12. Code string //爬虫代码
  13. Site string //站点
  14. Channel string //栏目
  15. ErrType string //异常类型:6:运行异常;5:下载异常;4:发布时间异常;3:乱码;2:状态码异常;1:数据量异常
  16. ErrInfo map[string]map[string]interface{} //异常集合
  17. Description string //描述
  18. State int //状态
  19. }
  20. var (
  21. StartTime int64 //上一个工作日的起始时间
  22. EndTime int64 //上一个工作日的结束时间
  23. TaskMap map[string]*Task //任务集合
  24. StartTaskCron string //任务开始
  25. UpdateStateCron string //每天关闭任务的时间
  26. CodeSummaryCron string //每天统计爬虫信息
  27. CloseNum int //关闭几天的任务
  28. DayNum int //更新数据天数
  29. UserTaskNum map[string]map[string]int //记录每人每天新建任务量
  30. )
  31. func StartTask() {
  32. GetDownloadNumber() //统计下载量
  33. CreateTaskProcess() //创建任务
  34. ResetDataState() //更新数据状态
  35. CloseTask() //关闭任务
  36. }
  37. //创建任务
  38. func CreateTaskProcess() {
  39. InitInfo() //初始化
  40. GetSpiderDownloadRateData() //1、统计spider_downloadrate前一天列表页采集异常爬虫
  41. GetStatusCodeErrorData() //2、统计spider_sitecheck 站点异常爬虫(404)
  42. GetDownloadFailedData() //3、统计spider_highlistdata前一天下载失败的爬虫数据(统计完成后修改状态state:0)
  43. GetRegatherFailedData() //4、统计regatherdata前一天重采失败的爬虫数据
  44. GetDTPErrData() //5、统计spider_warn异常数据(发布时间异常、乱码)
  45. GetDownloadNumErrData() //6、统计download前一天下载量异常的爬虫数据(每天1点统计下载量,目前统计完成需要1个小时)
  46. SaveResult() //保存统计信息
  47. CreateLuaTask() //创建任务
  48. SaveUserCreateTaskNum() //保存每人创建的任务量
  49. }
  50. //初始化
  51. func InitInfo() {
  52. defer qu.Catch()
  53. TaskMap = map[string]*Task{}
  54. UserTaskNum = map[string]map[string]int{}
  55. InitTime() //初始化时间
  56. }
  57. //关闭任务
  58. func CloseTask() {
  59. qu.Catch()
  60. logger.Debug("---清理未更新任务---")
  61. decreaseDay, day := 0, 0
  62. var cleanDay string
  63. for {
  64. decreaseDay--
  65. weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String()
  66. if weekDay != "Saturday" && weekDay != "Sunday" {
  67. day++
  68. }
  69. if day == CloseNum {
  70. cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02")
  71. break
  72. }
  73. }
  74. the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local)
  75. unix_time := the_time.Unix() //凌晨时间戳
  76. query := map[string]interface{}{
  77. "i_state": 0,
  78. "l_complete": map[string]interface{}{
  79. "$lt": unix_time + 86400,
  80. },
  81. "s_type": "1",
  82. // "s_type": map[string]interface{}{
  83. // "$ne": "7",
  84. // },
  85. }
  86. logger.Debug("query:", query)
  87. set := map[string]interface{}{
  88. "$set": map[string]interface{}{
  89. "i_state": 6,
  90. },
  91. }
  92. MgoE.Update("task", query, set, false, true)
  93. logger.Debug("---清理未更新任务完毕---")
  94. }
  95. //1、统计spider_downloadrate前一天列表页采集异常爬虫
  96. func GetSpiderDownloadRateData() {
  97. defer qu.Catch()
  98. logger.Debug("---开始统计spider_downloadrate异常信息---")
  99. sess := MgoS.GetMgoConn()
  100. defer MgoS.DestoryMongoConn(sess)
  101. ch := make(chan bool, 5)
  102. wg := &sync.WaitGroup{}
  103. lock := &sync.Mutex{}
  104. date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
  105. query := map[string]interface{}{
  106. "date": date,
  107. }
  108. it := sess.DB("spider").C("spider_downloadrate").Find(&query).Iter()
  109. n := 0
  110. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  111. ch <- true
  112. wg.Add(1)
  113. go func(tmp map[string]interface{}) {
  114. defer func() {
  115. <-ch
  116. wg.Done()
  117. }()
  118. stype := -1
  119. //1、统计采集频率异常信息
  120. oh_percent := qu.IntAll(tmp["oh_percent"])
  121. event := qu.IntAll(tmp["event"])
  122. if oh_percent > 0 && event != 7410 {
  123. stype = 8
  124. }
  125. //2、统计列表页异常(统计zero占总下载次数的百分比超过80%的)
  126. alltimes := qu.IntAll(tmp["alltimes"])
  127. zero := qu.IntAll(tmp["zero"])
  128. percent := 0 //记录百分比
  129. if zero > 0 {
  130. tmpPercent := float64(zero) / float64(alltimes)
  131. tmpPercent, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", tmpPercent), 64)
  132. percent = int(tmpPercent * float64(100))
  133. if percent >= 80 { //占比超过80%
  134. stype = 7
  135. }
  136. }
  137. if stype != -1 { //出现异常
  138. code := qu.ObjToString(tmp["spidercode"])
  139. site := qu.ObjToString(tmp["site"])
  140. channel := qu.ObjToString(tmp["channel"])
  141. t := &Task{
  142. Code: code,
  143. Site: site,
  144. Channel: channel,
  145. ErrInfo: map[string]map[string]interface{}{},
  146. State: 1,
  147. }
  148. if stype == 8 {
  149. t.ErrType = "8"
  150. t.ErrInfo = map[string]map[string]interface{}{
  151. "8": map[string]interface{}{
  152. "num": oh_percent,
  153. },
  154. }
  155. t.Description = "采集频率异常:\n 列表页共采集" + fmt.Sprint(alltimes) + "轮,其中有" + fmt.Sprint(oh_percent) + "轮数据全采\n"
  156. } else if stype == 7 {
  157. t.ErrType = "7"
  158. t.ErrInfo = map[string]map[string]interface{}{
  159. "7": map[string]interface{}{
  160. "num": percent,
  161. },
  162. }
  163. t.Description = "列表页异常:\n 列表页采集无信息次数占比" + fmt.Sprint(percent) + "%\n"
  164. }
  165. lock.Lock()
  166. TaskMap[code] = t
  167. lock.Unlock()
  168. }
  169. }(tmp)
  170. if n%100 == 0 {
  171. qu.Debug("current:", n)
  172. }
  173. tmp = map[string]interface{}{}
  174. }
  175. wg.Wait()
  176. logger.Debug("---统计spider_downloadrate异常信息完成---")
  177. }
  178. //2、状态码404
  179. func GetStatusCodeErrorData() {
  180. defer qu.Catch()
  181. logger.Debug("---开始统计栏目地址404数据---")
  182. sess := MgoS.GetMgoConn()
  183. defer MgoS.DestoryMongoConn(sess)
  184. ch := make(chan bool, 5)
  185. wg := &sync.WaitGroup{}
  186. lock := &sync.Mutex{}
  187. field := map[string]interface{}{
  188. "url": 1,
  189. "code": 1,
  190. "site": 1,
  191. "channel": 1,
  192. }
  193. query := map[string]interface{}{
  194. "comeintime": map[string]interface{}{
  195. "$gte": StartTime,
  196. "$lte": EndTime,
  197. },
  198. "statuscode": 404,
  199. }
  200. it := sess.DB("spider").C("spider_sitecheck").Find(&query).Select(&field).Iter()
  201. count, _ := sess.DB("spider").C("spider_sitecheck").Find(&query).Count()
  202. logger.Debug("共有404地址", count, "条")
  203. n := 0
  204. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  205. ch <- true
  206. wg.Add(1)
  207. go func(tmp map[string]interface{}) {
  208. defer func() {
  209. <-ch
  210. wg.Done()
  211. }()
  212. code := qu.ObjToString(tmp["code"])
  213. one, _ := MgoE.FindOneByField("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"state": 1})
  214. state := qu.IntAll((*one)["state"])
  215. if state == 4 || state > 6 {
  216. return
  217. }
  218. //判断3天内是否有采集数据,有则不建404任务
  219. stime, etime := GetTime(-3), GetTime(0)
  220. q := map[string]interface{}{
  221. "spidercode": code,
  222. "l_np_publishtime": map[string]interface{}{
  223. "$gte": stime,
  224. "$lte": etime,
  225. },
  226. }
  227. if MgoS.Count("data_bak", q) > 0 { //有采集数据,不认为是404
  228. return
  229. }
  230. href := qu.ObjToString(tmp["url"])
  231. site := qu.ObjToString(tmp["site"])
  232. channel := qu.ObjToString(tmp["channel"])
  233. lock.Lock()
  234. if t := TaskMap[code]; t != nil {
  235. t.ErrInfo["6"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  236. "num": 404,
  237. "hrefs": []string{href},
  238. }
  239. t.Description += "网站监测:404\n" + href + "\n"
  240. t.State = 1
  241. } else {
  242. t := &Task{
  243. Code: code,
  244. Site: site,
  245. Channel: channel,
  246. ErrType: "6",
  247. ErrInfo: map[string]map[string]interface{}{},
  248. Description: "网站监测:404\n" + href + "\n",
  249. State: 1,
  250. }
  251. t.ErrInfo = map[string]map[string]interface{}{
  252. "6": map[string]interface{}{
  253. "num": 404,
  254. "hrefs": []string{href},
  255. },
  256. }
  257. TaskMap[code] = t
  258. }
  259. lock.Unlock()
  260. }(tmp)
  261. if n%100 == 0 {
  262. qu.Debug("current:", n)
  263. }
  264. tmp = map[string]interface{}{}
  265. }
  266. wg.Wait()
  267. logger.Debug("---统计栏目地址404数据完成---")
  268. }
  269. //3、统计三级页下载失败数据
  270. /*
  271. 先统计下载失败信息再更新下载失败信息状态(ResetDataState)使其可重新下载,这样不影响统计
  272. 但是任务已经就绪,若下载失败信息重新下载成功,则使任务不太准备
  273. 若先重置状态再统计,会使任务统计时缺少,无法正常监控
  274. */
  275. func GetDownloadFailedData() {
  276. defer qu.Catch()
  277. logger.Debug("---开始统计下载失败信息---")
  278. sess := MgoS.GetMgoConn()
  279. defer MgoS.DestoryMongoConn(sess)
  280. ch := make(chan bool, 5)
  281. wg := &sync.WaitGroup{}
  282. lock := &sync.Mutex{}
  283. field := map[string]interface{}{
  284. "spidercode": 1,
  285. "href": 1,
  286. "site": 1,
  287. "channel": 1,
  288. }
  289. query := map[string]interface{}{
  290. "comeintime": map[string]interface{}{
  291. "$gte": StartTime,
  292. "$lte": EndTime,
  293. },
  294. "state": -1,
  295. }
  296. it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
  297. count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
  298. logger.Debug("共有下载失败数据", count, "条")
  299. n := 0
  300. //arr := [][]map[string]interface{}{}
  301. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  302. ch <- true
  303. wg.Add(1)
  304. go func(tmp map[string]interface{}) {
  305. defer func() {
  306. <-ch
  307. wg.Done()
  308. }()
  309. code := qu.ObjToString(tmp["spidercode"])
  310. href := qu.ObjToString(tmp["href"])
  311. site := qu.ObjToString(tmp["site"])
  312. channel := qu.ObjToString(tmp["channel"])
  313. lock.Lock()
  314. if t := TaskMap[code]; t != nil {
  315. if info := t.ErrInfo["5"]; info != nil {
  316. num := qu.IntAll(info["num"])
  317. num++
  318. info["num"] = num
  319. hrefs := info["hrefs"].([]string)
  320. if len(hrefs) < 3 {
  321. hrefs = append(hrefs, href)
  322. info["hrefs"] = hrefs
  323. t.Description += href + "\n"
  324. }
  325. if num >= 10 {
  326. t.State = 1
  327. }
  328. } else {
  329. t.ErrInfo["5"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  330. "num": 1,
  331. "hrefs": []string{href},
  332. }
  333. t.Description += "下载异常:\n" + href + "\n"
  334. }
  335. } else {
  336. t := &Task{
  337. Code: code,
  338. Site: site,
  339. Channel: channel,
  340. ErrType: "5",
  341. ErrInfo: map[string]map[string]interface{}{},
  342. Description: "下载异常:\n" + href + "\n",
  343. State: 0,
  344. }
  345. t.ErrInfo = map[string]map[string]interface{}{
  346. "5": map[string]interface{}{
  347. "num": 1,
  348. "hrefs": []string{href},
  349. },
  350. }
  351. TaskMap[code] = t
  352. }
  353. //更新state状态重新下载
  354. // update := []map[string]interface{}{}
  355. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  356. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"state": 0, "times": 0}})
  357. // arr = append(arr, update)
  358. // if len(arr) > 500 {
  359. // tmps := arr
  360. // MgoS.UpdateBulk("spider_highlistdata", tmps...)
  361. // arr = [][]map[string]interface{}{}
  362. // }
  363. lock.Unlock()
  364. }(tmp)
  365. if n%100 == 0 {
  366. qu.Debug("current:", n)
  367. }
  368. tmp = map[string]interface{}{}
  369. }
  370. wg.Wait()
  371. // lock.Lock()
  372. // if len(arr) > 0 {
  373. // MgoS.UpdateBulk("spider_highlistdata", arr...)
  374. // arr = [][]map[string]interface{}{}
  375. // }
  376. // lock.Unlock()
  377. logger.Debug("---统计下载失败信息完成---")
  378. }
  379. //4、统计重采失败数据
  380. func GetRegatherFailedData() {
  381. defer qu.Catch()
  382. logger.Debug("---开始统计重采失败信息---")
  383. sess := MgoS.GetMgoConn()
  384. defer MgoS.DestoryMongoConn(sess)
  385. ch := make(chan bool, 5)
  386. wg := &sync.WaitGroup{}
  387. lock := &sync.Mutex{}
  388. field := map[string]interface{}{
  389. "spidercode": 1,
  390. "href": 1,
  391. "site": 1,
  392. "channel": 1,
  393. }
  394. query := map[string]interface{}{
  395. "state": map[string]interface{}{
  396. "$lte": 1,
  397. },
  398. "from": "lua",
  399. "comeintime": map[string]interface{}{
  400. "$gte": StartTime,
  401. "$lte": EndTime,
  402. },
  403. }
  404. it := sess.DB("spider").C("regatherdata").Find(&query).Select(&field).Iter()
  405. count, _ := sess.DB("spider").C("regatherdata").Find(&query).Count()
  406. logger.Debug("共有重采失败数据", count, "条")
  407. n := 0
  408. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  409. ch <- true
  410. wg.Add(1)
  411. go func(tmp map[string]interface{}) {
  412. defer func() {
  413. <-ch
  414. wg.Done()
  415. }()
  416. code := qu.ObjToString(tmp["spidercode"])
  417. href := qu.ObjToString(tmp["href"])
  418. site := qu.ObjToString(tmp["site"])
  419. channel := qu.ObjToString(tmp["channel"])
  420. lock.Lock()
  421. if t := TaskMap[code]; t != nil {
  422. if info := t.ErrInfo["4"]; info != nil {
  423. num := qu.IntAll(info["num"])
  424. num++
  425. info["num"] = num
  426. hrefs := info["hrefs"].([]string)
  427. if len(hrefs) < 3 {
  428. hrefs = append(hrefs, href)
  429. info["hrefs"] = hrefs
  430. t.Description += href + "\n"
  431. }
  432. if num >= 10 {
  433. t.State = 1
  434. }
  435. } else {
  436. t.ErrInfo["4"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  437. "num": 1,
  438. "hrefs": []string{href},
  439. }
  440. t.Description += "运行报错:\n" + href + "\n"
  441. }
  442. } else {
  443. t := &Task{
  444. Code: code,
  445. Site: site,
  446. Channel: channel,
  447. ErrType: "4",
  448. ErrInfo: map[string]map[string]interface{}{},
  449. Description: "运行报错:\n" + href + "\n",
  450. State: 0,
  451. }
  452. t.ErrInfo = map[string]map[string]interface{}{
  453. "4": map[string]interface{}{
  454. "num": 1,
  455. "hrefs": []string{href},
  456. },
  457. }
  458. TaskMap[code] = t
  459. }
  460. lock.Unlock()
  461. }(tmp)
  462. if n%100 == 0 {
  463. qu.Debug("current:", n)
  464. }
  465. tmp = map[string]interface{}{}
  466. }
  467. wg.Wait()
  468. // for _, task := range TaskMap {
  469. // qu.Debug("code:", task.Code)
  470. // qu.Debug("site:", task.Site)
  471. // qu.Debug("channel:", task.Channel)
  472. // qu.Debug("errtype:", task.ErrType)
  473. // qu.Debug("description:", task.Description)
  474. // qu.Debug("info:", task.ErrInfo)
  475. // qu.Debug("-------------------------------------------")
  476. // tmap := map[string]interface{}{}
  477. // ab, _ := json.Marshal(&task)
  478. // json.Unmarshal(ab, &tmap)
  479. // MgoE.Save("save_aa", tmap)
  480. // }
  481. logger.Debug("---统计重采失败信息完成---")
  482. }
  483. //5、统计detail、title、publishtime异常数据
  484. func GetDTPErrData() {
  485. defer qu.Catch()
  486. logger.Debug("---开始统计信息异常数据---")
  487. sess := MgoS.GetMgoConn()
  488. defer MgoS.DestoryMongoConn(sess)
  489. ch := make(chan bool, 5)
  490. wg := &sync.WaitGroup{}
  491. lock := &sync.Mutex{}
  492. field := map[string]interface{}{
  493. "code": 1,
  494. "href": 1,
  495. "site": 1,
  496. "channel": 1,
  497. "field": 1,
  498. "info": 1,
  499. }
  500. query := map[string]interface{}{
  501. "comeintime": map[string]interface{}{
  502. "$gte": StartTime,
  503. "$lte": EndTime,
  504. },
  505. "level": 2, //2:error数据 1:warn数据
  506. }
  507. it := sess.DB("spider").C("spider_warn").Find(&query).Select(&field).Iter()
  508. count, _ := sess.DB("spider").C("spider_warn").Find(&query).Count()
  509. logger.Debug("共有信息异常数据", count, "条")
  510. n := 0
  511. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  512. ch <- true
  513. wg.Add(1)
  514. go func(tmp map[string]interface{}) {
  515. defer func() {
  516. <-ch
  517. wg.Done()
  518. }()
  519. errnum := "2" //detail、 title异常
  520. destmp := "正文标题异常:\n"
  521. field := qu.ObjToString(tmp["field"])
  522. info := qu.ObjToString(tmp["info"])
  523. if field == "publishtime" { //发布时间异常
  524. if info == "Publishtime Is Too Late" { //发布时间超前的不建任务
  525. return
  526. }
  527. errnum = "3"
  528. destmp = "发布时间异常:\n"
  529. }
  530. code := qu.ObjToString(tmp["code"])
  531. href := qu.ObjToString(tmp["href"])
  532. site := qu.ObjToString(tmp["site"])
  533. channel := qu.ObjToString(tmp["channel"])
  534. lock.Lock()
  535. if t := TaskMap[code]; t != nil {
  536. if info := t.ErrInfo[errnum]; info != nil {
  537. num := qu.IntAll(info["num"])
  538. num++
  539. info["num"] = num
  540. hrefs := info["hrefs"].([]string)
  541. if len(hrefs) < 3 {
  542. hrefs = append(hrefs, href)
  543. info["hrefs"] = hrefs
  544. t.Description += href + "\n"
  545. }
  546. if num >= 10 {
  547. t.State = 1
  548. }
  549. } else {
  550. t.ErrInfo[errnum] = map[string]interface{}{
  551. "num": 1,
  552. "hrefs": []string{href},
  553. }
  554. t.Description += destmp + href + "\n"
  555. }
  556. } else {
  557. t := &Task{
  558. Code: code,
  559. Site: site,
  560. Channel: channel,
  561. ErrType: errnum,
  562. ErrInfo: map[string]map[string]interface{}{},
  563. Description: destmp + href + "\n",
  564. State: 0,
  565. }
  566. t.ErrInfo = map[string]map[string]interface{}{
  567. errnum: map[string]interface{}{
  568. "num": 1,
  569. "hrefs": []string{href},
  570. },
  571. }
  572. TaskMap[code] = t
  573. }
  574. lock.Unlock()
  575. }(tmp)
  576. if n%100 == 0 {
  577. qu.Debug("current:", n)
  578. }
  579. tmp = map[string]interface{}{}
  580. }
  581. wg.Wait()
  582. logger.Debug("---统计信息异常数据完成---")
  583. }
  584. //6、统计下载量异常数据
  585. func GetDownloadNumErrData() {
  586. defer qu.Catch()
  587. logger.Debug("---开始统计下载量异常数据---")
  588. sess := MgoS.GetMgoConn()
  589. defer MgoS.DestoryMongoConn(sess)
  590. ch := make(chan bool, 5)
  591. wg := &sync.WaitGroup{}
  592. lock := &sync.Mutex{}
  593. field := map[string]interface{}{
  594. "downloadNum": 1,
  595. "code": 1,
  596. "averageDownload": 1,
  597. "site": 1,
  598. "channel": 1,
  599. }
  600. query := map[string]interface{}{
  601. "isok": false,
  602. }
  603. it := sess.DB("spider").C("spider_download").Find(&query).Select(&field).Iter()
  604. count, _ := sess.DB("spider").C("spider_download").Find(&query).Count()
  605. logger.Debug("共有下载量异常数据", count, "条")
  606. n := 0
  607. arr := [][]map[string]interface{}{}
  608. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  609. ch <- true
  610. wg.Add(1)
  611. go func(tmp map[string]interface{}) {
  612. defer func() {
  613. <-ch
  614. wg.Done()
  615. }()
  616. code := qu.ObjToString(tmp["code"])
  617. site := qu.ObjToString(tmp["site"])
  618. channel := qu.ObjToString(tmp["channel"])
  619. average := qu.IntAll(tmp["averageDownload"])
  620. date := "" //日期
  621. dnum := 0 //下载量
  622. for d, n := range tmp["downloadNum"].(map[string]interface{}) {
  623. date = d
  624. dnum = qu.IntAll(n)
  625. }
  626. lock.Lock()
  627. if t := TaskMap[code]; t != nil {
  628. t.ErrInfo["1"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  629. "num": dnum,
  630. "date": date,
  631. "average": average,
  632. }
  633. t.Description += "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n"
  634. } else {
  635. t := &Task{
  636. Code: code,
  637. Site: site,
  638. Channel: channel,
  639. ErrType: "1",
  640. ErrInfo: map[string]map[string]interface{}{},
  641. Description: "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n",
  642. State: 0,
  643. }
  644. t.ErrInfo = map[string]map[string]interface{}{
  645. "1": map[string]interface{}{
  646. "num": dnum,
  647. "date": date,
  648. "average": average,
  649. },
  650. }
  651. TaskMap[code] = t
  652. }
  653. //更新isok
  654. update := []map[string]interface{}{}
  655. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  656. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"isok": true}})
  657. arr = append(arr, update)
  658. if len(arr) > 500 {
  659. tmps := arr
  660. MgoS.UpdateBulk("spider_download", tmps...)
  661. arr = [][]map[string]interface{}{}
  662. }
  663. lock.Unlock()
  664. }(tmp)
  665. if n%100 == 0 {
  666. qu.Debug("current:", n)
  667. }
  668. tmp = map[string]interface{}{}
  669. }
  670. wg.Wait()
  671. lock.Lock()
  672. if len(arr) > 0 {
  673. MgoS.UpdateBulk("spider_download", arr...)
  674. arr = [][]map[string]interface{}{}
  675. }
  676. lock.Unlock()
  677. logger.Debug("---统计下载量异常数据完成---")
  678. }
  679. //保存统计信息
  680. func SaveResult() {
  681. defer qu.Catch()
  682. logger.Debug("---开始保存信息---")
  683. wg := &sync.WaitGroup{}
  684. lock := &sync.Mutex{}
  685. ch := make(chan bool, 10)
  686. savearr := []map[string]interface{}{}
  687. for _, task := range TaskMap {
  688. wg.Add(1)
  689. ch <- true
  690. go func(t *Task) {
  691. defer func() {
  692. <-ch
  693. wg.Done()
  694. }()
  695. delYearMinCode := false
  696. if errInfo := t.ErrInfo; errInfo != nil {
  697. //爬虫任务为下载异常、运行异常、404、时间异常、数据异常任务时,不再建该爬虫的抽查任务
  698. if len(errInfo) >= 2 || (len(errInfo) == 1 && errInfo["1"] == nil) { //不是数量异常任务
  699. delYearMinCode = true
  700. }
  701. }
  702. lock.Lock()
  703. has := YearMinCodeMap[t.Code]
  704. lock.Unlock()
  705. if delYearMinCode {
  706. lock.Lock()
  707. delete(YearMinCodeMap, t.Code)
  708. lock.Unlock()
  709. go MgoE.Update("luayearmincode", map[string]interface{}{"code": t.Code}, map[string]interface{}{"$set": map[string]interface{}{"send": true}}, false, false)
  710. } else if has { //luayearmincode中爬虫任务删除
  711. return
  712. }
  713. result := map[string]interface{}{}
  714. result["code"] = t.Code
  715. result["site"] = t.Site
  716. result["channel"] = t.Channel
  717. result["errtype"] = t.ErrType
  718. result["errinfo"] = t.ErrInfo
  719. result["description"] = t.Description
  720. result["comeintime"] = time.Now().Unix()
  721. result["state"] = t.State
  722. //result["updatetime"] = time.Now().Unix()
  723. lua, _ := MgoE.FindOne("luaconfig", map[string]interface{}{"code": t.Code})
  724. if lua != nil && len(*lua) > 0 {
  725. result["modifyid"] = (*lua)["createuserid"]
  726. result["modify"] = (*lua)["createuser"]
  727. result["event"] = (*lua)["event"]
  728. }
  729. lock.Lock()
  730. if len(result) > 0 {
  731. savearr = append(savearr, result)
  732. }
  733. if len(savearr) > 500 {
  734. tmps := savearr
  735. MgoE.SaveBulk("luataskinfo", tmps...)
  736. savearr = []map[string]interface{}{}
  737. }
  738. lock.Unlock()
  739. }(task)
  740. }
  741. wg.Wait()
  742. lock.Lock()
  743. if len(savearr) > 0 {
  744. MgoE.SaveBulk("luataskinfo", savearr...)
  745. savearr = []map[string]interface{}{}
  746. }
  747. lock.Unlock()
  748. TaskMap = map[string]*Task{} //重置
  749. logger.Debug("---保存信息完成---")
  750. }
  751. //创建任务
  752. func CreateLuaTask() {
  753. defer qu.Catch()
  754. logger.Debug("---开始创建任务---")
  755. sess := MgoE.GetMgoConn()
  756. defer MgoE.DestoryMongoConn(sess)
  757. ch := make(chan bool, 1)
  758. wg := &sync.WaitGroup{}
  759. field := map[string]interface{}{
  760. "comeintime": 0,
  761. //"updatetime": 0,
  762. }
  763. query := map[string]interface{}{
  764. "comeintime": map[string]interface{}{
  765. "$gte": GetTime(0),
  766. },
  767. }
  768. it := sess.DB("editor").C("luataskinfo").Find(&query).Select(&field).Iter()
  769. count, _ := sess.DB("editor").C("luataskinfo").Find(&query).Count()
  770. logger.Debug("共有异常爬虫数据量", count, "条")
  771. n := 0
  772. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  773. ch <- true
  774. wg.Add(1)
  775. func(tmp map[string]interface{}) { //目前不用多线程
  776. defer func() {
  777. <-ch
  778. wg.Done()
  779. }()
  780. id := mgo.BsonIdToSId(tmp["_id"])
  781. code := qu.ObjToString(tmp["code"])
  782. site := qu.ObjToString(tmp["site"])
  783. channel := qu.ObjToString(tmp["channel"])
  784. description := qu.ObjToString(tmp["description"])
  785. errtype := qu.ObjToString(tmp["errtype"])
  786. errinfo := tmp["errinfo"].(map[string]interface{})
  787. modifyid := qu.ObjToString(tmp["modifyid"])
  788. modify := qu.ObjToString(tmp["modify"])
  789. event := qu.IntAll(tmp["event"])
  790. state := qu.IntAll(tmp["state"])
  791. //初始化一些任务的变量
  792. n_imin := 0 //最小下载量
  793. n_itimes := 0 //任务出现特别紧急的次数
  794. if state == 1 {
  795. n_itimes = 1
  796. }
  797. n_idn := 0 //下载量
  798. n_sdt := "" //下载量对应的日期
  799. n_surgency := "1" //紧急程度
  800. //
  801. dnerr := errinfo["1"]
  802. if errtype == "1" && dnerr != nil { //只有任务类型是数据量异常时,才记录数据量信息
  803. info := errinfo["1"].(map[string]interface{})
  804. n_imin = qu.IntAll(info["average"])
  805. n_idn = qu.IntAll(info["num"])
  806. n_sdt = qu.ObjToString(info["date"])
  807. }
  808. if errtype == "8" || errtype == "7" || errtype == "6" {
  809. n_surgency = "4"
  810. }
  811. query := map[string]interface{}{
  812. "s_code": code,
  813. "i_state": map[string]interface{}{
  814. "$in": []int{0, 1, 2, 3, 5},
  815. },
  816. }
  817. list, _ := MgoE.Find("task", query, nil, nil, false, -1, -1)
  818. if list != nil && len(*list) > 0 { //已有任务
  819. if len(*list) > 1 {
  820. logger.Error("Code:", code, "任务异常")
  821. MgoE.Save("luacreatetaskerr", map[string]interface{}{
  822. "code": code,
  823. "comeintime": time.Now().Unix(),
  824. "tasknum": len(*list),
  825. })
  826. return
  827. }
  828. task := (*list)[0]
  829. o_istate := qu.IntAll(task["i_state"]) //已有任务的状态
  830. o_stype := qu.ObjToString(task["s_type"]) //已有任务的类型
  831. o_sdescript := qu.ObjToString(task["s_descript"]) //已有任务的描述
  832. o_addinfoid, _ := task["addinfoid"].([]interface{}) //luataskinfo信息
  833. o_lcomplete := qu.Int64All(task["l_complete"]) //已有任务的最迟完成时间
  834. o_surgency := qu.ObjToString(task["s_urgency"]) //已有任务的紧急度
  835. o_iurgency, _ := strconv.Atoi(o_surgency) //已有任务的紧急度int类型
  836. o_itimes := qu.IntAll(task["i_times"]) //已有任务出现的次数
  837. //
  838. o_addinfoid = append(o_addinfoid, id) //追加addinfoid信息
  839. o_sdescript += time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + description //追加描述
  840. set := map[string]interface{}{}
  841. //MgoE.Update("task", q, s, false, false)
  842. if state == 1 { //新任务为待处理
  843. if o_istate < 2 {
  844. if errtype > o_stype { //历史任务是待确认、待处理状态且任务类型等级低于新建任务,任务类型替换为新任务类型
  845. o_stype = errtype
  846. }
  847. o_surgency = n_surgency //更新紧急度
  848. o_itimes++
  849. set = map[string]interface{}{
  850. "addinfoid": o_addinfoid,
  851. "s_descript": o_sdescript,
  852. /// "i_min": n_imin,
  853. // "i_num": n_idn,
  854. // "s_downloadtime": n_sdt,
  855. "i_state": state,
  856. "l_complete": CompleteTime(o_surgency),
  857. "s_urgency": o_surgency,
  858. "s_type": o_stype,
  859. "i_times": o_itimes,
  860. "l_updatetime": time.Now().Unix(),
  861. }
  862. } else { //历史任务类型为未通过或待审核,更新信息
  863. set = map[string]interface{}{
  864. "addinfoid": o_addinfoid,
  865. "s_descript": o_sdescript,
  866. "l_updatetime": time.Now().Unix(),
  867. }
  868. }
  869. } else { //新任务为待确认
  870. if o_istate == 0 { //历史任务为待确认
  871. if o_stype == "1" { //历史任务为数量异常待确认
  872. if errtype == "1" { //新任务为数量异常待确认,按紧急程度递增,次数递增
  873. o_iurgency++ //紧急度加一级
  874. if o_iurgency >= 4 { //出现特别紧急的状态,记录次数itimes
  875. o_itimes++
  876. o_iurgency = 4
  877. }
  878. o_surgency = fmt.Sprint(o_iurgency)
  879. o_lcomplete = CompleteTime(o_surgency)
  880. if o_itimes >= 5 { //特别紧急的次数出现5次,自动创建待处理的任务(排除有待审核任务的可能)
  881. state = 1
  882. }
  883. set = map[string]interface{}{
  884. "addinfoid": o_addinfoid,
  885. "s_descript": o_sdescript,
  886. "i_min": n_imin,
  887. "i_num": n_idn,
  888. "s_downloadtime": n_sdt,
  889. "i_state": state,
  890. "l_complete": o_lcomplete,
  891. "s_urgency": o_surgency,
  892. "s_type": errtype,
  893. "i_times": o_itimes,
  894. "l_updatetime": time.Now().Unix(),
  895. }
  896. } else { //新任务为其他异常类型待确认,紧急程度:紧急;
  897. if o_iurgency < 4 { //数量异常,特别紧急以下
  898. o_surgency = "1"
  899. } else {
  900. o_surgency = "2"
  901. }
  902. set = map[string]interface{}{
  903. "addinfoid": o_addinfoid,
  904. "s_descript": o_sdescript,
  905. "i_min": n_imin,
  906. "i_num": n_idn,
  907. "s_downloadtime": n_sdt,
  908. "i_state": state,
  909. "l_complete": CompleteTime(o_surgency),
  910. "s_urgency": o_surgency,
  911. "s_type": errtype,
  912. "l_updatetime": time.Now().Unix(),
  913. }
  914. }
  915. } else { //其他任务类型待确认,历史任务紧急程度+1,次数+1,任务类型更新为异常等级高者且连续4天变为待处理
  916. if errtype > o_stype {
  917. o_stype = errtype
  918. }
  919. o_iurgency++ //紧急度加一级
  920. if o_iurgency >= 4 { //出现特别紧急的状态,记录次数itimes
  921. o_itimes++
  922. o_iurgency = 4
  923. state = 1 //特别紧急,任务变为待处理
  924. }
  925. o_surgency = fmt.Sprint(o_iurgency)
  926. set = map[string]interface{}{
  927. "addinfoid": o_addinfoid,
  928. "s_descript": o_sdescript,
  929. "i_min": n_imin,
  930. "i_num": n_idn,
  931. "s_downloadtime": n_sdt,
  932. "i_state": state,
  933. "l_complete": CompleteTime(o_surgency),
  934. "s_urgency": o_surgency,
  935. "s_type": o_stype,
  936. "i_times": o_itimes,
  937. "l_updatetime": time.Now().Unix(),
  938. }
  939. }
  940. } else { //历史任务为待处理以上,只追加描述
  941. set = map[string]interface{}{
  942. "addinfoid": o_addinfoid,
  943. "s_descript": o_sdescript,
  944. "i_min": n_imin,
  945. "i_num": n_idn,
  946. "s_downloadtime": n_sdt,
  947. "l_updatetime": time.Now().Unix(),
  948. }
  949. }
  950. }
  951. MgoE.Update("task", map[string]interface{}{"_id": task["_id"]}, map[string]interface{}{"$set": set}, false, false)
  952. } else {
  953. SaveTask(code, site, channel, modifyid, modify, description, n_surgency, n_sdt, errtype, state, n_imin, n_idn, event, n_itimes, []string{id})
  954. }
  955. }(tmp)
  956. if n%100 == 0 {
  957. qu.Debug("current:", n)
  958. }
  959. tmp = map[string]interface{}{}
  960. }
  961. wg.Wait()
  962. logger.Debug("---任务创建完成---")
  963. }
  964. func SaveTask(code, site, channel, modifyid, modify, description, urgency, downloadtime, errtype string, state, min, downloadnum, event, times int, addinfoid []string) {
  965. defer qu.Catch()
  966. result := map[string]interface{}{}
  967. // if stateNum := UserTaskNum[modify]; stateNum == nil {
  968. // tmp := map[string]int{fmt.Sprint(state): 1}
  969. // UserTaskNum[modify] = tmp
  970. // } else {
  971. // stateNum[fmt.Sprint(state)]++
  972. // }
  973. // if state == 1 { //待处理任务,紧急程度定为特别紧急
  974. // urgency = "4"
  975. // }
  976. result["s_code"] = code
  977. result["s_site"] = site
  978. result["s_channel"] = channel
  979. result["s_modifyid"] = modifyid
  980. result["s_modify"] = modify
  981. result["s_descript"] = description
  982. result["i_min"] = min
  983. result["i_num"] = downloadnum //下载量
  984. result["s_urgency"] = urgency
  985. result["i_state"] = state
  986. result["i_event"] = event
  987. result["s_downloadtime"] = downloadtime //下载量对应的日期
  988. result["l_comeintime"] = time.Now().Unix()
  989. result["l_updatetime"] = time.Now().Unix()
  990. result["l_complete"] = CompleteTime(urgency)
  991. //result["s_date"] = time.Now().Format(qu.Date_Short_Layout) //任务创建字符串日期
  992. result["i_times"] = times //为了方便编辑器对次数的排序,记录当前的次数
  993. result["s_type"] = errtype //任务类型
  994. result["addinfoid"] = addinfoid //信息id
  995. result["s_source"] = "程序"
  996. MgoE.Save("task", result)
  997. }
  998. func SaveUserCreateTaskNum() {
  999. defer qu.Catch()
  1000. for user, sn := range UserTaskNum {
  1001. save := map[string]interface{}{}
  1002. save["user"] = user
  1003. save["comeintime"] = time.Now().Unix()
  1004. for s, n := range sn {
  1005. save[s] = n
  1006. }
  1007. MgoE.Save("luausertask", save)
  1008. }
  1009. UserTaskNum = map[string]map[string]int{}
  1010. }
  1011. //重置前一周内未下载成功的数据(一天3次未下成功的数据可以连续下一周)
  1012. func ResetDataState() {
  1013. defer qu.Catch()
  1014. logger.Info("-----更新数据状态-----")
  1015. sess := MgoS.GetMgoConn()
  1016. defer MgoS.DestoryMongoConn(sess)
  1017. ch := make(chan bool, 3)
  1018. wg := &sync.WaitGroup{}
  1019. lock := &sync.Mutex{}
  1020. query := map[string]interface{}{
  1021. "comeintime": map[string]interface{}{
  1022. "$gte": GetTime(-DayNum),
  1023. "$lt": GetTime(0),
  1024. },
  1025. "state": -1,
  1026. }
  1027. field := map[string]interface{}{
  1028. "_id": 1,
  1029. }
  1030. it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
  1031. count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
  1032. logger.Info("更新数据状态数量:", count)
  1033. n := 0
  1034. arr := [][]map[string]interface{}{}
  1035. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  1036. ch <- true
  1037. wg.Add(1)
  1038. go func(tmp map[string]interface{}) {
  1039. defer func() {
  1040. <-ch
  1041. wg.Done()
  1042. }()
  1043. update := []map[string]interface{}{}
  1044. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  1045. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
  1046. lock.Lock()
  1047. arr = append(arr, update)
  1048. if len(arr) > 500 {
  1049. tmps := arr
  1050. MgoS.UpdateBulk("spider_highlistdata", tmps...)
  1051. arr = [][]map[string]interface{}{}
  1052. }
  1053. lock.Unlock()
  1054. }(tmp)
  1055. tmp = map[string]interface{}{}
  1056. }
  1057. wg.Wait()
  1058. lock.Lock()
  1059. if len(arr) > 0 {
  1060. MgoS.UpdateBulk("spider_highlistdata", arr...)
  1061. arr = [][]map[string]interface{}{}
  1062. }
  1063. lock.Unlock()
  1064. logger.Info("-----更新数据状态完毕-----")
  1065. }