task.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. package main
  2. import (
  3. "fmt"
  4. mgo "mongodb"
  5. qu "qfw/util"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/donnie4w/go-logger/logger"
  10. )
  // Task aggregates every anomaly detected for one spider during a single
// statistics run; the collectors store it in TaskMap keyed by spider code.
type Task struct {
	Code    string // spider code
	Site    string // site name
	Channel string // channel (column) name
	// ErrType is the task's primary anomaly code. Codes as assigned by the
	// collectors in this file (a larger code is treated as more severe when
	// tasks are merged):
	//   8: crawl-frequency anomaly (list page fully re-crawled)
	//   7: list-page anomaly (too many list-page rounds returned nothing)
	//   6: channel URL answered HTTP 404
	//   5: detail-page download failure
	//   4: re-gather / run error
	//   3: publish-time anomaly
	//   2: detail / title anomaly
	//   1: download-volume anomaly
	ErrType string
	// ErrInfo maps an anomaly code (same values as ErrType) to its details,
	// e.g. {"num": <count or value>, "hrefs": []string{...}}.
	ErrInfo     map[string]map[string]interface{}
	Description string // human-readable description, one section per anomaly
	State       int    // 0: to be confirmed; 1: to be handled
}
  20. var (
  21. StartTime int64 //上一个工作日的起始时间
  22. EndTime int64 //上一个工作日的结束时间
  23. TaskMap map[string]*Task //任务集合
  24. UpdateStateCron string //每天关闭任务的时间
  25. CreateTaskCron string //每天创建任务的时间
  26. CloseTaskCron string //每天关闭任务的时间
  27. CodeSummaryCron string //每天统计爬虫信息
  28. CloseNum int //关闭几天的任务
  29. DayNum int //更新数据天数
  30. UserTaskNum map[string]map[string]int //记录每人每天新建任务量
  31. )
  32. //创建任务
  33. func CreateTaskProcess() {
  34. InitInfo() //初始化
  35. GetSpiderDownloadRateData() //1、统计spider_downloadrate前一天列表页采集异常爬虫
  36. GetStatusCodeErrorData() //2、统计spider_sitecheck 站点异常爬虫(404)
  37. GetDownloadFailedData() //3、统计spider_highlistdata前一天下载失败的爬虫数据(统计完成后修改状态state:0)
  38. GetRegatherFailedData() //4、统计regatherdata前一天重采失败的爬虫数据
  39. GetDTPErrData() //5、统计spider_warn异常数据(发布时间异常、乱码)
  40. GetDownloadNumErrData() //6、统计download前一天下载量异常的爬虫数据(每天1点统计下载量,目前统计完成需要1个小时)
  41. SaveResult() //保存统计信息
  42. CreateLuaTask() //创建任务
  43. SaveUserCreateTaskNum() //保存每人创建的任务量
  44. }
  45. //初始化
  46. func InitInfo() {
  47. defer qu.Catch()
  48. TaskMap = map[string]*Task{}
  49. UserTaskNum = map[string]map[string]int{}
  50. InitTime() //初始化时间
  51. }
  52. //关闭任务
  53. func CloseTask() {
  54. qu.Catch()
  55. logger.Debug("---清理未更新任务---")
  56. decreaseDay, day := 0, 0
  57. var cleanDay string
  58. for {
  59. decreaseDay--
  60. weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String()
  61. if weekDay != "Saturday" && weekDay != "Sunday" {
  62. day++
  63. }
  64. if day == CloseNum {
  65. cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02")
  66. break
  67. }
  68. }
  69. the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local)
  70. unix_time := the_time.Unix() //凌晨时间戳
  71. query := map[string]interface{}{
  72. "i_state": 0,
  73. "l_complete": map[string]interface{}{
  74. "$lt": unix_time + 86400,
  75. },
  76. "s_type": "1",
  77. // "s_type": map[string]interface{}{
  78. // "$ne": "7",
  79. // },
  80. }
  81. logger.Debug("query:", query)
  82. set := map[string]interface{}{
  83. "$set": map[string]interface{}{
  84. "i_state": 6,
  85. },
  86. }
  87. MgoE.Update("task", query, set, false, true)
  88. logger.Debug("---清理未更新任务完毕---")
  89. }
  90. //1、统计spider_downloadrate前一天列表页采集异常爬虫
  91. func GetSpiderDownloadRateData() {
  92. defer qu.Catch()
  93. logger.Debug("---开始统计spider_downloadrate异常信息---")
  94. sess := MgoS.GetMgoConn()
  95. defer MgoS.DestoryMongoConn(sess)
  96. ch := make(chan bool, 5)
  97. wg := &sync.WaitGroup{}
  98. lock := &sync.Mutex{}
  99. date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
  100. query := map[string]interface{}{
  101. "date": date,
  102. }
  103. it := sess.DB("spider").C("spider_downloadrate").Find(&query).Iter()
  104. n := 0
  105. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  106. ch <- true
  107. wg.Add(1)
  108. go func(tmp map[string]interface{}) {
  109. defer func() {
  110. <-ch
  111. wg.Done()
  112. }()
  113. stype := -1
  114. //1、统计采集频率异常信息
  115. oh_percent := qu.IntAll(tmp["oh_percent"])
  116. event := qu.IntAll(tmp["event"])
  117. if oh_percent > 0 && event != 7410 {
  118. stype = 8
  119. }
  120. //2、统计列表页异常(统计zero占总下载次数的百分比超过80%的)
  121. alltimes := qu.IntAll(tmp["alltimes"])
  122. zero := qu.IntAll(tmp["zero"])
  123. percent := 0 //记录百分比
  124. if zero > 0 {
  125. tmpPercent := float64(zero) / float64(alltimes)
  126. tmpPercent, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", tmpPercent), 64)
  127. percent = int(tmpPercent * float64(100))
  128. if percent >= 80 { //占比超过80%
  129. stype = 7
  130. }
  131. }
  132. if stype != -1 { //出现异常
  133. code := qu.ObjToString(tmp["spidercode"])
  134. site := qu.ObjToString(tmp["site"])
  135. channel := qu.ObjToString(tmp["channel"])
  136. t := &Task{
  137. Code: code,
  138. Site: site,
  139. Channel: channel,
  140. ErrInfo: map[string]map[string]interface{}{},
  141. State: 1,
  142. }
  143. if stype == 8 {
  144. t.ErrType = "8"
  145. t.ErrInfo = map[string]map[string]interface{}{
  146. "8": map[string]interface{}{
  147. "num": oh_percent,
  148. },
  149. }
  150. t.Description = "采集频率异常:\n 列表页共采集" + fmt.Sprint(alltimes) + "轮,其中有" + fmt.Sprint(oh_percent) + "轮数据全采\n"
  151. } else if stype == 7 {
  152. t.ErrType = "7"
  153. t.ErrInfo = map[string]map[string]interface{}{
  154. "7": map[string]interface{}{
  155. "num": percent,
  156. },
  157. }
  158. t.Description = "列表页异常:\n 列表页采集无信息次数占比" + fmt.Sprint(percent) + "%\n"
  159. }
  160. lock.Lock()
  161. TaskMap[code] = t
  162. lock.Unlock()
  163. }
  164. }(tmp)
  165. if n%100 == 0 {
  166. qu.Debug("current:", n)
  167. }
  168. tmp = map[string]interface{}{}
  169. }
  170. wg.Wait()
  171. logger.Debug("---统计spider_downloadrate异常信息完成---")
  172. }
  173. //2、状态码404
  174. func GetStatusCodeErrorData() {
  175. defer qu.Catch()
  176. logger.Debug("---开始统计栏目地址404数据---")
  177. sess := MgoS.GetMgoConn()
  178. defer MgoS.DestoryMongoConn(sess)
  179. ch := make(chan bool, 5)
  180. wg := &sync.WaitGroup{}
  181. lock := &sync.Mutex{}
  182. field := map[string]interface{}{
  183. "url": 1,
  184. "code": 1,
  185. "site": 1,
  186. "channel": 1,
  187. }
  188. query := map[string]interface{}{
  189. "comeintime": map[string]interface{}{
  190. "$gte": StartTime,
  191. "$lte": EndTime,
  192. },
  193. "statuscode": 404,
  194. }
  195. it := sess.DB("spider").C("spider_sitecheck").Find(&query).Select(&field).Iter()
  196. count, _ := sess.DB("spider").C("spider_sitecheck").Find(&query).Count()
  197. logger.Debug("共有404地址", count, "条")
  198. n := 0
  199. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  200. ch <- true
  201. wg.Add(1)
  202. go func(tmp map[string]interface{}) {
  203. defer func() {
  204. <-ch
  205. wg.Done()
  206. }()
  207. code := qu.ObjToString(tmp["code"])
  208. one, _ := MgoE.FindOneByField("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"state": 1})
  209. state := qu.IntAll((*one)["state"])
  210. if state == 4 || state > 6 {
  211. return
  212. }
  213. //判断3天内是否有采集数据,有则不建404任务
  214. stime, etime := GetTime(-3), GetTime(0)
  215. q := map[string]interface{}{
  216. "spidercode": code,
  217. "l_np_publishtime": map[string]interface{}{
  218. "$gte": stime,
  219. "$lte": etime,
  220. },
  221. }
  222. if MgoS.Count("data_bak", q) > 0 { //有采集数据,不认为是404
  223. return
  224. }
  225. href := qu.ObjToString(tmp["url"])
  226. site := qu.ObjToString(tmp["site"])
  227. channel := qu.ObjToString(tmp["channel"])
  228. lock.Lock()
  229. if t := TaskMap[code]; t != nil {
  230. t.ErrInfo["6"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  231. "num": 404,
  232. "hrefs": []string{href},
  233. }
  234. t.Description += "网站监测:404\n" + href + "\n"
  235. t.State = 1
  236. } else {
  237. t := &Task{
  238. Code: code,
  239. Site: site,
  240. Channel: channel,
  241. ErrType: "6",
  242. ErrInfo: map[string]map[string]interface{}{},
  243. Description: "网站监测:404\n" + href + "\n",
  244. State: 1,
  245. }
  246. t.ErrInfo = map[string]map[string]interface{}{
  247. "6": map[string]interface{}{
  248. "num": 404,
  249. "hrefs": []string{href},
  250. },
  251. }
  252. TaskMap[code] = t
  253. }
  254. lock.Unlock()
  255. }(tmp)
  256. if n%100 == 0 {
  257. qu.Debug("current:", n)
  258. }
  259. tmp = map[string]interface{}{}
  260. }
  261. wg.Wait()
  262. logger.Debug("---统计栏目地址404数据完成---")
  263. }
  264. //3、统计三级页下载失败数据
  265. /*
  266. 先统计下载失败信息再更新下载失败信息状态(ResetDataState)使其可重新下载,这样不影响统计
  267. 但是任务已经就绪,若下载失败信息重新下载成功,则使任务不太准备
  268. 若先重置状态再统计,会使任务统计时缺少,无法正常监控
  269. */
  270. func GetDownloadFailedData() {
  271. defer qu.Catch()
  272. logger.Debug("---开始统计下载失败信息---")
  273. sess := MgoS.GetMgoConn()
  274. defer MgoS.DestoryMongoConn(sess)
  275. ch := make(chan bool, 5)
  276. wg := &sync.WaitGroup{}
  277. lock := &sync.Mutex{}
  278. field := map[string]interface{}{
  279. "spidercode": 1,
  280. "href": 1,
  281. "site": 1,
  282. "channel": 1,
  283. }
  284. query := map[string]interface{}{
  285. "comeintime": map[string]interface{}{
  286. "$gte": StartTime,
  287. "$lte": EndTime,
  288. },
  289. "state": -1,
  290. }
  291. it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
  292. count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
  293. logger.Debug("共有下载失败数据", count, "条")
  294. n := 0
  295. //arr := [][]map[string]interface{}{}
  296. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  297. ch <- true
  298. wg.Add(1)
  299. go func(tmp map[string]interface{}) {
  300. defer func() {
  301. <-ch
  302. wg.Done()
  303. }()
  304. code := qu.ObjToString(tmp["spidercode"])
  305. href := qu.ObjToString(tmp["href"])
  306. site := qu.ObjToString(tmp["site"])
  307. channel := qu.ObjToString(tmp["channel"])
  308. lock.Lock()
  309. if t := TaskMap[code]; t != nil {
  310. if info := t.ErrInfo["5"]; info != nil {
  311. num := qu.IntAll(info["num"])
  312. num++
  313. info["num"] = num
  314. hrefs := info["hrefs"].([]string)
  315. if len(hrefs) < 3 {
  316. hrefs = append(hrefs, href)
  317. info["hrefs"] = hrefs
  318. t.Description += href + "\n"
  319. }
  320. if num >= 10 {
  321. t.State = 1
  322. }
  323. } else {
  324. t.ErrInfo["5"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  325. "num": 1,
  326. "hrefs": []string{href},
  327. }
  328. t.Description += "下载异常:\n" + href + "\n"
  329. }
  330. } else {
  331. t := &Task{
  332. Code: code,
  333. Site: site,
  334. Channel: channel,
  335. ErrType: "5",
  336. ErrInfo: map[string]map[string]interface{}{},
  337. Description: "下载异常:\n" + href + "\n",
  338. State: 0,
  339. }
  340. t.ErrInfo = map[string]map[string]interface{}{
  341. "5": map[string]interface{}{
  342. "num": 1,
  343. "hrefs": []string{href},
  344. },
  345. }
  346. TaskMap[code] = t
  347. }
  348. //更新state状态重新下载
  349. // update := []map[string]interface{}{}
  350. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  351. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"state": 0, "times": 0}})
  352. // arr = append(arr, update)
  353. // if len(arr) > 500 {
  354. // tmps := arr
  355. // MgoS.UpdateBulk("spider_highlistdata", tmps...)
  356. // arr = [][]map[string]interface{}{}
  357. // }
  358. lock.Unlock()
  359. }(tmp)
  360. if n%100 == 0 {
  361. qu.Debug("current:", n)
  362. }
  363. tmp = map[string]interface{}{}
  364. }
  365. wg.Wait()
  366. // lock.Lock()
  367. // if len(arr) > 0 {
  368. // MgoS.UpdateBulk("spider_highlistdata", arr...)
  369. // arr = [][]map[string]interface{}{}
  370. // }
  371. // lock.Unlock()
  372. logger.Debug("---统计下载失败信息完成---")
  373. }
  374. //4、统计重采失败数据
  375. func GetRegatherFailedData() {
  376. defer qu.Catch()
  377. logger.Debug("---开始统计重采失败信息---")
  378. sess := MgoS.GetMgoConn()
  379. defer MgoS.DestoryMongoConn(sess)
  380. ch := make(chan bool, 5)
  381. wg := &sync.WaitGroup{}
  382. lock := &sync.Mutex{}
  383. field := map[string]interface{}{
  384. "spidercode": 1,
  385. "href": 1,
  386. "site": 1,
  387. "channel": 1,
  388. }
  389. query := map[string]interface{}{
  390. "state": map[string]interface{}{
  391. "$lte": 1,
  392. },
  393. "from": "lua",
  394. "comeintime": map[string]interface{}{
  395. "$gte": StartTime,
  396. "$lte": EndTime,
  397. },
  398. }
  399. it := sess.DB("spider").C("regatherdata").Find(&query).Select(&field).Iter()
  400. count, _ := sess.DB("spider").C("regatherdata").Find(&query).Count()
  401. logger.Debug("共有重采失败数据", count, "条")
  402. n := 0
  403. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  404. ch <- true
  405. wg.Add(1)
  406. go func(tmp map[string]interface{}) {
  407. defer func() {
  408. <-ch
  409. wg.Done()
  410. }()
  411. code := qu.ObjToString(tmp["spidercode"])
  412. href := qu.ObjToString(tmp["href"])
  413. site := qu.ObjToString(tmp["site"])
  414. channel := qu.ObjToString(tmp["channel"])
  415. lock.Lock()
  416. if t := TaskMap[code]; t != nil {
  417. if info := t.ErrInfo["4"]; info != nil {
  418. num := qu.IntAll(info["num"])
  419. num++
  420. info["num"] = num
  421. hrefs := info["hrefs"].([]string)
  422. if len(hrefs) < 3 {
  423. hrefs = append(hrefs, href)
  424. info["hrefs"] = hrefs
  425. t.Description += href + "\n"
  426. }
  427. if num >= 10 {
  428. t.State = 1
  429. }
  430. } else {
  431. t.ErrInfo["4"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  432. "num": 1,
  433. "hrefs": []string{href},
  434. }
  435. t.Description += "运行报错:\n" + href + "\n"
  436. }
  437. } else {
  438. t := &Task{
  439. Code: code,
  440. Site: site,
  441. Channel: channel,
  442. ErrType: "4",
  443. ErrInfo: map[string]map[string]interface{}{},
  444. Description: "运行报错:\n" + href + "\n",
  445. State: 0,
  446. }
  447. t.ErrInfo = map[string]map[string]interface{}{
  448. "4": map[string]interface{}{
  449. "num": 1,
  450. "hrefs": []string{href},
  451. },
  452. }
  453. TaskMap[code] = t
  454. }
  455. lock.Unlock()
  456. }(tmp)
  457. if n%100 == 0 {
  458. qu.Debug("current:", n)
  459. }
  460. tmp = map[string]interface{}{}
  461. }
  462. wg.Wait()
  463. // for _, task := range TaskMap {
  464. // qu.Debug("code:", task.Code)
  465. // qu.Debug("site:", task.Site)
  466. // qu.Debug("channel:", task.Channel)
  467. // qu.Debug("errtype:", task.ErrType)
  468. // qu.Debug("description:", task.Description)
  469. // qu.Debug("info:", task.ErrInfo)
  470. // qu.Debug("-------------------------------------------")
  471. // tmap := map[string]interface{}{}
  472. // ab, _ := json.Marshal(&task)
  473. // json.Unmarshal(ab, &tmap)
  474. // MgoE.Save("save_aa", tmap)
  475. // }
  476. logger.Debug("---统计重采失败信息完成---")
  477. }
  478. //5、统计detail、title、publishtime异常数据
  479. func GetDTPErrData() {
  480. defer qu.Catch()
  481. logger.Debug("---开始统计信息异常数据---")
  482. sess := MgoS.GetMgoConn()
  483. defer MgoS.DestoryMongoConn(sess)
  484. ch := make(chan bool, 5)
  485. wg := &sync.WaitGroup{}
  486. lock := &sync.Mutex{}
  487. field := map[string]interface{}{
  488. "code": 1,
  489. "href": 1,
  490. "site": 1,
  491. "channel": 1,
  492. "field": 1,
  493. "info": 1,
  494. }
  495. query := map[string]interface{}{
  496. "comeintime": map[string]interface{}{
  497. "$gte": StartTime,
  498. "$lte": EndTime,
  499. },
  500. "level": 2, //2:error数据 1:warn数据
  501. }
  502. it := sess.DB("spider").C("spider_warn").Find(&query).Select(&field).Iter()
  503. count, _ := sess.DB("spider").C("spider_warn").Find(&query).Count()
  504. logger.Debug("共有信息异常数据", count, "条")
  505. n := 0
  506. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  507. ch <- true
  508. wg.Add(1)
  509. go func(tmp map[string]interface{}) {
  510. defer func() {
  511. <-ch
  512. wg.Done()
  513. }()
  514. errnum := "2" //detail、 title异常
  515. destmp := "正文标题异常:\n"
  516. field := qu.ObjToString(tmp["field"])
  517. info := qu.ObjToString(tmp["info"])
  518. if field == "publishtime" { //发布时间异常
  519. if info == "Publishtime Is Too Late" { //发布时间超前的不建任务
  520. return
  521. }
  522. errnum = "3"
  523. destmp = "发布时间异常:\n"
  524. }
  525. code := qu.ObjToString(tmp["code"])
  526. href := qu.ObjToString(tmp["href"])
  527. site := qu.ObjToString(tmp["site"])
  528. channel := qu.ObjToString(tmp["channel"])
  529. lock.Lock()
  530. if t := TaskMap[code]; t != nil {
  531. if info := t.ErrInfo[errnum]; info != nil {
  532. num := qu.IntAll(info["num"])
  533. num++
  534. info["num"] = num
  535. hrefs := info["hrefs"].([]string)
  536. if len(hrefs) < 3 {
  537. hrefs = append(hrefs, href)
  538. info["hrefs"] = hrefs
  539. t.Description += href + "\n"
  540. }
  541. if num >= 10 {
  542. t.State = 1
  543. }
  544. } else {
  545. t.ErrInfo[errnum] = map[string]interface{}{
  546. "num": 1,
  547. "hrefs": []string{href},
  548. }
  549. t.Description += destmp + href + "\n"
  550. }
  551. } else {
  552. t := &Task{
  553. Code: code,
  554. Site: site,
  555. Channel: channel,
  556. ErrType: errnum,
  557. ErrInfo: map[string]map[string]interface{}{},
  558. Description: destmp + href + "\n",
  559. State: 0,
  560. }
  561. t.ErrInfo = map[string]map[string]interface{}{
  562. errnum: map[string]interface{}{
  563. "num": 1,
  564. "hrefs": []string{href},
  565. },
  566. }
  567. TaskMap[code] = t
  568. }
  569. lock.Unlock()
  570. }(tmp)
  571. if n%100 == 0 {
  572. qu.Debug("current:", n)
  573. }
  574. tmp = map[string]interface{}{}
  575. }
  576. wg.Wait()
  577. logger.Debug("---统计信息异常数据完成---")
  578. }
  579. //6、统计下载量异常数据
  580. func GetDownloadNumErrData() {
  581. defer qu.Catch()
  582. logger.Debug("---开始统计下载量异常数据---")
  583. sess := MgoS.GetMgoConn()
  584. defer MgoS.DestoryMongoConn(sess)
  585. ch := make(chan bool, 5)
  586. wg := &sync.WaitGroup{}
  587. lock := &sync.Mutex{}
  588. field := map[string]interface{}{
  589. "downloadNum": 1,
  590. "code": 1,
  591. "averageDownload": 1,
  592. "site": 1,
  593. "channel": 1,
  594. }
  595. query := map[string]interface{}{
  596. "isok": false,
  597. }
  598. it := sess.DB("spider").C("spider_download").Find(&query).Select(&field).Iter()
  599. count, _ := sess.DB("spider").C("spider_download").Find(&query).Count()
  600. logger.Debug("共有下载量异常数据", count, "条")
  601. n := 0
  602. arr := [][]map[string]interface{}{}
  603. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  604. ch <- true
  605. wg.Add(1)
  606. go func(tmp map[string]interface{}) {
  607. defer func() {
  608. <-ch
  609. wg.Done()
  610. }()
  611. code := qu.ObjToString(tmp["code"])
  612. site := qu.ObjToString(tmp["site"])
  613. channel := qu.ObjToString(tmp["channel"])
  614. average := qu.IntAll(tmp["averageDownload"])
  615. date := "" //日期
  616. dnum := 0 //下载量
  617. for d, n := range tmp["downloadNum"].(map[string]interface{}) {
  618. date = d
  619. dnum = qu.IntAll(n)
  620. }
  621. lock.Lock()
  622. if t := TaskMap[code]; t != nil {
  623. t.ErrInfo["1"] = map[string]interface{}{ //ErrInfo新增下载异常信息
  624. "num": dnum,
  625. "date": date,
  626. "average": average,
  627. }
  628. t.Description += "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n"
  629. } else {
  630. t := &Task{
  631. Code: code,
  632. Site: site,
  633. Channel: channel,
  634. ErrType: "1",
  635. ErrInfo: map[string]map[string]interface{}{},
  636. Description: "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n",
  637. State: 0,
  638. }
  639. t.ErrInfo = map[string]map[string]interface{}{
  640. "1": map[string]interface{}{
  641. "num": dnum,
  642. "date": date,
  643. "average": average,
  644. },
  645. }
  646. TaskMap[code] = t
  647. }
  648. //更新isok
  649. update := []map[string]interface{}{}
  650. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  651. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"isok": true}})
  652. arr = append(arr, update)
  653. if len(arr) > 500 {
  654. tmps := arr
  655. MgoS.UpdateBulk("spider_download", tmps...)
  656. arr = [][]map[string]interface{}{}
  657. }
  658. lock.Unlock()
  659. }(tmp)
  660. if n%100 == 0 {
  661. qu.Debug("current:", n)
  662. }
  663. tmp = map[string]interface{}{}
  664. }
  665. wg.Wait()
  666. lock.Lock()
  667. if len(arr) > 0 {
  668. MgoS.UpdateBulk("spider_download", arr...)
  669. arr = [][]map[string]interface{}{}
  670. }
  671. lock.Unlock()
  672. logger.Debug("---统计下载量异常数据完成---")
  673. }
  674. //保存统计信息
  675. func SaveResult() {
  676. defer qu.Catch()
  677. logger.Debug("---开始保存信息---")
  678. wg := &sync.WaitGroup{}
  679. lock := &sync.Mutex{}
  680. ch := make(chan bool, 10)
  681. savearr := []map[string]interface{}{}
  682. for _, task := range TaskMap {
  683. wg.Add(1)
  684. ch <- true
  685. go func(t *Task) {
  686. defer func() {
  687. <-ch
  688. wg.Done()
  689. }()
  690. delYearMinCode := false
  691. if errInfo := t.ErrInfo; errInfo != nil {
  692. //爬虫任务为下载异常、运行异常、404、时间异常、数据异常任务时,不再建该爬虫的抽查任务
  693. if len(errInfo) >= 2 || (len(errInfo) == 1 && errInfo["1"] == nil) { //不是数量异常任务
  694. delYearMinCode = true
  695. }
  696. }
  697. lock.Lock()
  698. has := YearMinCodeMap[t.Code]
  699. lock.Unlock()
  700. if delYearMinCode {
  701. lock.Lock()
  702. delete(YearMinCodeMap, t.Code)
  703. lock.Unlock()
  704. go MgoE.Update("luayearmincode", map[string]interface{}{"code": t.Code}, map[string]interface{}{"$set": map[string]interface{}{"send": true}}, false, false)
  705. } else if has { //luayearmincode中爬虫任务删除
  706. return
  707. }
  708. result := map[string]interface{}{}
  709. result["code"] = t.Code
  710. result["site"] = t.Site
  711. result["channel"] = t.Channel
  712. result["errtype"] = t.ErrType
  713. result["errinfo"] = t.ErrInfo
  714. result["description"] = t.Description
  715. result["comeintime"] = time.Now().Unix()
  716. result["state"] = t.State
  717. //result["updatetime"] = time.Now().Unix()
  718. lua, _ := MgoE.FindOne("luaconfig", map[string]interface{}{"code": t.Code})
  719. if lua != nil && len(*lua) > 0 {
  720. result["modifyid"] = (*lua)["createuserid"]
  721. result["modify"] = (*lua)["createuser"]
  722. result["event"] = (*lua)["event"]
  723. }
  724. lock.Lock()
  725. if len(result) > 0 {
  726. savearr = append(savearr, result)
  727. }
  728. if len(savearr) > 500 {
  729. tmps := savearr
  730. MgoE.SaveBulk("luataskinfo_test", tmps...)
  731. savearr = []map[string]interface{}{}
  732. }
  733. lock.Unlock()
  734. }(task)
  735. }
  736. wg.Wait()
  737. lock.Lock()
  738. if len(savearr) > 0 {
  739. MgoE.SaveBulk("luataskinfo", savearr...)
  740. savearr = []map[string]interface{}{}
  741. }
  742. lock.Unlock()
  743. TaskMap = map[string]*Task{} //重置
  744. logger.Debug("---保存信息完成---")
  745. }
  746. //创建任务
  747. func CreateLuaTask() {
  748. defer qu.Catch()
  749. logger.Debug("---开始创建任务---")
  750. sess := MgoE.GetMgoConn()
  751. defer MgoE.DestoryMongoConn(sess)
  752. ch := make(chan bool, 1)
  753. wg := &sync.WaitGroup{}
  754. field := map[string]interface{}{
  755. "comeintime": 0,
  756. //"updatetime": 0,
  757. }
  758. query := map[string]interface{}{
  759. "comeintime": map[string]interface{}{
  760. "$gte": GetTime(0),
  761. },
  762. }
  763. it := sess.DB("editor").C("luataskinfo").Find(&query).Select(&field).Iter()
  764. count, _ := sess.DB("editor").C("luataskinfo").Find(&query).Count()
  765. logger.Debug("共有异常爬虫数据量", count, "条")
  766. n := 0
  767. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  768. ch <- true
  769. wg.Add(1)
  770. func(tmp map[string]interface{}) { //目前不用多线程
  771. defer func() {
  772. <-ch
  773. wg.Done()
  774. }()
  775. id := mgo.BsonIdToSId(tmp["_id"])
  776. code := qu.ObjToString(tmp["code"])
  777. site := qu.ObjToString(tmp["site"])
  778. channel := qu.ObjToString(tmp["channel"])
  779. description := qu.ObjToString(tmp["description"])
  780. errtype := qu.ObjToString(tmp["errtype"])
  781. errinfo := tmp["errinfo"].(map[string]interface{})
  782. modifyid := qu.ObjToString(tmp["modifyid"])
  783. modify := qu.ObjToString(tmp["modify"])
  784. event := qu.IntAll(tmp["event"])
  785. state := qu.IntAll(tmp["state"])
  786. //初始化一些任务的变量
  787. n_imin := 0 //最小下载量
  788. n_itimes := 0 //任务出现特别紧急的次数
  789. if state == 1 {
  790. n_itimes = 1
  791. }
  792. n_idn := 0 //下载量
  793. n_sdt := "" //下载量对应的日期
  794. n_surgency := "1" //紧急程度
  795. //
  796. dnerr := errinfo["1"]
  797. if errtype == "1" && dnerr != nil { //只有任务类型是数据量异常时,才记录数据量信息
  798. info := errinfo["1"].(map[string]interface{})
  799. n_imin = qu.IntAll(info["average"])
  800. n_idn = qu.IntAll(info["num"])
  801. n_sdt = qu.ObjToString(info["date"])
  802. }
  803. if errtype == "8" || errtype == "7" || errtype == "6" {
  804. n_surgency = "4"
  805. }
  806. query := map[string]interface{}{
  807. "s_code": code,
  808. "i_state": map[string]interface{}{
  809. "$in": []int{0, 1, 2, 3, 5},
  810. },
  811. }
  812. list, _ := MgoE.Find("task", query, nil, nil, false, -1, -1)
  813. if list != nil && len(*list) > 0 { //已有任务
  814. if len(*list) > 1 {
  815. logger.Error("Code:", code, "任务异常")
  816. MgoE.Save("luacreatetaskerr", map[string]interface{}{
  817. "code": code,
  818. "comeintime": time.Now().Unix(),
  819. "tasknum": len(*list),
  820. })
  821. return
  822. }
  823. task := (*list)[0]
  824. o_istate := qu.IntAll(task["i_state"]) //已有任务的状态
  825. o_stype := qu.ObjToString(task["s_type"]) //已有任务的类型
  826. o_sdescript := qu.ObjToString(task["s_descript"]) //已有任务的描述
  827. o_addinfoid, _ := task["addinfoid"].([]interface{}) //luataskinfo信息
  828. o_lcomplete := qu.Int64All(task["l_complete"]) //已有任务的最迟完成时间
  829. o_surgency := qu.ObjToString(task["s_urgency"]) //已有任务的紧急度
  830. o_iurgency, _ := strconv.Atoi(o_surgency) //已有任务的紧急度int类型
  831. o_itimes := qu.IntAll(task["i_times"]) //已有任务出现的次数
  832. //
  833. o_addinfoid = append(o_addinfoid, id) //追加addinfoid信息
  834. o_sdescript += time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + description //追加描述
  835. set := map[string]interface{}{}
  836. //MgoE.Update("task", q, s, false, false)
  837. if state == 1 { //新任务为待处理
  838. if o_istate <= 2 {
  839. if errtype > o_stype { //历史任务是待确认、待处理、处理中状态且任务类型等级低于新建任务,任务类型替换为新任务类型
  840. o_stype = errtype
  841. }
  842. o_surgency = n_surgency //更新紧急度
  843. o_itimes++
  844. set = map[string]interface{}{
  845. "addinfoid": o_addinfoid,
  846. "s_descript": o_sdescript,
  847. /// "i_min": n_imin,
  848. // "i_num": n_idn,
  849. // "s_downloadtime": n_sdt,
  850. "i_state": state,
  851. "l_complete": CompleteTime(o_surgency),
  852. "s_urgency": o_surgency,
  853. "s_type": o_stype,
  854. "i_times": o_itimes,
  855. "l_updatetime": time.Now().Unix(),
  856. }
  857. } else { //历史任务类型为未通过或待审核,更新信息
  858. set = map[string]interface{}{
  859. "addinfoid": o_addinfoid,
  860. "s_descript": o_sdescript,
  861. "l_updatetime": time.Now().Unix(),
  862. }
  863. }
  864. } else { //新任务为待确认
  865. if o_istate == 0 { //历史任务为待确认
  866. if o_stype == "1" { //历史任务为数量异常待确认
  867. if errtype == "1" { //新任务为数量异常待确认,按紧急程度递增,次数递增
  868. o_iurgency++ //紧急度加一级
  869. if o_iurgency >= 4 { //出现特别紧急的状态,记录次数itimes
  870. o_itimes++
  871. o_iurgency = 4
  872. }
  873. o_surgency = fmt.Sprint(o_iurgency)
  874. o_lcomplete = CompleteTime(o_surgency)
  875. if o_itimes >= 5 { //特别紧急的次数出现5次,自动创建待处理的任务(排除有待审核任务的可能)
  876. state = 1
  877. }
  878. set = map[string]interface{}{
  879. "addinfoid": o_addinfoid,
  880. "s_descript": o_sdescript,
  881. "i_min": n_imin,
  882. "i_num": n_idn,
  883. "s_downloadtime": n_sdt,
  884. "i_state": state,
  885. "l_complete": o_lcomplete,
  886. "s_urgency": o_surgency,
  887. "s_type": errtype,
  888. "i_times": o_itimes,
  889. "l_updatetime": time.Now().Unix(),
  890. }
  891. } else { //新任务为其他异常类型待确认,紧急程度:紧急;
  892. if o_iurgency < 4 { //数量异常,特别紧急以下
  893. o_surgency = "1"
  894. } else {
  895. o_surgency = "2"
  896. }
  897. set = map[string]interface{}{
  898. "addinfoid": o_addinfoid,
  899. "s_descript": o_sdescript,
  900. "i_min": n_imin,
  901. "i_num": n_idn,
  902. "s_downloadtime": n_sdt,
  903. "i_state": state,
  904. "l_complete": CompleteTime(o_surgency),
  905. "s_urgency": o_surgency,
  906. "s_type": errtype,
  907. "l_updatetime": time.Now().Unix(),
  908. }
  909. }
  910. } else { //其他任务类型待确认,历史任务紧急程度+1,次数+1,任务类型更新为异常等级高者且连续4天变为待处理
  911. if errtype > o_stype {
  912. o_stype = errtype
  913. }
  914. o_iurgency++ //紧急度加一级
  915. if o_iurgency >= 4 { //出现特别紧急的状态,记录次数itimes
  916. o_itimes++
  917. o_iurgency = 4
  918. state = 1 //特别紧急,任务变为待处理
  919. }
  920. o_surgency = fmt.Sprint(o_iurgency)
  921. set = map[string]interface{}{
  922. "addinfoid": o_addinfoid,
  923. "s_descript": o_sdescript,
  924. "i_min": n_imin,
  925. "i_num": n_idn,
  926. "s_downloadtime": n_sdt,
  927. "i_state": state,
  928. "l_complete": CompleteTime(o_surgency),
  929. "s_urgency": o_surgency,
  930. "s_type": o_stype,
  931. "i_times": o_itimes,
  932. "l_updatetime": time.Now().Unix(),
  933. }
  934. }
  935. } else { //历史任务为待处理以上,只追加描述
  936. set = map[string]interface{}{
  937. "addinfoid": o_addinfoid,
  938. "s_descript": o_sdescript,
  939. "i_min": n_imin,
  940. "i_num": n_idn,
  941. "s_downloadtime": n_sdt,
  942. "l_updatetime": time.Now().Unix(),
  943. }
  944. }
  945. }
  946. MgoE.Update("task", map[string]interface{}{"_id": task["_id"]}, map[string]interface{}{"$set": set}, false, false)
  947. } else {
  948. SaveTask(code, site, channel, modifyid, modify, description, n_surgency, n_sdt, errtype, state, n_imin, n_idn, event, n_itimes, []string{id})
  949. }
  950. }(tmp)
  951. if n%100 == 0 {
  952. qu.Debug("current:", n)
  953. }
  954. tmp = map[string]interface{}{}
  955. }
  956. wg.Wait()
  957. logger.Debug("---任务创建完成---")
  958. }
  959. func SaveTask(code, site, channel, modifyid, modify, description, urgency, downloadtime, errtype string, state, min, downloadnum, event, times int, addinfoid []string) {
  960. defer qu.Catch()
  961. result := map[string]interface{}{}
  962. // if stateNum := UserTaskNum[modify]; stateNum == nil {
  963. // tmp := map[string]int{fmt.Sprint(state): 1}
  964. // UserTaskNum[modify] = tmp
  965. // } else {
  966. // stateNum[fmt.Sprint(state)]++
  967. // }
  968. // if state == 1 { //待处理任务,紧急程度定为特别紧急
  969. // urgency = "4"
  970. // }
  971. result["s_code"] = code
  972. result["s_site"] = site
  973. result["s_channel"] = channel
  974. result["s_modifyid"] = modifyid
  975. result["s_modify"] = modify
  976. result["s_descript"] = description
  977. result["i_min"] = min
  978. result["i_num"] = downloadnum //下载量
  979. result["s_urgency"] = urgency
  980. result["i_state"] = state
  981. result["i_event"] = event
  982. result["s_downloadtime"] = downloadtime //下载量对应的日期
  983. result["l_comeintime"] = time.Now().Unix()
  984. result["l_updatetime"] = time.Now().Unix()
  985. result["l_complete"] = CompleteTime(urgency)
  986. //result["s_date"] = time.Now().Format(qu.Date_Short_Layout) //任务创建字符串日期
  987. result["i_times"] = times //为了方便编辑器对次数的排序,记录当前的次数
  988. result["s_type"] = errtype //任务类型
  989. result["addinfoid"] = addinfoid //信息id
  990. result["s_source"] = "程序"
  991. MgoE.Save("task", result)
  992. }
  993. func SaveUserCreateTaskNum() {
  994. defer qu.Catch()
  995. for user, sn := range UserTaskNum {
  996. save := map[string]interface{}{}
  997. save["user"] = user
  998. save["comeintime"] = time.Now().Unix()
  999. for s, n := range sn {
  1000. save[s] = n
  1001. }
  1002. MgoE.Save("luausertask", save)
  1003. }
  1004. UserTaskNum = map[string]map[string]int{}
  1005. }
  1006. //重置前一周内未下载成功的数据(一天3次未下成功的数据可以连续下一周)
  1007. func ResetDataState() {
  1008. defer qu.Catch()
  1009. logger.Info("-----更新数据状态-----")
  1010. sess := MgoS.GetMgoConn()
  1011. defer MgoS.DestoryMongoConn(sess)
  1012. ch := make(chan bool, 3)
  1013. wg := &sync.WaitGroup{}
  1014. lock := &sync.Mutex{}
  1015. query := map[string]interface{}{
  1016. "comeintime": map[string]interface{}{
  1017. "$gte": GetTime(-DayNum),
  1018. "$lt": GetTime(0),
  1019. },
  1020. "state": -1,
  1021. }
  1022. field := map[string]interface{}{
  1023. "_id": 1,
  1024. }
  1025. it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
  1026. count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
  1027. logger.Info("更新数据状态数量:", count)
  1028. n := 0
  1029. arr := [][]map[string]interface{}{}
  1030. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  1031. ch <- true
  1032. wg.Add(1)
  1033. go func(tmp map[string]interface{}) {
  1034. defer func() {
  1035. <-ch
  1036. wg.Done()
  1037. }()
  1038. update := []map[string]interface{}{}
  1039. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  1040. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
  1041. lock.Lock()
  1042. arr = append(arr, update)
  1043. if len(arr) > 500 {
  1044. tmps := arr
  1045. MgoS.UpdateBulk("spider_highlistdata", tmps...)
  1046. arr = [][]map[string]interface{}{}
  1047. }
  1048. lock.Unlock()
  1049. }(tmp)
  1050. tmp = map[string]interface{}{}
  1051. }
  1052. wg.Wait()
  1053. lock.Lock()
  1054. if len(arr) > 0 {
  1055. MgoS.UpdateBulk("spider_highlistdata", arr...)
  1056. arr = [][]map[string]interface{}{}
  1057. }
  1058. lock.Unlock()
  1059. logger.Info("-----更新数据状态完毕-----")
  1060. }