// newtask.go

package luatask

import (
	"fmt"
	"sync"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"go.mongodb.org/mongo-driver/bson"

	qu "qfw/util"
	"util"
)

const NEWTASK_LISTERR, NEWTASK_DATAINFOERR, NEWTASK_PAGEFLIPERR, NEWTASK_RATEERR, NEWTASK_DOWNLOADERR, NEWTASK_DATAINFOWARN = "1", "2", "3", "4", "5", "6"
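
// Note: the error-type codes are single-character numeric strings, so a plain
// lexicographic comparison doubles as a priority comparison: "1" (list-page
// error) outranks "6" (data warning), and the initial value "-1" sorts below
// "0". Checks like `sp.ErrType < "0"` further down therefore mean "no error
// recorded yet".
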
var NewCodeInfoMap = map[string]*NewSpider{}

var LuaErrTypeInfo = map[string]string{
	NEWTASK_LISTERR:      "列表页异常",
	NEWTASK_DATAINFOERR:  "数据异常错误",
	NEWTASK_PAGEFLIPERR:  "爬虫翻页异常",
	NEWTASK_RATEERR:      "采集频率异常",
	NEWTASK_DOWNLOADERR:  "下载异常",
	NEWTASK_DATAINFOWARN: "数据异常警告",
}

var DataInfoErrMap = map[int]string{
	1:  "Save Coll Error",
	2:  "File Size Or Url Error",
	4:  "Field Value Is Null",
	9:  "Html Contains Temp Language",
	10: "Publishtime Is Error",
	11: "Publishtime Is Zero",
	12: "Field Type Error",
}

var DataInfoWarnMap = map[int]string{
	5: "Field Value Contains Random Code",
	6: "Field Value Not Contains Chinese",
	8: "Detail File Err",
}
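
// UpdateLuaconfig accumulates [selector, update] pairs for the luaconfig
// collection; updateLuaconfig() flushes them in one bulk call at the end of a
// run (the two-element pair layout matches what util.MgoEB.UpdateBulk is fed).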
var UpdateLuaconfig [][]map[string]interface{}

type NewSpider struct {
	// Spider base info
	Code         string                 `bson:"code"`
	Site         string                 `bson:"site"`
	Channel      string                 `bson:"channel"`
	Platform     string                 `bson:"platform"`
	Event        int                    `bson:"event"`
	InfoFormat   int                    `bson:"infoformat"`
	PendState    int                    `bson:"pendstate"`
	ModifyUser   string                 `bson:"modifyuser"`
	ModifyId     string                 `bson:"modifyuserid"`
	ModifyTime   int64                  `bson:"modifytime"`
	Model        int                    `bson:"model"`
	Working      int                    `bson:"working"`
	AuditTime    int64                  `bson:"l_uploadtime"`
	ListIsFilter bool                   `bson:"listisfilter"`
	UpLimit      int                    `bson:"uplimit"`
	MaxPage      int                    `bson:"maxpage"`
	Page_FlipOk  bool                   `bson:"page_flipok"`
	Page_OneOk   bool                   `bson:"page_oneok"`
	Page_TwoOk   bool                   `bson:"page_twook"`
	CodeTags     map[string]interface{} `bson:"codetags"`
	// Statistics
	Detail_DownloadNum        int               `bson:"detail_downloadnum"`
	Detail_DownloadSuccessNum int               `bson:"detail_downloadsuccessnum"`
	Detail_DownloadFailNum    int               `bson:"detail_downloadfailnum"`
	List_IsGetData            bool              `bson:"list_isgetdata"`
	HeartTime                 int64             `bson:"heart_time"`
	List_RunTimes             int               `bson:"list_runtimes"`
	List_NoDataTimes          int               `bson:"list_nodatatimes"`
	List_AllInTimes           int               `bson:"list_allintimes"`
	WarnInfoMap               map[int]*WarnInfo `bson:"warninfo"`
	// python
	Py_TaskId   string `bson:"py_taskid"`
	Py_NodeName string `bson:"py_nodename"`
	Py_IsValid  bool   `bson:"py_isvalid"`
	// Site info
	Channel_Status int `bson:"channel_status"` // channel response status
	// Supplementary info
	Comeintime int64 `bson:"comeintime"`
	// Error summary
	//Error map[string]*ErrorInfo `json:"error"`
	ErrType        string       `bson:"errtype"`        // highest-priority error type recorded
	ErrTypeMap     map[int]bool `bson:"errtypemap"`     // every error type recorded
	ErrDescription string       `bson:"errdescription"` // error description
}

type WarnInfo struct {
	Info   string            `bson:"info"`
	Num    int               `bson:"num"`
	Fields map[string]int    `bson:"fields"`
	Hrefs  map[string]string `bson:"hrefs"`
}
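
// A WarnInfo groups spider_warn records of one infotype: Num counts records,
// Fields counts occurrences per field name, and Hrefs keeps one sample link
// per field. An illustrative value (example data only):
//
//	&WarnInfo{
//		Info:   "Field Value Is Null",
//		Num:    3,
//		Fields: map[string]int{"title": 3},
//		Hrefs:  map[string]string{"title": "http://example.com/detail/1"},
//	}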

func NewStartTask() {
	InitInfo() // initialize the statistics time window
	logger.Info(StartTime, EndTime, Publishtime)
	getCodeBaseInfo()      // load spider base info
	getCodeStatus()        // load channel response status
	getPythonSummaryInfo() // load python summary stats
	getLuaSummaryInfo()    // load lua summary stats
	getSpiderWarnInfo()    // load warning data
	saveCodeInfo()         // aggregate errors and produce tasks
	updateLuaconfig()      // update spider configs
	closeTask()
}
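
// A minimal caller sketch (assuming the util package's Mongo sessions and the
// StartTime/EndTime/Publishtime globals are initialized elsewhere, e.g. by a
// daily scheduler — the loop shown here is illustrative only):
//
//	for {
//		luatask.NewStartTask() // one statistics + task-generation pass
//		time.Sleep(24 * time.Hour)
//	}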

func getCodeBaseInfo() {
	defer qu.Catch()
	sess := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	query := map[string]interface{}{
		"$or": []interface{}{
			// lua/python spiders that are online
			map[string]interface{}{
				"state": map[string]interface{}{
					"$in": []int{5, 11}, // published, online
				},
			},
			// lua spiders currently under maintenance, plus published ones
			map[string]interface{}{
				"platform": map[string]interface{}{
					"$in": []string{"golua平台", "chrome"},
				},
				"state": map[string]interface{}{
					"$in": []int{0, 1, 2}, // unfinished, pending review, rejected
				},
				"event": map[string]interface{}{
					"$ne": 7000,
				},
			},
		},
	}
	fields := map[string]interface{}{
		"code":         1,
		"site":         1,
		"channel":      1,
		"platform":     1,
		"event":        1,
		"pendstate":    1,
		"modifyuser":   1,
		"modifyuserid": 1,
		"modifytime":   1,
		"l_uploadtime": 1,
		"listisfilter": 1,
		"codetags":     1,
		"infoformat":   1,
		"param_common": 1,
	}
	it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			sp := &NewSpider{
				WarnInfoMap: map[int]*WarnInfo{},
				//Error: map[string]*ErrorInfo{},
				ErrType:     "-1",
				ErrTypeMap:  map[int]bool{},
				Page_FlipOk: true,
				Page_OneOk:  true,
				Page_TwoOk:  true,
			}
			param_common := tmp["param_common"].([]interface{})
			maxPage := qu.IntAll(param_common[5])
			delete(tmp, "param_common")
			luaByte, _ := bson.Marshal(tmp)
			if bson.Unmarshal(luaByte, sp) != nil {
				qu.Info("初始化爬虫失败:", tmp["_id"])
				return
			}
			sp.Working = util.CodeEventWorking[sp.Working]
			sp.Model = util.CodeEventModel[sp.Event]
			sp.MaxPage = maxPage
			if sp.Platform == "python" {
				sp.Model = 1
			}
			lock.Lock()
			NewCodeInfoMap[sp.Code] = sp
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("爬虫基本信息准备完成...", len(NewCodeInfoMap))
}

func getCodeStatus() {
	defer qu.Catch()
	sess := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	fields := map[string]interface{}{
		"spidercode":  1,
		"status_code": 1,
	}
	it := sess.DB(util.MgoPy.DbName).C("site_monitor").Find(nil).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			status := qu.IntAll(tmp["status_code"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				sp.Channel_Status = status
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("栏目响应状态信息准备完成...", len(NewCodeInfoMap))
}

func getPythonSummaryInfo() {
	defer qu.Catch()
	sess := util.MgoPy.GetMgoConn()
	defer util.MgoPy.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": util.GetTime(0),
		},
	}
	it := sess.DB(util.MgoPy.DbName).C("spider_monitor").Find(&query).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["code"])
			is_valid, _ := tmp["is_valid"].(bool) // false: spider is not a valid monitored one
			py_taskid := qu.ObjToString(tmp["py_taskid"])
			py_nodename := qu.ObjToString(tmp["py_nodename"])
			list_isgetdata, _ := tmp["list_isgetdata"].(bool)
			list_allintimes := qu.IntAll(tmp["list_allintimes"])
			list_nodatatimes := qu.IntAll(tmp["list_nodatatimes"])
			list_runtimes := qu.IntAll(tmp["list_runtimes"])
			detail_downloadnum := qu.IntAll(tmp["detail_downloadnum"])
			detail_downloadsuccessnum := qu.IntAll(tmp["detail_downloadsuccessnum"])
			detail_downloadfailnum := qu.IntAll(tmp["detail_downloadfailnum"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				sp.Py_TaskId = py_taskid
				sp.Py_NodeName = py_nodename
				sp.Py_IsValid = is_valid
				sp.List_IsGetData = list_isgetdata
				sp.List_AllInTimes = list_allintimes
				sp.List_NoDataTimes = list_nodatatimes
				sp.List_RunTimes = list_runtimes
				sp.Detail_DownloadNum = detail_downloadnum
				sp.Detail_DownloadSuccessNum = detail_downloadsuccessnum
				sp.Detail_DownloadFailNum = detail_downloadfailnum
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("python汇总信息完成...")
}

func getLuaSummaryInfo() {
	getSpiderHeart()               // heartbeat info
	getSpiderHighListDownloadNum() // download counts for split-collection spiders
	getSpiderListDownloadNum()     // download counts for sequential-collection spiders
	getSpiderDownloadRateData()    // download detail stats
}

func getSpiderWarnInfo() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"field":                 1,
		"level":                 1,
		"info":                  1,
		"code":                  1,
		"infotype":              1,
		"href":                  1,
		"data.publishtime":      1,
		"data.l_np_publishtime": 1,
	}
	it := sess.DB(util.MgoS.DbName).C("spider_warn").Find(&query).Select(&fields).Iter()
	n := 0
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			infotype := qu.IntAll(tmp["infotype"])
			level := qu.IntAll(tmp["level"])
			field := qu.ObjToString(tmp["field"])
			if infotype == 3 || infotype == 7 {
				return
			}
			if (infotype == 5 || infotype == 6) && level == 1 {
				return
			} else if infotype == 8 && field == "projectinfo" {
				return
			}
			if infotype == 2 || infotype == 6 || infotype == 8 {
				if data, ok := tmp["data"].(map[string]interface{}); ok {
					var ptime int64
					if l_np_publishtime := data["l_np_publishtime"]; l_np_publishtime != nil {
						ptime = qu.Int64All(l_np_publishtime)
					} else if publishtime := data["publishtime"]; publishtime != nil {
						ptime = qu.Int64All(publishtime)
					}
					if ptime < time.Now().AddDate(0, -6, 0).Unix() { // only warnings published within the last six months count
						return
					}
				}
			}
			code := qu.ObjToString(tmp["code"])
			info := qu.ObjToString(tmp["info"])
			href := qu.ObjToString(tmp["href"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				if wf := sp.WarnInfoMap[infotype]; wf != nil {
					if wf.Fields[field] == 0 { // first hit for this field: keep one sample href
						wf.Hrefs[field] = href
					}
					wf.Fields[field] += 1
					wf.Num++ // keep the record total in step with the per-field counts
				} else {
					sp.WarnInfoMap[infotype] = &WarnInfo{
						Info:   info,
						Num:    1,
						Fields: map[string]int{field: 1},
						Hrefs:  map[string]string{field: href},
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("错误信息数据统计完成...")
}

func getSpiderHeart() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"del": false,
	}
	fields := map[string]interface{}{
		"code":     1,
		"findlist": 1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	it := sess.DB(util.MgoS.DbName).C("spider_heart").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["code"])
			findListHeart := qu.Int64All(tmp["findlist"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				//limitDayNum := 0
				//if sp.Event == 7520 { // a round on node 7520 takes long, so its heartbeat may still be the previous day's
				//	limitDayNum = -1
				//}
				sp.List_IsGetData = findListHeart > util.GetTime(0)-int64(12*3600) // noon of the previous day
				sp.HeartTime = findListHeart
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("lua统计心跳信息完成...")
}

func getSpiderHighListDownloadNum() { // competitor data is not yet counted (delayed collection)
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"state":      1,
		"times":      1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	// 1. count spider_highlistdata
	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			state := qu.IntAll(tmp["state"])
			times := tmp["times"]
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				if state == 1 {
					sp.Detail_DownloadSuccessNum++
				} else if state == -1 {
					sp.Detail_DownloadFailNum++
				} else if state == 0 && times != nil {
					sp.Detail_DownloadFailNum++
				}
				// not-yet-attempted records (state==0, times==nil) are not counted as failures
				sp.Detail_DownloadNum++
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("lua统计采集量spider_highlistdata完成...")
}

func getSpiderListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"state":      1,
		"href":       1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	repeatHrefMap := map[string]int{}
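	// repeatHrefMap de-duplicates hrefs across records (iterated in _id order):
	// 0 = unseen, 1 = already counted as a success, -1 = counted as a failure;
	// a later success upgrades an earlier failure (see below).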
	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Sort("_id").Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			state := qu.IntAll(tmp["state"])
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			lock.Lock()
			defer lock.Unlock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				tmpState := repeatHrefMap[href]
				if tmpState == 1 { // href already counted as a success; ignore later records
					return
				} else if tmpState == 0 { // href not seen yet
					if state == 1 {
						sp.Detail_DownloadSuccessNum++
					} else {
						state = -1
						sp.Detail_DownloadFailNum++
					}
					sp.Detail_DownloadNum++
					repeatHrefMap[href] = state
				} else if tmpState == -1 && state == 1 { // previously counted as failed, now succeeded: flip the final state to success
					sp.Detail_DownloadSuccessNum++
					sp.Detail_DownloadFailNum--
					repeatHrefMap[href] = state
				}
			}
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	repeatHrefMap = map[string]int{}
	logger.Info("lua统计spider_listdata采集量完成...")
}

func getSpiderDownloadRateData() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
	query := map[string]interface{}{
		"date": date,
		//"event": map[string]interface{}{
		//	"$ne": 7000,
		//},
	}
	fields := map[string]interface{}{
		"spidercode":   1,
		"alltimes":     1,
		"zero":         1,
		"oh_percent":   1,
		"uplimit":      1,
		"page_fail":    1,
		"page_onefail": 1,
	}
	it := sess.DB(util.MgoS.DbName).C("spider_downloadrate").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			alltimes := qu.IntAll(tmp["alltimes"])
			zero := qu.IntAll(tmp["zero"])
			oh_percent := qu.IntAll(tmp["oh_percent"])
			uplimit := qu.IntAll(tmp["uplimit"])
			page_fail := qu.IntAll(tmp["page_fail"])
			page_onefail := qu.IntAll(tmp["page_onefail"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				sp.List_NoDataTimes = zero
				sp.List_RunTimes = alltimes
				sp.List_AllInTimes = oh_percent
				sp.Page_FlipOk = uplimit == 0
				sp.UpLimit = uplimit
				sp.Page_OneOk = !(page_onefail == alltimes && page_onefail > 0)
				sp.Page_TwoOk = !(page_fail == alltimes && page_fail > 0)
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("lua爬虫采集详情统计完成...")
}

func saveCodeInfo() {
	defer qu.Catch()
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	comeintime := time.Now().Unix()
	codeInfoArr := []map[string]interface{}{} // spider download details
	taskArr := [][]map[string]interface{}{}   // task upsert batch
	for _, spider := range NewCodeInfoMap {
		ch <- true
		wg.Add(1)
		go func(sp *NewSpider) {
			defer func() {
				<-ch
				wg.Done()
			}()
			getAllErr(sp)                  // aggregate errors
			createTask(sp, &taskArr, lock) // create the task
			sp.Comeintime = comeintime
			spByte, err := bson.Marshal(sp)
			if err != nil {
				logger.Info("Bson Marshal Error", sp.Code)
				return
			}
			lock.Lock()
			defer lock.Unlock()
			tmp := map[string]interface{}{}
			if bson.Unmarshal(spByte, &tmp) == nil {
				codeInfoArr = append(codeInfoArr, tmp)
			} else {
				logger.Error("Bson Unmarshal Error", sp.Code)
				return
			}
			if len(codeInfoArr) > 500 {
				util.MgoS.SaveBulk("spider_info", codeInfoArr...)
				codeInfoArr = []map[string]interface{}{}
			}
			if len(taskArr) > 500 {
				util.MgoEB.UpSertBulk("task", taskArr...)
				taskArr = [][]map[string]interface{}{}
			}
		}(spider)
	}
	wg.Wait()
	if len(codeInfoArr) > 0 {
		util.MgoS.SaveBulk("spider_info", codeInfoArr...)
		codeInfoArr = []map[string]interface{}{}
	}
	if len(taskArr) > 0 {
		util.MgoEB.UpSertBulk("task", taskArr...)
		taskArr = [][]map[string]interface{}{}
	}
	NewCodeInfoMap = map[string]*NewSpider{}
	logger.Info("爬虫统计完成...")
}
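
// Note: each NewSpider is converted to a map for bulk saving via a bson
// Marshal/Unmarshal round-trip; batches flush every 500 entries while the
// workers run, and the remainders are drained after wg.Wait().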

func createTask(sp *NewSpider, taskArr *[][]map[string]interface{}, lock *sync.Mutex) {
	defer qu.Catch()
	if sp.Event == 7000 {
		return
	}
	if sp.ErrType == "-1" { // no error recorded
		return
	}
	if !util.CreateTaskInfoFormat[sp.InfoFormat] { // spider type that does not generate tasks
		return
	}
	// look up existing tasks
	query := map[string]interface{}{
		"s_code": sp.Code,
		"i_state": map[string]interface{}{
			"$in": []int{0, 1, 2, 3, 5}, // tasks currently being worked
		},
	}
	fields := map[string]interface{}{
		"i_state":      1,
		"s_type":       1,
		"s_descript":   1,
		"i_times":      1,
		"l_comeintime": 1,
	}
	list, _ := util.MgoEB.Find("task", query, nil, fields, false, -1, -1)
	update := []map[string]interface{}{}
	if list != nil && len(*list) > 0 { // a task already exists
		if len(*list) > 1 {
			logger.Error("Code:", sp.Code, "任务异常")
			util.MgoEB.Save("luacreatetaskerr", map[string]interface{}{
				"code":       sp.Code,
				"comeintime": time.Now().Unix(),
				"tasknum":    len(*list),
			})
			return
		}
		task := (*list)[0]                                  // the single task
		state_old := qu.IntAll(task["i_state"])             // previous task state
		times_old := qu.IntAll(task["i_times"])             // previous task count
		type_old := qu.ObjToString(task["s_type"])          // previous error type
		descript_old := qu.ObjToString(task["s_descript"])  // previous description
		comeintime_old := qu.Int64All(task["l_comeintime"]) // previous creation time
		result := map[string]interface{}{
			"i_event":      sp.Event,
			"l_updatetime": time.Now().Unix(),
			"i_times":      times_old + 1,
			"s_descript":   descript_old + time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + sp.ErrDescription,
		}
		// task state
		if state_old == 0 {
			if sp.ErrType == NEWTASK_LISTERR || sp.ErrType == NEWTASK_DATAINFOERR {
				result["i_state"] = 1
			} else if comeintime_old >= util.GetTime(-30) { // a task was already created within the last month
				result["i_state"] = 1
			} else {
				result["l_complete"] = util.CompleteTime("1")
				result["l_comeintime"] = time.Now().Unix()
				result["l_updatetime"] = time.Now().Unix()
			}
		}
		// task type
		if sp.ErrType < type_old { // keep the higher-priority (lexicographically smaller) type
			result["s_type"] = sp.ErrType
		}
		update = append(update, map[string]interface{}{"_id": task["_id"]})
		update = append(update, map[string]interface{}{"$set": result})
		lock.Lock()
		*taskArr = append(*taskArr, update)
		lock.Unlock()
	} else { // no existing task
		state_new := 0
		//if sp.ErrType == 1 && sp.Channel_Status != 200 { // list-page error with an abnormal channel status: create a pending task directly
		//	state_new = 1
		//}
		if sp.ErrType == NEWTASK_LISTERR || sp.ErrType == NEWTASK_DATAINFOERR {
			state_new = 1
		}
		saveMap := map[string]interface{}{
			"s_modify":     sp.ModifyUser,
			"s_modifyid":   sp.ModifyId,
			"s_code":       sp.Code,
			"s_site":       sp.Site,
			"s_channel":    sp.Channel,
			"i_event":      sp.Event,
			"i_state":      state_new,
			"s_source":     "程序",
			"s_type":       sp.ErrType,
			"s_descript":   sp.ErrDescription,
			"i_times":      1,
			"l_comeintime": time.Now().Unix(),
			"l_complete":   util.CompleteTime("1"),
			//"s_urgency": "1",
			"s_platform": sp.Platform,
		}
		update = append(update, query)
		update = append(update, saveMap)
		lock.Lock()
		*taskArr = append(*taskArr, update)
		lock.Unlock()
	}
}
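
// Each taskArr entry is a [selector, update] pair for util.MgoEB.UpSertBulk:
// for an existing task the selector is {"_id": ...} with a "$set" document;
// for a new task the selector is the s_code/i_state query and the update is
// the full task document, which the upsert then inserts.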

func getAllErr(sp *NewSpider) {
	listErr(sp)           // list-page error
	dataInfoErr(sp)       // data error
	pageFlipErr(sp)       // page-flip error
	downloadRateErr(sp)   // collection-rate error
	downloadFailedErr(sp) // download error
	dataInfoWarn(sp)      // data warning
}

func listErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" && !sp.Py_IsValid {
		return
	}
	//if !sp.List_IsGetData || sp.List_RunTimes == 0 {
	if !sp.List_IsGetData {
		errFlag := false
		if sp.CodeTags != nil {
			tagTime, _ := sp.CodeTags[NEWTASK_LISTERR].(int64) // note: decoded via a struct this would arrive as float64
			if tagTime == 0 { // no list-error tag
				errFlag = true
			} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag has expired
				errFlag = true
			}
		} else { // no tags at all
			errFlag = true
		}
		if errFlag {
			//sp.Error[NEWTASK_LISTERR] = &ErrorInfo{
			//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_LISTERR]: true},
			//}
			sp.ErrType = NEWTASK_LISTERR
			sp.ErrTypeMap[qu.IntAll(NEWTASK_LISTERR)] = true
			heartTime := ""
			if sp.HeartTime != 0 {
				heartTime = qu.FormatDateByInt64(&sp.HeartTime, qu.Date_Full_Layout)
			}
			sp.ErrDescription += "列表页异常:\n 无最新心跳:" + heartTime + "\n"
			//if !sp.List_IsGetData {
			//	heartTime := ""
			//	if sp.HeartTime != 0 {
			//		heartTime = qu.FormatDateByInt64(&sp.HeartTime, qu.Date_Full_Layout)
			//	}
			//	sp.ErrDescription += "列表页异常:\n 无最新心跳:" + heartTime + "\n"
			//} else if sp.List_RunTimes == 0 {
			//	sp.ErrDescription += "列表页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_NoDataTimes) + "轮无数据\n"
			//}
		}
	}
}

func dataInfoErr(sp *NewSpider) {
	defer qu.Catch()
	if len(sp.WarnInfoMap) > 0 {
		errFlag := false
		resultDescription := ""
		for err := range DataInfoErrMap {
			if wf := sp.WarnInfoMap[err]; wf != nil {
				tmpDescription := ""
				for field, href := range wf.Hrefs {
					tmpDescription += " 字段" + field + ":" + href + "\n"
				}
				if tmpDescription != "" {
					resultDescription += " " + wf.Info + "\n" + tmpDescription
				}
				errFlag = true
			}
		}
		if errFlag {
			//sp.Error[NEWTASK_DATAINFOERR] = &ErrorInfo{
			//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_DATAINFOERR]: true},
			//}
			sp.ErrDescription += "数据异常错误:\n" + resultDescription
			sp.ErrTypeMap[qu.IntAll(NEWTASK_DATAINFOERR)] = true
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_DATAINFOERR
			}
		}
	}
}

func pageFlipErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" {
		return
	}
	errFlag := false
	if sp.CodeTags != nil {
		tagTime, _ := sp.CodeTags[NEWTASK_PAGEFLIPERR].(int64)
		if tagTime == 0 { // no page-flip tag
			errFlag = true
		} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag has expired
			errFlag = true
		}
	} else { // no tags: record the page-flip error
		errFlag = true
	}
	if errFlag {
		// 1. endless-paging spiders exceeded the page cap (100 pages high-performance, 50 queued)
		if !sp.Page_FlipOk && sp.Model == 1 {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_PAGEFLIPERR)] = true
			sp.ErrDescription += "爬虫翻页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.UpLimit) + "轮列表页采集翻页超过最大限制\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_PAGEFLIPERR
			}
		}
		// 2. page one returned no data while page two did
		if !sp.Page_OneOk {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_PAGEFLIPERR)] = true
			sp.ErrDescription += "爬虫翻页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_RunTimes) + "轮爬虫未采集到第一页数据\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_PAGEFLIPERR
			}
		}
		// 3. page one has data but page two is empty or identical to page one
		if !sp.Page_TwoOk {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_PAGEFLIPERR)] = true
			sp.ErrDescription += "爬虫翻页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_RunTimes) + "轮爬虫采集的第一、二页数据相同或未采集到第二页数据\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_PAGEFLIPERR
			}
		}
	}
}

func downloadRateErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" {
		if !sp.Py_IsValid { // invalid spider
			return
		}
		errFlag := false
		if sp.CodeTags != nil {
			tagTime, _ := sp.CodeTags[NEWTASK_RATEERR].(int64)
			if tagTime == 0 { // no rate-error tag
				errFlag = true
			} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag has expired
				errFlag = true
			}
		} else { // no tags: record the rate error
			errFlag = true
		}
		if errFlag && sp.List_AllInTimes > 0 && sp.AuditTime > 24 {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_RATEERR)] = true
			sp.ErrDescription += "采集频率异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_AllInTimes) + "轮数据全采\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_RATEERR
			}
		}
	} else { // lua
		if sp.List_AllInTimes > 0 {
			errFlag := false
			if sp.Model == 1 && sp.AuditTime > 24 && (sp.MaxPage == 1 || sp.MaxPage > 100) { // split collection, audited more than 24h ago: record the error
				errFlag = true
			} else if sp.Event != 7410 { // sequential collection (node 7410 never gets rate-error tasks)
				if sp.CodeTags != nil {
					tagTime, _ := sp.CodeTags[NEWTASK_RATEERR].(int64)
					if tagTime == 0 { // no rate-error tag
						errFlag = true
					} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag has expired
						errFlag = true
					}
				} else { // no tags: record the rate error
					errFlag = true
				}
			}
			if errFlag {
				//sp.Error[NEWTASK_RATEERR] = &ErrorInfo{
				//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_RATEERR]: true},
				//}
				sp.ErrTypeMap[qu.IntAll(NEWTASK_RATEERR)] = true
				sp.ErrDescription += "采集频率异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_AllInTimes) + "轮数据全采\n"
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_RATEERR
				}
			}
		}
	}
}

func downloadFailedErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" && !sp.Py_IsValid {
		return
	}
	flagTime := util.GetTime(-7)
	if sp.Detail_DownloadFailNum > 0 {
		tagTime := int64(0)
		if sp.CodeTags != nil {
			tagTime, _ = sp.CodeTags[NEWTASK_DOWNLOADERR].(int64) // existing tag
			if tagTime > flagTime { // tag has not expired yet
				if sp.Detail_DownloadFailNum == sp.Detail_DownloadNum { // everything failed: drop the tag
					delete(sp.CodeTags, NEWTASK_DOWNLOADERR)
					UpdateLuaconfig = append(UpdateLuaconfig, []map[string]interface{}{
						{"code": sp.Code},
						{"$set": map[string]interface{}{
							"codetags": sp.CodeTags,
						}},
					})
				} else {
					return
				}
			}
		}
		if sp.Model == 1 { // split collection
			errFlag := false
			if sp.Detail_DownloadNum < 100 { // fewer than 100 downloads in total
				if sp.Detail_DownloadFailNum >= 3 { // at least 3 failures
					errFlag = true
				} else if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.2 { // failure ratio at least 20%
					errFlag = true
				}
			} else if sp.Detail_DownloadFailNum >= 3 { // 100+ downloads and at least 3 failures
				if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.03 { // failure ratio at least 3%
					errFlag = true
				} else {
					if sp.Detail_DownloadFailNum >= 30 { // at least 30 failures
						errFlag = true
					} else {
						if qu.FormatDateByInt64(&tagTime, qu.Date_Short_Layout) == qu.FormatDateByInt64(&flagTime, qu.Date_Short_Layout) {
							errFlag = true
						} else {
							// let the system set the tag
							UpdateLuaconfig = append(UpdateLuaconfig, []map[string]interface{}{
								{"code": sp.Code},
								{"$set": map[string]interface{}{
									"codetags." + NEWTASK_DOWNLOADERR: time.Now().Unix(),
								}},
							})
						}
					}
				}
			}
			if errFlag {
				q := map[string]interface{}{
					"$or": []interface{}{
						map[string]interface{}{ // state=-1: download failed
							"spidercode": sp.Code,
							"state":      -1,
							"comeintime": map[string]interface{}{
								"$gte": StartTime,
								"$lt":  EndTime,
							},
						},
						map[string]interface{}{ // state=0 with times present: not downloaded successfully the previous day
							"spidercode": sp.Code,
							"state":      0,
							"times": map[string]interface{}{
								"$exists": true,
							},
							"comeintime": map[string]interface{}{
								"$gte": StartTime,
								"$lt":  EndTime,
							},
						},
					},
				}
				sp.getErrHrefs("spider_highlistdata", NEWTASK_DOWNLOADERR, q)
				sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_DOWNLOADERR
				}
			}
		} else { // sequential collection
			// look for first-time records (count=0) that failed to download
			// (count>0 means the record was already counted on its collection day)
			q := map[string]interface{}{
				"spidercode": sp.Code,
				"count":      0,
				"state":      -1,
				"comeintime": map[string]interface{}{
					"$gte": StartTime,
					"$lt":  EndTime,
				},
			}
			count := sp.getErrHrefs("spider_listdata", NEWTASK_DOWNLOADERR, q)
			if count > 0 {
				sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_DOWNLOADERR
				}
			}
		}
	}
}
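
// Tagging flow for borderline failure counts (100+ downloads, under 3% ratio
// and under 30 failures): the first pass only writes codetags["5"] = now
// instead of raising an error; the error fires only if the spider is still
// failing when that tag's date equals the 7-days-ago boundary day. The compare
// is at day granularity, so the window is deliberately coarse.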

func dataInfoWarn(sp *NewSpider) {
	defer qu.Catch()
	if len(sp.WarnInfoMap) > 0 {
		tagTime := int64(0)
		if sp.CodeTags != nil {
			tagTime, _ = sp.CodeTags[NEWTASK_DATAINFOWARN].(int64)
		}
		if tagTime == 0 || tagTime <= util.GetTime(-7) { // tag absent or expired
			errFlag := false
			resultDescription := ""
			for err := range DataInfoWarnMap {
				if wf := sp.WarnInfoMap[err]; wf != nil {
					tmpDescription := ""
					for field, href := range wf.Hrefs {
						tmpDescription += " 字段" + field + ":" + href + "\n"
					}
					if tmpDescription != "" {
						resultDescription += " " + wf.Info + "\n" + tmpDescription
					}
					errFlag = true
				}
			}
			if errFlag {
				//sp.Error[NEWTASK_DATAINFOWARN] = &ErrorInfo{
				//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_DATAINFOWARN]: true},
				//}
				sp.ErrDescription += "数据异常警告:\n" + resultDescription
				sp.ErrTypeMap[qu.IntAll(NEWTASK_DATAINFOWARN)] = true
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_DATAINFOWARN
				}
			}
		}
	}
}

func (sp *NewSpider) getErrHrefs(coll, errType string, query map[string]interface{}) (count int) {
	defer qu.Catch()
	if coll == "spider_listdata" {
		count = util.MgoS.Count(coll, query)
	} else {
		count = sp.Detail_DownloadFailNum
	}
	if count == 0 {
		return
	}
	sp.ErrDescription += LuaErrTypeInfo[NEWTASK_DOWNLOADERR] + ":共下载" + fmt.Sprint(sp.Detail_DownloadNum) + "条,失败" + fmt.Sprint(sp.Detail_DownloadFailNum) + "条\n"
	if sp.Platform != "golua平台" && sp.Platform != "chrome" { // only lua-platform spiders have sample hrefs here
		return
	}
	list, _ := util.MgoS.Find(coll, query, nil, `{"href":1}`, false, 0, 3)
	if list != nil && len(*list) > 0 {
		//errHrefs := []*ErrRemark{}
		for _, l := range *list {
			href := qu.ObjToString(l["href"])
			//errHrefs = append(errHrefs, &ErrRemark{Href: href})
			sp.ErrDescription += " " + href + "\n"
		}
		//sp.Error[errType] = &ErrorInfo{
		//	Num: sp.Detail_DownloadFailNum,
		//	Err: errHrefs,
		//	ErrInfo: map[string]bool{LuaErrTypeInfo[errType]: true},
		//}
	}
	return
}

// updateLuaconfig flushes the accumulated spider config updates in one bulk call.
func updateLuaconfig() {
	if len(UpdateLuaconfig) > 0 {
		util.MgoEB.UpdateBulk("luaconfig", UpdateLuaconfig...)
		UpdateLuaconfig = [][]map[string]interface{}{}
	}
}

// closeTask closes stale tasks.
func closeTask() {
	defer qu.Catch()
	query := map[string]interface{}{ // close download-error ("5") and data-warning ("6") tasks that have not moved to pending within 7 days
		"l_comeintime": map[string]interface{}{
			"$lte": util.GetTime(-7),
		},
		"i_state": 0,
		"s_type": map[string]interface{}{
			"$in": []string{"5", "6"},
		},
	}
	set := map[string]interface{}{
		"$set": map[string]interface{}{
			"l_closetime": time.Now().Unix(),
		},
	}
	util.MgoEB.Update("task", query, set, false, true)
}

/*
Open questions:
1. The list-page check relies on the current day's heartbeat for early warning. What if that day's heartbeat itself is wrong?
2. Download errors caused by detail pages that are genuinely empty on the source site: how can task accuracy be improved?
3. How should rate errors on node 7410 caused by changing links be handled?
*/

//func downloadFailedErr(sp *NewSpider) {
//	defer qu.Catch()
//	if sp.Platform == "python" && !sp.Py_IsValid {
//		return
//	}
//	if sp.Detail_DownloadFailNum > 0 {
//		tagTime := int64(-1)
//		if sp.CodeTags != nil {
//			tagTime, _ = sp.CodeTags[NEWTASK_DOWNLOADERR].(int64)
//		} else { // no tags: record the error
//			tagTime = 0
//		}
//		if tagTime > -1 {
//			if sp.Model == 1 { // split collection (python spiders default to split mode)
//				errFlag := false
//				if sp.Detail_DownloadNum < 100 { // fewer than 100 downloads in total
//					if sp.Detail_DownloadFailNum >= 3 { // at least 3 failures
//						errFlag = true
//					} else if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.2 { // failure ratio at least 20%
//						errFlag = true
//					}
//				} else if sp.Detail_DownloadFailNum >= 3 { // 100+ downloads and at least 3 failures
//					if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.03 { // failure ratio at least 3%
//						errFlag = true
//					} else {
//						if sp.Detail_DownloadFailNum >= 30 { // at least 30 failures
//							errFlag = true
//						} else {
//							tagFlag := tagTime == util.GetTime(-7) // was the tag set exactly 7 days ago?
//							if tagTime == 0 || !tagFlag { // system sets the tag
//								// system sets the tag
//							} else if tagFlag {
//								errFlag = true
//							}
//						}
//					}
//				}
//				if errFlag {
//					q := map[string]interface{}{
//						"$or": []interface{}{
//							map[string]interface{}{ // state=-1: download failed
//								"spidercode": sp.Code,
//								"state":      -1,
//								"comeintime": map[string]interface{}{
//									"$gte": StartTime,
//									"$lt":  EndTime,
//								},
//							},
//							map[string]interface{}{ // state=0 with times present: not downloaded successfully the previous day
//								"spidercode": sp.Code,
//								"state":      0,
//								"times": map[string]interface{}{
//									"$exists": true,
//								},
//								"comeintime": map[string]interface{}{
//									"$gte": StartTime,
//									"$lt":  EndTime,
//								},
//							},
//						},
//					}
//					sp.getErrHrefs("spider_highlistdata", NEWTASK_DOWNLOADERR, q)
//					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
//					if sp.ErrType < 0 {
//						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
//					}
//				}
//			} else { // sequential collection
//				// look for first-time records (count=0) that failed to download
//				// (count>0 means the record was already counted on its collection day)
//				q := map[string]interface{}{
//					"spidercode": sp.Code,
//					"count":      0,
//					"state":      -1,
//					"comeintime": map[string]interface{}{
//						"$gte": StartTime,
//						"$lt":  EndTime,
//					},
//				}
//				count := sp.getErrHrefs("spider_listdata", NEWTASK_DOWNLOADERR, q)
//				if count > 0 {
//					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
//					if sp.ErrType < 0 {
//						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
//					}
//				}
//			}
//		}
//	}
//}