spider.go

/*
Spider: crawler and script interface; still needs to be extended.
*/
package spider

import (
	"crypto/sha1"
	"fmt"
	"io"
	"log"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	lua "github.com/yuin/gopher-lua"
	elc "gopkg.in/olivere/elastic/v7"
	mgo "mongodb"
	qu "qfw/util"
	es "qfw/util/elastic.v7"
	util "spiderutil"
)
// Heart records spider heartbeats.
type Heart struct {
	DetailHeart        int64  //heartbeat of detail-page execution
	DetailExecuteHeart int64  //heartbeat of detail-page data collection
	FindListHeart      int64  //heartbeat of findListHtml execution
	ListHeart          int64  //heartbeat of list-page execution
	ModifyUser         string //spider maintainer
	Site               string //site
	Channel            string //channel
}

// SpiderFlow records traffic.
type SpiderFlow struct {
	Flow       int64  //traffic
	ModifyUser string //spider maintainer
	Site       string //site
	Channel    string //channel
	//Code string
}
// Spider is a crawler instance.
type Spider struct {
	Script
	Code                            string //spider code
	Name                            string //name
	Channel                         string //channel
	DownDetail                      bool   //whether to download detail pages
	Stop                            bool   //stop flag
	Pass                            bool   //pause flag
	LastPubshTime                   int64  //last publish time
	LastHeartbeat                   int64  //last heartbeat time
	SpiderRunRate                   int64  //execution frequency
	ExecuteOkTime                   int64  //time the job finished successfully
	Collection                      string //target collection name
	Thread                          int64  //thread count
	LastExecTime                    int64  //last execution time
	LastDowncount                   int32  //downloads of the last round
	TodayDowncount                  int32  //downloads today
	YesterdayDowncount              int32  //downloads yesterday
	TotalDowncount                  int32  //total downloads
	RoundCount                      int32  //round counter
	StoreMode                       int    //storage mode
	StoreToMsgEvent                 int    //message type
	CoverAttr                       string //attribute used for data de-duplication
	SleepBase                       int    //base delay
	SleepRand                       int    //random delay
	TargetChannelUrl                string //channel page URL
	UpperLimit, LowerLimit          int    //upper and lower bounds of the normal range
	UserName, UserEmail, UploadTime string //developer name, developer email, script upload time
	MUserName, MUserEmail           string //maintainer, maintainer email
	//Index int //array index
	//historical gap-filling
	IsHistoricalMend bool //whether this is a historical gap-filling spider
	IsMustDownload   bool //whether download is forced
	IsCompete        bool //distinguishes new vs. old spiders
	Infoformat       int  //spider type: 1 tender; 2 proposed/approval; 3 property rights
	IsMainThread     bool //whether this is the main thread (distinguishes threads in multi-threaded collection)
}
var (
	Es                *es.Elastic
	EsIndex           string
	EsType            string
	MgoS              *mgo.MongodbSim
	MgoEB             *mgo.MongodbSim
	TimeChan          = make(chan bool, 1)
	Reg               = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
	RestrictAccessReg = regexp.MustCompile(`访问被拒绝`) //matches "access denied" pages
	//DomainNameReg = regexp.MustCompile(`(?://).+?(?:)[::/]`)
	//RepDomainNameReg = regexp.MustCompile(`[::/]+`)
	//Today string
	//SpiderFlowMap = sync.Map{} //code:{"2022-05-16":SpiderFlow}
	AllThreadNum int64
	DelaySiteMap map[string]*DelaySite //sites whose collection is delayed
	//UpdataMgoCache = make(chan []map[string]interface{}, 1000) //update the state of data to be re-downloaded
	//SP = make(chan bool, 5)
	//SaveMgoCache = make(chan map[string]interface{}, 1000) //save off-site data collected by spiders
	//SPS = make(chan bool, 5)
	UpdataHeartCache = make(chan []map[string]interface{}, 1000) //update spider heartbeat info
	SPH              = make(chan bool, 5)
	DataBakSaveCache = make(chan map[string]interface{}, 1000) //save detail-page collection records
	DB_CH            = make(chan bool, 5)
)
// DelaySite describes a site whose collection is delayed.
type DelaySite struct {
	DelayTime int  //delay in hours
	Compete   bool //quasi-competitor site: triggers ES title de-duplication
}
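
// loadDelaySites is a hedged sketch (not part of the original file) of how
// DelaySiteMap might be populated before the spiders start. The collection name
// "site_delaytime" and its field names are assumptions for illustration only;
// only the DelaySite fields and the Find call shape come from this file.
func loadDelaySites() {
	defer qu.Catch()
	DelaySiteMap = map[string]*DelaySite{}
	list, _ := MgoEB.Find("site_delaytime", nil, nil, nil, false, 0, 0) //hypothetical collection
	if list == nil {
		return
	}
	for _, tmp := range *list {
		site := qu.ObjToString(tmp["site"])
		DelaySiteMap[site] = &DelaySite{
			DelayTime: qu.IntAll(tmp["delaytime"]),    //delay in hours
			Compete:   qu.IntAll(tmp["compete"]) == 1, //quasi-competitor flag
		}
	}
}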
// UpdateHeart records a heartbeat of type t for the given spider code.
func UpdateHeart(site, channel, code, user, t string) {
	//sp, spiderOk := LoopListPath.Load(code)
	//if spiderOk && sp != nil {
	if htmp, ok := SpiderHeart.Load(code); ok {
		if heart, ok := htmp.(*Heart); ok {
			if t == "list" {
				heart.ListHeart = time.Now().Unix()
			} else if t == "findlist" {
				heart.FindListHeart = time.Now().Unix()
			} else if t == "detail" {
				heart.DetailHeart = time.Now().Unix()
			} else if t == "detailexcute" {
				heart.DetailExecuteHeart = time.Now().Unix()
			}
		}
	} else {
		heart := &Heart{
			ModifyUser: user,
			Site:       site,
			Channel:    channel,
		}
		if t == "list" {
			heart.ListHeart = time.Now().Unix()
		} else if t == "findlist" {
			heart.FindListHeart = time.Now().Unix()
		} else if t == "detail" {
			heart.DetailHeart = time.Now().Unix()
		} else if t == "detailexcute" {
			heart.DetailExecuteHeart = time.Now().Unix()
		}
		SpiderHeart.Store(code, heart)
	}
	//}
}
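
// updateHeartCacheConsumer is a hedged sketch (not in the original file) of a
// worker that drains UpdataHeartCache, which is declared above but not consumed
// in this file. The collection name "spider_heart" is an assumption; SPH caps
// the number of concurrent writers, and each cached element is a [query, set]
// pair as used with MgoS.UpdateBulk elsewhere in this file.
func updateHeartCacheConsumer() {
	for update := range UpdataHeartCache {
		SPH <- true
		go func(update []map[string]interface{}) {
			defer func() { <-SPH }()
			MgoS.UpdateBulk("spider_heart", update) //hypothetical collection name
		}(update)
	}
}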
// StartJob starts a crawl round.
func (s *Spider) StartJob() {
	s.Stop = false
	s.Pass = false
	s.RoundCount++
	go s.ExecJob(false)
}
// ExecJob executes a single round.
func (s *Spider) ExecJob(reload bool) {
	defer func() {
		size_ok, size_no := 0, 0
		size_no_index := []interface{}{}
		LoopListPath.Range(func(k, v interface{}) bool {
			if v != nil {
				size_ok++
			} else {
				size_no_index = append(size_no_index, k)
				size_no++
			}
			return true
		})
		logger.Debug(s.Code, s.Name, "ok, downloads this round:", s.LastDowncount, ", polling list size:", size_ok, ", offline count:", size_no, ", offline spiders:", size_no_index)
		s.ExecuteOkTime = time.Now().Unix()
		util.TimeSleepFunc(5*time.Second, TimeSleepChan)
		if util.Config.Working == 1 {
			s.Stop = true
			if _, b := Allspiders.Load(s.Code); b {
				Allspiders.Store(s.Code, s)
			}
			s.L.Close()
			CC <- s.L
		}
	}()
	if reload && util.Config.Working == 0 { //high-performance mode: reload the script when the scheduler polls
		s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
	}
	logger.Debug(s.Code, s.Name, "rate:", s.SpiderRunRate, ",", s.Timeout)
	s.LastDowncount = 0
	s.LastExecTime = time.Now().Unix()
	s.LastHeartbeat = time.Now().Unix()
	s.ExecuteOkTime = 0
	err := s.GetLastPublishTime() //fetch the latest publish time, used as the last update time
	if err != nil {
		logger.Error(s.Code, err)
	}
	err = s.DownListPageItem() //download list pages
	if err != nil {
		logger.Error(s.Code, err)
	}
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //incremental vs. historical spider (currently only node 7000 runs historical spiders)
		UpdateSpiderByCodeState(s.Code, "6") //take the spider offline on this node
		SpiderCodeSendToEditor(s.Code)       //send the history-to-incremental spider to the editor to switch it between nodes
		return
	} else {
		if util.Config.Working == 0 { //high-performance mode
			/*
				for !s.Stop && s.Pass {
					util.TimeSleepFunc(2*time.Second, TimeSleepChan)
				}
				if s.Stop {
					return
				}
			*/
			//if s.IsMustDownload { //historical download: run a single round only
			if s.IsHistoricalMend && util.Config.IsHistoryEvent { //history node 7000, high-performance mode: gap-filling runs a single round
				UpdateSpiderByCodeState(s.Code, "6") //take the spider offline on this node
				b := MgoEB.Update("luaconfig", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
				logger.Info("Delete History Code:", s.Code, b)
			} else {
				if !s.Stop { //still online: schedule the next run
					util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
						s.ExecJob(true)
					}, TimeChan)
					// util.TimeAfterFunc(30*time.Second, func() {
					// 	s.ExecJob(true)
					// }, TimeChan)
				} else { //offline: let the child thread exit
					return
				}
			}
		} else { //queue mode
			return
		}
	}
}
// GetLastPublishTime fetches the latest publish time, used as the last update time.
func (s *Spider) GetLastPublishTime() (errs interface{}) {
	defer qu.Catch()
	var lastpublishtime string
	//fetch the last update time from the script
	if err := s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("getLastPublishTime"),
		NRet:    1,
		Protect: true,
	}); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		errs = err.Error()
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return errs
	}
	ret := s.L.Get(-1)
	s.L.Pop(1)
	if str, ok := ret.(lua.LString); ok {
		lastpublishtime = string(str)
	}
	if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
		//guard against publish times in the future
		if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
			s.LastPubshTime = time.Now().Unix()
		} else {
			s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
		}
	}
	return nil
}
// DownListPageItem downloads list pages.
func (s *Spider) DownListPageItem() (errs interface{}) {
	defer qu.Catch()
	s.AlreadyGetPageHeart = map[int]bool{}                                     //reset the record
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //start page, max page
	s.MaxPage = max
	//tmpMax := max //temporarily record the max page
	repeatAllNum := 0   //total duplicates across the pages collected this round
	downloadAllNum := 0 //total items collected this round
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //node 7000: spider crawls history
		max = s.GetIntVar("spiderHistoryMaxPage")
	}
	downtimes := 0             //retry count per page (currently 3)
	repeatPageNum := 0         //page number whose links were all duplicates
	repeatPageTimes := 0       //number of consecutive all-duplicate pages (stop paging once the limit is hit)
	repeatPageTimesLimit := 10 //limit of consecutive all-duplicate pages (10 in high-performance mode, 3 in queue mode)
	isRunRepeatList := false   //whether to run consecutive list-page duplicate detection
	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //all nodes except sequential-collection mode and history nodes check consecutive pages for duplicates
		isRunRepeatList = true
		max = 100                     //high-performance mode: cap the max page at 100
		if util.Config.Working == 1 { //queue mode
			repeatPageTimesLimit = 3 //3 consecutive duplicate pages
			max = 50                 //queue mode: cap the max page at 50
		}
	}
	for ; start <= max && !s.Stop; start++ {
		if !s.Stop { //spider taken offline while downloading details: stop storing heartbeats
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") //record list-page heartbeats for all nodes
		}
		//logger.Info("spider:", s.Code, "duplicate page:", repeatPageNum, " configured max page:", tmpMax, " final max page:", max, " current page:", start, "duplicate times:", repeatPageTimes)
		//if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { //more than 5 consecutive duplicate pages: stop paging
		//	break
		//}
		if isRunRepeatList && repeatPageTimes >= repeatPageTimesLimit { //consecutive duplicate pages reached the limit: stop paging
			break
		}
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(start)); err != nil {
			//panic(s.Code + "," + err.Error())
			logger.Error("list page collection error", start, s.Code+","+err.Error())
			errs = err.Error()
			atomic.AddInt32(&s.Script.ErrorNum, 1)
			//retry on list-page errors; after too many retries treat the page as collected
			if downtimes < 2 {
				downtimes++
				start--
				//} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat this page as a duplicate
			} else if isRunRepeatList { //retries exhausted: treat this page as a duplicate
				if repeatPageNum+1 == start {
					repeatPageTimes++ //increment the counter
				} else {
					repeatPageTimes = 0 //reset the duplicate counter
				}
				repeatPageNum = start //record the page number
				downtimes = 0
			}
			continue
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			list := []map[string]interface{}{}
			//qu.Debug("items on this page:", tbl.Len())
			if tabLen := tbl.Len(); tabLen > 0 { //the list page has data: download its detail pages
				repeatListNum := 0 //duplicate links on the current list page
				for i := 1; i <= tabLen; i++ {
					v := tbl.RawGetInt(i).(*lua.LTable)
					tmp := util.TableToMap(v)
					//s.ThisSiteData(tmp) //track whether the downloaded data belongs to this site
					if !s.IsHistoricalMend { //not historical gap-filling
						tmp["dataging"] = 0 //mark the data with dataging=0
						if s.DownDetail {
							s.DownloadDetailItem(tmp, &repeatListNum)
						} /*else { //no spiders of this kind at present
							tmp["comeintime"] = time.Now().Unix()
							//atomic.AddInt32(&s.LastDowncount, 1)
							//atomic.AddInt32(&s.TodayDowncount, 1)
							//atomic.AddInt32(&s.TotalDowncount, 1)
							href := fmt.Sprint(tmp["href"])
							if len(href) > 5 { //valid data
								hashHref := util.HexText(href)
								util.RedisClusterSet(hashHref, "", -1) //full-volume redis
								list = append(list, tmp)
							}
						}*/
					} else { //historical gap-filling
						s.HistoricalMendDownloadDetailItem(tmp) //download detail pages for gap-filling
					}
				}
				//if start <= tmpMax { //update the counters
				repeatAllNum += repeatListNum
				downloadAllNum += tabLen
				//}
				//if start > tmpMax && isRunRepeatList { //run consecutive-page duplicate detection
				if isRunRepeatList { //run consecutive-page duplicate detection
					if repeatListNum >= tabLen { //every link on page start has been collected already
						//qu.Debug("duplicate page:", repeatPageNum, "current page:", start)
						if repeatPageNum+1 == start || repeatPageNum == 0 {
							repeatPageTimes++ //increment the counter
						} else {
							repeatPageTimes = 0 //reset the duplicate counter
						}
						repeatPageNum = start //record the page number
					} else { //page start still has unseen data
						repeatPageTimes = 0
						repeatPageNum = 0
					}
				}
				if !s.IsHistoricalMend && !s.DownDetail {
					if len(list) > 0 { //save the data
						StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
					}
				}
			} else { //the page may be empty due to network problems: re-request it
				if downtimes < 2 {
					downtimes++
					start--
					continue
					//} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat this page as a duplicate
				} else if isRunRepeatList { //retries exhausted: treat this page as a duplicate
					if repeatPageNum+1 == start {
						repeatPageTimes++ //increment the counter
					} else {
						repeatPageTimes = 0 //reset the duplicate counter
					}
					repeatPageNum = start //record the page number
				}
			}
		} else { //requesting the current list page failed
			if downtimes < 2 {
				downtimes++
				start--
				continue
				//} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat this page as a duplicate
			} else if isRunRepeatList { //retries exhausted: treat this page as a duplicate
				if repeatPageNum+1 == start {
					repeatPageTimes++ //increment the counter
				} else {
					repeatPageTimes = 0 //reset the duplicate counter
				}
				repeatPageNum = start //record the page number
			}
		}
		downtimes = 0 //page downloaded fine: reset the retry counter
		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
	}
	logger.Info(s.Code, "list-page collection this round:", downloadAllNum, repeatAllNum, start, s.Stop)
	if !util.Config.IsHistoryEvent && !s.Stop { //non-history nodes: record the download rate
		nowTime := time.Now()
		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
		set := map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"updatetime": nowTime.Unix(),
			"event":      util.Config.Uploadevent,
			"modifyuser": s.MUserName,
			"maxpage":    s.MaxPage,
			"runrate":    s.SpiderRunRate,
			"endpage":    start,
			"date":       sDate,
		}
		inc := map[string]interface{}{
			"alltimes": 1,
		}
		//record whether paging succeeded
		if s.PageOneTextHash != "" {
			if s.PageTwoTextHash != "" {
				if s.PageOneTextHash != s.PageTwoTextHash {
					inc["page_success"] = 1
				} else {
					inc["page_fail"] = 1
				}
			} else {
				inc["page_fail"] = 1
			}
		} else if s.PageTwoTextHash != "" {
			inc["page_onefail"] = 1
		}
		if downloadAllNum > 0 {
			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
			if rate == 1.0 {
				inc["oh_percent"] = 1
			} else if rate >= 0.9 {
				inc["nt_percent"] = 1
			} else if rate >= 0.8 {
				inc["et_percent"] = 1
			} else {
				inc["other_percent"] = 1
			}
			if isRunRepeatList && start > max { //consecutive paging passed the cap
				inc["uplimit"] = 1
			}
		} else {
			inc["zero"] = 1
		}
		query := map[string]interface{}{
			"date":       sDate,
			"spidercode": s.Code,
		}
		MgoS.Update("spider_downloadrate", query, map[string]interface{}{
			"$set": set,
			"$inc": inc,
		}, true, false)
	}
	//reset state
	s.RecordedHeartInfo = false
	s.PageOneTextHash = ""
	s.PageTwoTextHash = ""
	return errs
}
//site statistics
//func (s *Spider) ThisSiteData(tmp map[string]interface{}) {
//	defer qu.Catch()
//	href := qu.ObjToString(tmp["href"])
//	url_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(s.TargetChannelUrl), "")
//	href_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(href), "")
//	if url_dn != href_dn {
//		SaveMgoCache <- map[string]interface{}{
//			"site":       s.Name,
//			"channel":    s.Channel,
//			"spidercode": s.Code,
//			"url":        s.TargetChannelUrl,
//			"href":       href,
//			"modifyuser": s.MUserName,
//			"comeintime": time.Now().Unix(),
//		}
//	}
//}
// HistoricalMendDownloadDetailItem iterates items and downloads detail pages (historical gap-filling).
func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
	//qu.Debug("--------------historical download-----------------")
	defer qu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { //invalid data
		return
	}
	hashHref := util.HexText(href)
	isExist := util.RedisExist("list", "list_"+hashHref)
	//logger.Debug("full href:", href, " isExist:", isExist)
	if !s.IsMustDownload { //download not forced
		if isExist { //data already exists: return
			return
		} else if util.Config.IsHistoryEvent { //1. gap-filling on the history node (7000): store the data in spider_historydata
			num := 0
			SaveHighListPageData(paramdata, hashHref, &num)
			return
		}
	} else { //forced download is currently unsupported
		return
	}
	//2. gap-filling off the history node: collect details right after the list, then take the spider offline (no such spider at present)
	id := ""
	isEsRepeat := false
	if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
		title := qu.ObjToString(paramdata["title"])
		eTime := time.Now().Unix()
		sTime := eTime - int64(7*86400)
		//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
		esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
		if Es.Count(EsIndex, EsType, esQuery) > 0 { //ES already has this title: skip collection and update the list-record state
			isEsRepeat = true
		}
	}
	SaveListPageData(paramdata, &id, isEsRepeat) //store the collection record
	if isEsRepeat { //quasi-competitor data de-duplicated by title: add it to redis
		util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
		util.AddBloomRedis("href", href)
		return
	}
	//qu.Debug("----------------download, parse, store--------------------")
	//download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil { //download failed: stop
		if err != nil {
			logger.Error(s.Code, err, paramdata)
		}
		//mark the spider_listdata record as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
		return
	}
	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) //collected successfully: add it to the list-page redis
	//de-duplicate by publish time
	tmphref := qu.ObjToString(data["href"]) //use tmphref: the detail page may have replaced href
	publishtime := qu.Int64All(data["l_np_publishtime"])
	if publishtime < time.Now().AddDate(-1, 0, 0).Unix() { //data older than a year: check the full-volume bloom redis by href
		isExist, _ = util.ExistsBloomRedis("href", tmphref)
		if isExist {
			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
			return
		}
	}
	//detail page filtered the data
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
	if data["delete"] != nil {
		//util.AddBloomRedis("href", tmphref) //"delete" may mark redirected data; adding it to the full-volume redis could block the site from ever being collected
		set["exist"] = "delete"
		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
		return
	}
	//mark the spider_listdata record as succeeded (update state by href; may be updated by a later successful download)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	//the detail page may replace href; after success add the original href to the full-volume redis
	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
	//	util.AddBloomRedis("href", href)
	//}
	flag := true
	//publishtime := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) //publishtime
	if s.IsMustDownload { //forced download
		if isExist && publishtime < time.Now().AddDate(0, 0, -5).Unix() {
			//qu.Debug("forced download, exists in redis")
			data["dataging"] = 1 //dataging=1 tells the save service to update by the id stored in redis for this href (redis no longer stores ids, so this has no effect)
			flag = false
		} else {
			//qu.Debug("forced download, not in redis")
			data["dataging"] = 0
		}
	} else { //download not forced
		if !isExist {
			//qu.Debug("not forced, not in redis")
			data["dataging"] = 0
		}
	}
	//if publishtime > time.Now().Unix() { //guard against future publish times
	//	data["publishtime"] = time.Now().Unix()
	//}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	data["spidercode"] = s.Code
	//qu.Debug("--------------start saving---------------")
	data["iscompete"] = s.IsCompete //spiders added after 2021-11-01 no longer show the original link (checked by the save service)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
	//qu.Debug("--------------saving done---------------")
}
// DownloadDetailItem iterates items and downloads detail pages (incremental).
func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
	defer qu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { //invalid data
		*num++ //count it as collected
		return
	}
	hashHref := util.HexText(href)
	//de-duplicate against the list-page redis
	isExist := util.RedisExist("list", "list_"+hashHref)
	if isExist {
		*num++ //already collected
		return
	}
	id := "" //id of the record saved in spider_listdata, used to update its state after a successful download
	//if util.Config.Modal == 1 || (util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history") { //separate list/detail collection nodes and data collected by new spiders on node 7000
	if util.Config.Modal == 1 || util.Config.IsHistoryEvent { //separate collection mode and the history node (7000)
		SaveHighListPageData(paramdata, hashHref, num) //save to the table
		return
	} else {
		if !s.Stop {
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //record detail-page heartbeats for the old modal=0 mode
		}
		isEsRepeat := false
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
			title := qu.ObjToString(paramdata["title"])
			eTime := time.Now().Unix()
			sTime := eTime - int64(7*86400)
			//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
			esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
			if Es.Count(EsIndex, EsType, esQuery) > 0 { //ES already has this title: skip collection and update the list-record state
				isEsRepeat = true
			}
		}
		SaveListPageData(paramdata, &id, isEsRepeat) //save list-page data collected by nodes 7000, 7410, 7500, 7510, 7520, 7700
		if isEsRepeat { //quasi-competitor data de-duplicated by title: add it to redis
			util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
			util.AddBloomRedis("href", href)
			return
		}
	}
	//download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil {
		*num++ //sequential mode: count failed downloads as duplicates (otherwise failed data is never de-duplicated and each round counts as a full collection)
		if err != nil {
			logger.Error(s.Code, err, paramdata)
			//if len(paramdata) > 0 {
			//	SaveErrorData(s.MUserName, paramdata, err) //save the error info
			//}
		}
		//mark the spider_listdata record as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}})
		return
	} /*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced href
		util.RedisClusterSet(hashHref, "", -1) //store the list-page href in the full-volume redis
	}*/
	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) //add it to the list-page redis
	//de-duplicate by publish time
	tmphref := qu.ObjToString(data["href"])
	publishtime := qu.Int64All(data["l_np_publishtime"])
	//node 7410 (link-changing node) or data older than a year: check the full-volume bloom redis by href
	if util.Config.Uploadevent == 7410 || publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
		isExist, _ = util.ExistsBloomRedis("href", tmphref)
		if isExist {
			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
			return
		}
	}
	//heartbeat: detail page downloaded successfully
	if !s.Stop {
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //record data heartbeats for the old modal=0 mode
	}
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
	//detail page filtered the data
	if data["delete"] != nil {
		//util.AddBloomRedis("href", tmphref) //"delete" may mark redirected data; adding it to the full-volume redis could block the site from ever being collected
		set["exist"] = "delete"
		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
		return
	}
	set["byid"] = id
	//mark the spider_listdata record as succeeded (update state by href; may be updated by a later successful download)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	//the detail page may replace href; after success add the original href to the full-volume redis
	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
	//	util.AddBloomRedis("href", href)
	//}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	data["spidercode"] = s.Code
	data["iscompete"] = s.IsCompete   //spiders added after 2021-11-01 no longer show the original link (checked by the save service)
	data["infoformat"] = s.Infoformat //spider type
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailByNames iterates and downloads directory entries.
func (s *Spider) DownloadDetailByNames(p interface{}) {
	defer qu.Catch()
	var err interface{}
	/*
		if s.Stop {
			return
		}
		for s.Pass {
			util.TimeSleepFunc(2*time.Second, TimeSleepChan)
		}
	*/
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	if s.DownDetail {
		href := qu.ObjToString(data["href"])
		if href == "" || len(href) < 5 { //invalid data
			return
		}
		//download, parse, store
		data, err = s.DownloadDetailPage(paramdata, data)
		if err != nil {
			logger.Error(s.Code, paramdata, err)
			return
		}
	}
	data["comeintime"] = time.Now().Unix()
	//atomic.AddInt32(&s.LastDowncount, 1)
	//atomic.AddInt32(&s.TodayDowncount, 1)
	//atomic.AddInt32(&s.TotalDowncount, 1)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailPage downloads and parses a detail page.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer qu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc((time.Duration(s.SleepBase+util.GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	//assemble the map
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = int64(value)
				} else if value, ok := v.(*lua.LTable); ok {
					tmp := util.TableToMap(value)
					data[key] = tmp
				}
			}
		})
		return data, err
	} else {
		return nil, err
	}
}
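
// The Lua contract assumed by the calls in this file (a sketch inferred from
// the CallByParam sites above, not an authoritative spec): each spider script
// defines the globals getLastPublishTime() -> string,
// downloadAndParseListPage(page) -> table of item tables, and
// downloadDetailPage(item) -> table; string, number, and table values in the
// returned table are copied into the Go data map as shown above.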
// DetailData collects detail pages on a schedule in high-performance mode.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver //run once all scripts have loaded
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //high-performance mode and not node 7000 (only node 7000 has util.Config.IsHistoryEvent=true)
		GetListDataDownloadDetail()
	}
}

// GetListDataDownloadDetail starts a detail-download goroutine per spider.
func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders2.Range(func(k, v interface{}) bool {
		sp := v.(*Spider)
		go sp.DownloadHighDetail(true)
		time.Sleep(1 * time.Second)
		return true
	})
}
// DownloadHighDetail downloads detail pages from list data in high-performance mode.
func (s *Spider) DownloadHighDetail(reload bool) {
	defer qu.Catch()
	for {
		logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
		if !s.Stop { //spider is running
			s.DownloadDetail(reload, false)
		} else {
			logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
			break
		}
	}
}

// DownloadListDetail downloads detail pages from list data in queue mode.
func (s *Spider) DownloadListDetail(reload bool) {
	defer qu.Catch()
	s.DownloadDetail(reload, false)
	//queue mode: once the spider has downloaded its detail pages (or has none), close it
	s.Stop = true
	if _, b := Allspiders2.Load(s.Code); b {
		Allspiders2.Store(s.Code, s)
	}
	s.L.Close()
	CC2 <- s.L
}
// DownloadDetail downloads detail pages.
func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
	defer qu.Catch()
	coll := "spider_highlistdata"
	isEsRepeat := false //whether to de-duplicate against ES
	q := map[string]interface{}{
		"spidercode": s.Code,
		"state":      0, //0: stored; -1: collection failed; 1: success
	}
	o := map[string]interface{}{"_id": -1}
	if !isHistory { //not a historical download: add a comeintime filter
		comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //collect only recent data so records that never download do not pile up
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil {
			isEsRepeat = delaySite.Compete
			if delaySite.DelayTime <= util.Config.DayNum*24 { //if the spider's site is delayed, collect its data DelayTime hours late (7410, 7500, 7700 collect sequentially and cannot be delayed)
				//comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
				comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delaySite.DelayTime)
			}
		}
		q["comeintime"] = comeintimeQuery
	} else {
		coll = "spider_historydata"
		o["_id"] = 1 //historical data in ascending order
	}
	f := map[string]interface{}{
		"state":      0,
		"comeintime": 0,
		"event":      0,
	}
	if !isHistory && !s.Stop { //spider taken offline while downloading details: stop storing heartbeats
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //record detail-page heartbeats for modal=1
	}
	countNum := MgoS.Count(coll, q) //count records not yet downloaded within util.Config.DayNum days
	if isHistory && countNum == 0 { //no historical data to download: stop manually
		s.Stop = true
		return
	}
	//logger.Info("Thread Info: Code:", s.SCode, " count:", countNum)
	if countNum > 0 {
		threadNum := countNum / util.Config.ThreadBaseNum //thread count
		if threadNum > util.Config.ThreadUpperLimit {     //cap the per-spider thread count
			threadNum = util.Config.ThreadUpperLimit
		}
		logger.Info("Thread Info: Code:", s.SCode, " count:", countNum, " thread num:", threadNum)
		list, _ := MgoS.Find(coll, q, o, f, false, 0, 200)
		if list != nil && len(*list) > 0 {
			spChan := make(chan *Spider, threadNum+1) //initialize the worker channel (+1 for the base thread)
			if threadNum > 1 {                        //initialize additional sp objects
				if !isHistory {
					if v, ok := LoopListPath.Load(s.Code); ok && v != nil {
						if info, ok := v.(map[string]string); ok {
							NewSpiderByScript(threadNum, s.Code, info, spChan)
						} else {
							logger.Debug("LoopListPath Not Has Code:", s.Code)
							spChan = make(chan *Spider, 1) //cannot create other sp objects: use only the main thread's sp
						}
					} else {
						logger.Debug("LoopListPath Not Has Code:", s.Code)
						spChan = make(chan *Spider, 1) //cannot create other sp objects: use only the main thread's sp
					}
				} else {
					info := map[string]string{
						"script":          s.ScriptFile,
						"createuser":      s.UserName,
						"createuseremail": s.UserEmail,
						"modifyuser":      s.MUserName,
						"modifyemail":     s.MUserEmail,
					}
					NewSpiderByScript(threadNum, s.Code, info, spChan)
				}
			}
			spChan <- s //put the main thread's sp into the channel
			wg := &sync.WaitGroup{}
			spLock := &sync.Mutex{}
			updateArr := [][]map[string]interface{}{}
			for _, tmp := range *list {
				spTmp := <-spChan //take an sp object from the channel
				wg.Add(1)
				atomic.AddInt64(&AllThreadNum, 1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp //return the sp object to the channel when done
						wg.Done()
						atomic.AddInt64(&AllThreadNum, -1)
					}()
					if s.Stop || sp == nil { //spider offline or sp is nil: stop downloading
						return
					}
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					href := qu.ObjToString(tmp["href"])
					//hashHref := util.HexText(href)
					update := []map[string]interface{}{}
					if isEsRepeat { //de-duplicate by title against ES
						title := qu.ObjToString(tmp["title"])
						eTime := time.Now().Unix()
						sTime := eTime - int64(7*86400)
						//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
						esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
						count := Es.Count(EsIndex, EsType, esQuery)
						if count > 0 { //ES already has this title: skip collection and update the list-record state
							util.AddBloomRedis("href", href)
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "es", "updatetime": time.Now().Unix()}} //exists: state=1
							update = append(update, query)
							update = append(update, set)
							spLock.Lock()
							updateArr = append(updateArr, update)
							spLock.Unlock()
							return
						}
					}
					times := qu.IntAll(tmp["times"]) //download attempts so far
					success := true                  //whether the download succeeded
					delete(tmp, "_id")
					delete(tmp, "times")
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					//download, parse, store
					data, err = sp.DownloadDetailPage(tmp, data)
					if !isHistory && !sp.Stop && sp.IsMainThread { //spider taken offline while downloading details: stop storing heartbeats
						UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //record modal=1 data heartbeats
					}
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(s.Code, err, tmp)
							//if len(tmp) > 0 && !isHistory { //do not save error info when downloading history
							//	SaveErrorData(s.MUserName, tmp, err) //save the error info
							//}
						} /*else if data == nil && times >= 3 { //download problem: create an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					} /*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced href
						util.RedisClusterSet(hashHref, "", -1)
					}*/
					if !success { //download failed: update the attempt count and state
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { //3 failures: skip for the rest of today, state=-1
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					} else if data["delete"] != nil { //detail page filtered the data
						//util.AddBloomRedis("href", tmphref) //"delete" may mark redirected data; adding it to the full-volume redis could block the site from ever being collected
						//data to delete: set spider_highlistdata state=1 so it is not downloaded again; update redis
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "delete", "updatetime": time.Now().Unix()}}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					//analyze the body and attachments; re-download abnormal data
					if r := AnalysisProjectInfo(data); r != "" { //sequential collection skips this check for now (abnormal data never enters redis and would be downloaded forever)
						times++
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { //3 failures: skip for the rest of today, state=-1
							ss["state"] = -1
							ss["detailfilerr"] = r
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					//collected successfully
					//de-duplicate by publish time
					tmphref := qu.ObjToString(data["href"])
					publishtime := qu.Int64All(data["l_np_publishtime"])
					if publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
						isExist, _ := util.ExistsBloomRedis("href", tmphref)
						if isExist {
							set := map[string]interface{}{"$set": map[string]interface{}{
								"state":      1,
								"updatetime": time.Now().Unix(),
								"exist":      "bloom_href",
								"tmphref":    tmphref,
							}}
							update = append(update, query)
							update = append(update, set)
							spLock.Lock()
							updateArr = append(updateArr, update)
							spLock.Unlock()
							return
						}
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					data["comeintime"] = time.Now().Unix()
					data["spidercode"] = s.Code
					data["dataging"] = 0
					data["iscompete"] = s.IsCompete   //spiders added after 2021-11-01 no longer show the original link (checked by the save service)
					data["infoformat"] = s.Infoformat //spider type
					Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} //downloaded: state=1
					update = append(update, query)
					update = append(update, set)
					spLock.Lock()
					updateArr = append(updateArr, update)
					spLock.Unlock()
					//download of this record is complete
				}(tmp, spTmp)
			}
			wg.Wait()
			//flush the updates
			if len(updateArr) > 0 {
				MgoS.UpdateBulk(coll, updateArr...)
				updateArr = [][]map[string]interface{}{}
			}
			close(spChan) //close the channel
			//release sp objects (keep the main thread's sp, IsMainThread=true)
			for sp := range spChan {
				if sp != nil && !sp.IsMainThread {
					sp.L.Close()
				}
			}
			if !s.Stop && reload { //high-performance mode: reload the sp object after downloading details
				//reload the main thread's sp
				s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
			}
		}
	} else if reload { //high-performance mode with no data: sleep 2 minutes
		time.Sleep(2 * time.Minute)
	}
}
// NewSpiderByScript initializes sp objects.
func NewSpiderByScript(num int, code string, info map[string]string, spChan chan *Spider) {
	for i := 1; i <= num; i++ {
		spTmp, errstr := CreateSpider(code, info["script"], true, true)
		if errstr == "" && spTmp != nil { //script loaded successfully
			spTmp.UserName = info["createuser"]
			spTmp.UserEmail = info["createuseremail"]
			spTmp.MUserName = info["modifyuser"]
			spTmp.MUserEmail = info["modifyemail"]
			spChan <- spTmp
		} else {
			spChan <- nil
		}
	}
}
// AnalysisProjectInfo flags data whose detail is only "see the original page"
// and whose attachments failed to download; such data does not count as downloaded.
func AnalysisProjectInfo(data map[string]interface{}) string {
	defer qu.Catch()
	detail := qu.ObjToString(data["detail"])
	if RestrictAccessReg.MatchString(detail) { //access restricted
		return "ip"
	}
	if detail == "详情请访问原网页!" || detail == "<br/>详情请访问原网页!" { //exact match only: some JSON-assembled data with missing fields also appends this placeholder text
		if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
			if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
				for _, data := range attachments {
					if d, ok := data.(map[string]interface{}); ok {
						fid := qu.ObjToString(d["fid"])
						if fid != "" { //at least one attachment uploaded successfully
							return ""
						}
					}
				}
				return "detail_file" //no attachment uploaded successfully
			}
			return "detail_file"
		}
		return "detail_file"
	}
	return ""
}
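
// analysisProjectInfoExample is a hedged sketch (not part of the original file)
// of the three outcomes of AnalysisProjectInfo: "" means the data counts as
// downloaded, "ip" means access was blocked, and "detail_file" means the detail
// is only a placeholder with no successfully uploaded attachment.
func analysisProjectInfoExample() {
	ok := AnalysisProjectInfo(map[string]interface{}{"detail": "<p>full text</p>"})
	blocked := AnalysisProjectInfo(map[string]interface{}{"detail": "访问被拒绝"})
	missing := AnalysisProjectInfo(map[string]interface{}{"detail": "详情请访问原网页!"})
	fmt.Println(ok, blocked, missing) //prints: "" "ip" "detail_file" (the empty string shows as blank)
}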
// AllThreadLog logs the detail-download thread count every minute.
func AllThreadLog() {
	logger.Info("Detail Download All Thread:", AllThreadNum)
	time.AfterFunc(1*time.Minute, AllThreadLog)
}
// GetHas1 returns a hash key: the URL's scheme+host prefix plus the SHA-1 hex digest.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}
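
// getHas1Example is a hedged usage sketch (not part of the original file):
// GetHas1 prefixes the SHA-1 hex digest of the whole string with the URL's
// scheme+host, so identical paths on different sites yield different keys.
func getHas1Example() {
	key := GetHas1("https://example.com/notice/123.html")
	fmt.Println(key) //"https://example.com/" followed by the 40-char sha1 hex digest
}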