
/**
Spider: script interface; to be extended.
*/
package spider

import (
	"crypto/sha1"
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"math/big"
	"math/rand"
	mu "mfw/util"
	mgo "mongodb"
	qu "qfw/util"
	mgu "qfw/util/mongodbutil"
	"regexp"
	util "spiderutil"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"github.com/yuin/gopher-lua"
)
type Heart struct {
	DetailHeart        int64  // heartbeat: detail-page (third-level page) execution
	DetailExecuteHeart int64  // heartbeat: detail page actually collected data
	ListHeart          int64  // heartbeat: list-page execution
	ModifyUser         string // spider maintainer
	Site               string // site
	Channel            string // channel (column)
}
// Spider
type Spider struct {
	Script
	Code                            string // spider code
	Name                            string // name (site)
	Channel                         string // channel (column)
	DownDetail                      bool   // whether to download detail pages
	Stop                            bool   // stop flag
	Pass                            bool   // pause flag
	LastPubshTime                   int64  // last publish time
	LastHeartbeat                   int64  // last heartbeat time
	SpiderRunRate                   int64  // run frequency (minutes)
	ExecuteOkTime                   int64  // time the job completed successfully
	Collection                      string // target collection name
	Thread                          int64  // thread count
	LastExecTime                    int64  // last execution time
	LastDowncount                   int32  // downloads in the last round
	TodayDowncount                  int32  // downloads today
	YesterdayDowncount              int32  // downloads yesterday
	TotalDowncount                  int32  // total downloads
	RoundCount                      int32  // round counter
	StoreMode                       int    // storage mode
	StoreToMsgEvent                 int    // message event type
	CoverAttr                       string // attribute used for dedup on save
	SleepBase                       int    // base delay (ms)
	SleepRand                       int    // random extra delay (ms)
	TargetChannelUrl                string // channel page URL
	UpperLimit, LowerLimit          int    // upper/lower bounds of the normal range
	UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
	MUserName, MUserEmail           string // maintainer, maintainer email
	Index                           int    // array index
	// historical backfill
	IsHistoricalMend bool // whether this is a historical-backfill spider
	IsMustDownload   bool // whether download is forced
	IsCompete        bool // distinguishes new vs. legacy spiders
}
var UpdataMgoCache = make(chan []map[string]interface{}, 1000)   // queue of status updates for data to be re-downloaded
var UpdataHeartCache = make(chan []map[string]interface{}, 1000) // queue of spider heartbeat updates
var SP = make(chan bool, 5)
var SPH = make(chan bool, 5)
var Mgo *mgo.MongodbSim
var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
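// Reg captures the scheme://host/ prefix of a URL, e.g.
//
//	Reg.FindString("https://www.example.com/a/b.html") // "https://www.example.com/"
//
// GetHas1 below uses it to prefix a hash with its source host.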
// UpdateHeart records a heartbeat of the given kind for a spider.
func UpdateHeart(site, channel, code, user, t string) {
	if htmp, ok := SpiderHeart.Load(code); ok {
		if heart, ok := htmp.(*Heart); ok {
			if t == "list" {
				heart.ListHeart = time.Now().Unix()
			} else if t == "detail" {
				heart.DetailHeart = time.Now().Unix()
			} else if t == "detailexcute" {
				heart.DetailExecuteHeart = time.Now().Unix()
			}
		}
	} else {
		heart := &Heart{
			ModifyUser: user,
			Site:       site,
			Channel:    channel,
		}
		if t == "list" {
			heart.ListHeart = time.Now().Unix()
		} else if t == "detail" {
			heart.DetailHeart = time.Now().Unix()
		} else if t == "detailexcute" {
			heart.DetailExecuteHeart = time.Now().Unix()
		}
		SpiderHeart.Store(code, heart)
	}
}
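// The heartbeat kinds map onto the Heart fields one-to-one:
//
//	UpdateHeart(site, channel, code, user, "list")         // ListHeart: list page ran
//	UpdateHeart(site, channel, code, user, "detail")       // DetailHeart: detail page ran
//	UpdateHeart(site, channel, code, user, "detailexcute") // DetailExecuteHeart: detail data collected
//
// ("detailexcute" is the literal kind string used throughout this file.)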
// StartJob starts one crawl round.
func (s *Spider) StartJob() {
	s.Stop = false
	s.Pass = false
	s.RoundCount++
	go s.ExecJob(false)
}
// ExecJob runs a single crawl round.
func (s *Spider) ExecJob(reload bool) {
	defer func() {
		size_ok, size_no := 0, 0
		size_no_index := []interface{}{}
		LoopListPath.Range(func(k, v interface{}) bool {
			if v != nil {
				size_ok++
			} else {
				size_no_index = append(size_no_index, k)
				size_no++
			}
			return true
		})
		logger.Debug("index_", s.Index, ",", s.Code, s.Name, "ok, downloads this round:", s.LastDowncount, ", poll list length:", size_ok, ", discarded:", size_no, ", discarded positions:", size_no_index)
		s.ExecuteOkTime = time.Now().Unix()
		util.TimeSleepFunc(5*time.Second, TimeSleepChan)
		if util.Config.Working == 1 {
			s.Stop = true
			if _, b := Allspiders.Load(s.Code); b {
				Allspiders.Store(s.Code, s)
			}
			s.L.Close()
			CC <- s.L
		}
	}()
	if reload && util.Config.Working == 0 { // high-performance mode: reload the script when rescheduled
		s.LoadScript(s.Code, s.ScriptFile, true)
	}
	logger.Debug(s.Code, s.Name, "rate:", s.SpiderRunRate, ",", s.Timeout)
	s.LastDowncount = 0
	s.LastExecTime = time.Now().Unix()
	s.LastHeartbeat = time.Now().Unix()
	s.ExecuteOkTime = 0
	err := s.GetLastPublishTime() // fetch the newest publish time, used as the last-update time
	if err != nil {
		logger.Error(s.Code, err)
	}
	err = s.DownListPageItem() // download list pages
	if err != nil {
		logger.Error(s.Code, err)
	}
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // incremental vs. history spider (currently only node 7000 runs history spiders)
		UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
		SpiderCodeSendToEditor(s.Code)       // send it to the editor
		return
	} else {
		if util.Config.Working == 0 { // high-performance mode
			/*
				for !s.Stop && s.Pass {
					util.TimeSleepFunc(2*time.Second, TimeSleepChan)
				}
				if s.Stop {
					return
				}
			*/
			//if s.IsMustDownload { // history download: run a single round only
			if s.IsHistoricalMend && util.Config.IsHistoryEvent { // history node 7000, high-performance mode: historical backfill runs one round only
				UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
				b := mgu.Update("luaconfig", "editor", "editor", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
				logger.Info("Delete History Code:", s.Code, b)
			} else {
				if !s.Stop { // still online: schedule the next round
					util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
						s.ExecJob(true)
					}, TimeChan)
					// util.TimeAfterFunc(30*time.Second, func() {
					// 	s.ExecJob(true)
					// }, TimeChan)
				} else { // taken offline: let the worker goroutine exit
					return
				}
			}
		} else { // queue mode
			return
		}
	}
}
// GetLastPublishTime fetches the newest publish time, used as the last-update time.
func (s *Spider) GetLastPublishTime() (errs interface{}) {
	defer mu.Catch()
	var lastpublishtime string
	// ask the script for the latest publish time
	if err := s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("getLastPublishTime"),
		NRet:    1,
		Protect: true,
	}); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		errs = err.Error()
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return errs
	}
	ret := s.L.Get(-1)
	s.L.Pop(1)
	if str, ok := ret.(lua.LString); ok {
		lastpublishtime = string(str)
	}
	if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
		// guard against publish times in the future
		if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
			s.LastPubshTime = time.Now().Unix()
		} else {
			s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
		}
	}
	return nil
}
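// A minimal, self-contained sketch of the gopher-lua call pattern used above
// (the inline script is hypothetical; real spiders load their script files):
//
//	L := lua.NewState()
//	defer L.Close()
//	_ = L.DoString(`function getLastPublishTime() return "2021-11-01 12:00:00" end`)
//	err := L.CallByParam(lua.P{
//		Fn:      L.GetGlobal("getLastPublishTime"),
//		NRet:    1,    // one return value is pushed onto the stack
//		Protect: true, // errors come back as err instead of panicking
//	})
//	if err == nil {
//		ret := L.Get(-1) // read the pushed value ...
//		L.Pop(1)         // ... and always pop it
//		if str, ok := ret.(lua.LString); ok {
//			_ = util.ParseDate2Int64(string(str))
//		}
//	}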
// DownListPageItem downloads the list pages.
func (s *Spider) DownListPageItem() (errs interface{}) {
	defer mu.Catch()
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page, max page
	tmpMax := max       // remember the configured max page
	repeatAllNum := 0   // total duplicate links over this round's tmpMax pages
	downloadAllNum := 0 // total links over this round's tmpMax pages
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: spider crawls history
		max = s.GetIntVar("spiderHistoryMaxPage")
	}
	downtimes := 0           // retry counter for the current page (3 tries max)
	repeatPageNum := 0       // page number of the last list page whose links were all duplicates
	repeatPageTimes := 0     // consecutive all-duplicate pages (stop paging after 5 in a row)
	isRunRepeatList := false // whether consecutive-duplicate checking is active
	if util.Config.Modal == 1 && util.Config.Working == 0 && max > 1 && max < 101 { // nodes 7100/7400: spiders with 1 < max page < 101 get the 5-consecutive-page duplicate check while paging
		isRunRepeatList = true
		max = 100 // raise the max page to 100
	}
	for ; start <= max && !s.Stop; start++ {
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") // record the list-page heartbeat on every node
		//qu.Debug("repeat page:", repeatPageNum, " configured max:", tmpMax, " effective max:", max, " current page:", start, "repeat count:", repeatPageTimes)
		if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { // 5 or more consecutive duplicate pages: stop paging
			break
		}
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(start)); err != nil {
			//panic(s.Code + "," + err.Error())
			log.Println(s.Code + "," + err.Error())
			errs = err.Error()
			atomic.AddInt32(&s.Script.ErrorNum, 1)
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			list := []map[string]interface{}{}
			//qu.Debug("links on this page:", tbl.Len())
			if tabLen := tbl.Len(); tabLen > 0 { // the list page has data: download its detail pages
				repeatListNum := 0 // duplicate links on the current list page
				for i := 1; i <= tabLen; i++ {
					v := tbl.RawGetInt(i).(*lua.LTable)
					tmp := util.TableToMap(v)
					// historical backfill
					if !s.IsHistoricalMend { // not historical backfill
						tmp["dataging"] = 0 // tag the data with dataging=0
						if s.DownDetail {
							s.DownloadDetailItem(tmp, &repeatListNum)
						} else {
							tmp["comeintime"] = time.Now().Unix()
							atomic.AddInt32(&s.LastDowncount, 1)
							atomic.AddInt32(&s.TodayDowncount, 1)
							atomic.AddInt32(&s.TotalDowncount, 1)
							href := fmt.Sprint(tmp["href"])
							if len(href) > 5 { // valid link
								db := HexToBigIntMod(href) // pick the Redis db from the href hash
								hashHref := HexText(href)
								// incremental dedup (Redis db0 by default)
								util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
								// full dedup (check existence first so ids are not overwritten)
								isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
								if !isExist {
									util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
								}
								list = append(list, tmp)
							}
						}
					} else { // historical backfill
						s.HistoricalMendDownloadDetailItem(tmp) // download detail pages for backfill
					}
				}
				if start <= tmpMax { // accumulate counts
					repeatAllNum += repeatListNum
					downloadAllNum += tabLen
				}
				if start > tmpMax && isRunRepeatList { // consecutive-duplicate check
					if repeatListNum >= tabLen { // every link on page start was already collected
						//qu.Debug("repeat page:", repeatPageNum, "current page:", start)
						if repeatPageNum+1 == start || repeatPageNum == 0 {
							repeatPageTimes++ // one more consecutive duplicate page
						} else {
							repeatPageTimes = 0 // streak broken: reset
						}
						repeatPageNum = start // remember the page number
					} else { // page start still had new links
						repeatPageTimes = 0
						repeatPageNum = 0
					}
				}
				if !s.IsHistoricalMend && !s.DownDetail {
					if len(list) > 0 { // save the collected rows
						StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
					}
				}
			} else { // the page came back empty, possibly a network hiccup: retry it
				if downtimes < 2 {
					downtimes++
					start--
					continue
				} else if start > tmpMax && isRunRepeatList { // retries exhausted: treat the page as all duplicates
					if repeatPageNum+1 == start {
						repeatPageTimes++ // one more consecutive duplicate page
					} else {
						repeatPageTimes = 0 // streak broken: reset
					}
					repeatPageNum = start // remember the page number
				}
			}
		} else { // the list-page request failed
			if downtimes < 2 {
				downtimes++
				start--
				continue
			} else if start > tmpMax && isRunRepeatList { // retries exhausted: treat the page as all duplicates
				if repeatPageNum+1 == start {
					repeatPageTimes++ // one more consecutive duplicate page
				} else {
					repeatPageTimes = 0 // streak broken: reset
				}
				repeatPageNum = start // remember the page number
			}
		}
		downtimes = 0 // the page downloaded fine: reset the retry counter
		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
		// temporary: bump the start page, attachment download
		// mgu.Update("luaconfig2", "editor", "editor", `{"code":"`+s.Code+`"}`,
		// 	map[string]interface{}{"$inc": map[string]interface{}{"param_common.4": 1}},
		// 	true, false)
	}
	nowTime := time.Now()
	sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
	set := map[string]interface{}{
		"site":       s.Name,
		"channel":    s.Channel,
		"spidercode": s.Code,
		"updatetime": nowTime.Unix(),
		"event":      util.Config.Uploadevent,
		"modifyuser": s.MUserName,
		"maxpage":    tmpMax,
		"runrate":    s.SpiderRunRate,
		"endpage":    start,
		"date":       sDate,
	}
	inc := map[string]interface{}{
		"alltimes": 1,
	}
	if downloadAllNum > 0 {
		rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
		rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
		if rate == 1.0 {
			inc["oh_percent"] = 1
		} else if rate >= 0.9 {
			inc["nt_percent"] = 1
		} else if rate >= 0.8 {
			inc["et_percent"] = 1
		} else {
			inc["other_percent"] = 1
		}
	} else {
		inc["zero"] = 1
	}
	query := map[string]interface{}{
		"date":       sDate,
		"spidercode": s.Code,
	}
	Mgo.Update("spider_downloadrate", query, map[string]interface{}{
		"$set": set,
		"$inc": inc,
	}, true, false)
	return errs
}
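// A condensed sketch (hypothetical helper) of the freshness bucketing above:
// each round increments exactly one counter; the real code first rounds the
// rate to two decimals via strconv.ParseFloat.
//
//	func rateBucket(downloaded, repeated int) string {
//		if downloaded == 0 {
//			return "zero"
//		}
//		switch rate := float64(downloaded-repeated) / float64(downloaded); {
//		case rate == 1.0:
//			return "oh_percent" // 100% new links
//		case rate >= 0.9:
//			return "nt_percent" // >=90% new
//		case rate >= 0.8:
//			return "et_percent" // >=80% new
//		default:
//			return "other_percent"
//		}
//	}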
// HistoricalMendDownloadDetailItem iterates list rows and downloads detail pages (historical backfill).
func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
	//qu.Debug("-------------- historical download -----------------")
	defer mu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { // invalid link
		return
	}
	db := HexToBigIntMod(href)
	hashHref := HexText(href)
	id := ""
	SaveListPageData(paramdata, &id)                                          // store the crawl record
	isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) // check the full-dedup Redis
	//log.Println("full href:", href, " isExist:", isExist)
	logger.Debug("full href:", href, " isExist:", isExist)
	if !s.IsMustDownload && isExist { // not forced and already in Redis: done
		// mark the row in spider_listdata as downloaded
		if id != "" {
			Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
		}
		return
	}
	// download, parse, store
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil { // download failed: done
		if err != nil {
			logger.Error(s.Code, err, paramdata)
			// if len(paramdata) > 0 {
			// 	SaveErrorData(paramdata) // save the error record
			// }
		}
		// mark the row in spider_listdata as failed
		if id != "" {
			Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
		}
		return
	}
	// mark the row in spider_listdata as downloaded
	if id != "" {
		Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
	}
	flag := true
	t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) // publishtime
	if s.IsMustDownload { // forced download
		if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() {
			//qu.Debug("forced download, already in Redis")
			data["dataging"] = 1
			flag = false
		} else {
			//qu.Debug("forced download, not in Redis")
			data["dataging"] = 0
			//WithinThreeDays(&data) // tag by publish time
		}
	} else { // not forced
		if !isExist {
			//qu.Debug("not forced, not in Redis")
			data["dataging"] = 0
			//WithinThreeDays(&data) // tag by publish time
		}
	}
	if t1 > time.Now().Unix() { // guard against publish times in the future
		data["publishtime"] = time.Now().Unix()
	}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	atomic.AddInt32(&s.LastDowncount, 1)
	atomic.AddInt32(&s.TodayDowncount, 1)
	atomic.AddInt32(&s.TotalDowncount, 1)
	data["spidercode"] = s.Code
	data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
}
// DownloadDetailItem iterates list rows and downloads detail pages (incremental).
func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
	defer mu.Catch()
	var err interface{}
	// TODO download the detail page and hand it to the Lua parser; if the spider is
	// configured not to download detail pages, stop here and store the row directly
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { // invalid link
		*num++ // count it as already collected
		return
	}
	/*
		// check the incremental Redis to see whether the link was already downloaded
		isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
		if isExist { // refresh the Redis TTL
			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
			*num++ // already collected
			return
		}
		log.Println("href had++:", isExist, href)
	*/
	id := "" // id of the row saved to spider_listdata, used to update its state after download
	if util.Config.Modal == 1 { // all nodes except 7000, 7500 and 7700 collect list pages only
		isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
		if isExist { // refresh the Redis TTL
			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
			*num++ // already collected
			return
		}
		SaveHighListPageData(paramdata, href, num)
		return
	} else {
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // detail-page heartbeat for the legacy modal=0 mode
		isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
		if isExist { // refresh the Redis TTL
			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
			*num++ // already collected
			return
		}
		SaveListPageData(paramdata, &id) // save list rows collected on nodes 7000, 7410, 7500 and 7700
	}
	// download, parse, store
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil {
		if err != nil {
			logger.Error(s.Code, err, paramdata)
			if len(paramdata) > 0 {
				SaveErrorData(s.MUserName, paramdata, err) // save the error record
			}
		}
		// mark the row in spider_listdata as failed
		if id != "" {
			Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
		}
		return
	} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so before and after differ
		log.Println("beforeHref:", href, "afterHref:", tmphref)
		// incremental dedup
		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
		// full dedup
		db := HexToBigIntMod(href)
		hashHref := HexText(href)
		isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
		if !isExist {
			util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
		}
	}
	// mark the row in spider_listdata as downloaded
	if id != "" {
		Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
	}
	t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
	if t1 > time.Now().Unix() { // guard against publish times in the future
		data["publishtime"] = time.Now().Unix()
	}
	UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // data-collected heartbeat for the legacy modal=0 mode
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	atomic.AddInt32(&s.LastDowncount, 1)
	atomic.AddInt32(&s.TodayDowncount, 1)
	atomic.AddInt32(&s.TotalDowncount, 1)
	data["spidercode"] = s.Code
	// temporary data-save path
	// update := []map[string]interface{}{}
	// _id := data["_id"].(string)
	// update = append(update, map[string]interface{}{"_id": qu.StringTOBsonId(_id)})
	// update = append(update, map[string]interface{}{
	// 	"$set": map[string]interface{}{
	// 		"jsondata": data["jsondata"],
	// 	},
	// })
	// UpdataMgoCache <- update
	data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailByNames iterates and downloads by directory names.
func (s *Spider) DownloadDetailByNames(p interface{}) {
	defer mu.Catch()
	var err interface{}
	/*
		if s.Stop {
			return
		}
		for s.Pass {
			util.TimeSleepFunc(2*time.Second, TimeSleepChan)
		}
	*/
	// TODO download the detail page and hand it to the Lua parser; if the spider is
	// configured not to download detail pages, stop here and store the row directly
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	if s.DownDetail {
		href := qu.ObjToString(data["href"])
		if href == "" || len(href) < 5 { // invalid link
			return
		}
		// download, parse, store
		data, err = s.DownloadDetailPage(paramdata, data)
		if err != nil {
			logger.Error(s.Code, paramdata, err)
			return
		}
	}
	data["comeintime"] = time.Now().Unix()
	atomic.AddInt32(&s.LastDowncount, 1)
	atomic.AddInt32(&s.TodayDowncount, 1)
	atomic.AddInt32(&s.TotalDowncount, 1)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailPage downloads and parses a detail (content) page.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer mu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	// convert the returned Lua table into the data map
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = value
				} else if value, ok := v.(*lua.LTable); ok {
					tmp := util.TableToMap(value)
					data[key] = tmp
				}
			}
		})
		return data, err
	} else {
		return nil, err
	}
}
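// A minimal sketch of the calling convention (hypothetical href; see
// DownloadDetailItem above for the real flow): param carries the list-page
// fields, data starts as a copy of them and is filled from the Lua table on
// success.
//
//	param := map[string]interface{}{"href": "https://example.com/notice/1.html"}
//	data := map[string]interface{}{}
//	for k, v := range param {
//		data[k] = v
//	}
//	data, err := s.DownloadDetailPage(param, data) // data == nil means the Lua parse failed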
// DetailData periodically collects detail pages in high-performance mode.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver // wait until all scripts are loaded
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { // high-performance mode and not node 7000 (only node 7000 has IsHistoryEvent=true)
		GetListDataDownloadDetail()
	}
}

// GetListDataDownloadDetail starts a detail-download goroutine per spider.
func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders2.Range(func(k, v interface{}) bool {
		sp := v.(*Spider)
		go sp.DownloadHighDetail()
		time.Sleep(2 * time.Second)
		return true
	})
}
// DownloadHighDetail downloads detail pages from list-page rows (high-performance mode).
func (s *Spider) DownloadHighDetail() {
	defer qu.Catch()
	for {
		logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
		if !s.Stop { // the spider is running
			comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} // only rows from the last DayNum days, so rows that never download do not pile up
			if delayDay := util.Config.DelaySites[s.Name]; delayDay > 0 && delayDay <= util.Config.DayNum { // site configured for delayed collection
				comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
			}
			q := map[string]interface{}{
				"spidercode": s.Code,
				"state":      0, // 0: queued; -1: failed; 1: succeeded
				"comeintime": comeintimeQuery,
			}
			o := map[string]interface{}{"_id": -1}
			f := map[string]interface{}{
				"state":      0,
				"comeintime": 0,
				"event":      0,
			}
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // detail-page heartbeat for modal=1
			list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
			if list != nil && len(*list) > 0 {
				for _, tmp := range *list {
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					competehref := qu.ObjToString(tmp["competehref"])
					if competehref != "" { // third-party site: check whether Jianyu already collected the item
						title := qu.ObjToString(tmp["title"])
						one, _ := Mgo.FindOne("data_bak", map[string]interface{}{"title": title})
						if one != nil && len(*one) > 0 { // already collected: drop this row
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: state=1
							Mgo.Update("spider_highlistdata", query, set, false, false)
							continue
						}
					}
					times := qu.IntAll(tmp["times"])
					success := true // whether the download succeeded
					delete(tmp, "_id")
					delete(tmp, "times")
					href := qu.ObjToString(tmp["href"])
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					// download, parse, store
					data, err = s.DownloadDetailPage(tmp, data)
					UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // data-collected heartbeat for modal=1
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(s.Code, err, tmp)
							if len(tmp) > 0 {
								SaveErrorData(s.MUserName, tmp, err) // save the error record
							}
						} /*else if data == nil && times >= 3 { // download problem: open an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so before and after differ
						log.Println("beforeHref:", href, "afterHref:", tmphref)
						// incremental dedup
						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
						// full dedup
						db := HexToBigIntMod(href)
						hashHref := HexText(href)
						isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
						if !isExist {
							util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
						}
					}
					if !success { // download failed: update the retry count and state
						ss := map[string]interface{}{"times": times}
						if times >= 3 { // three failures: do not retry today, state=-1
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						Mgo.Update("spider_highlistdata", query, set, false, false)
						continue
					}
					t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
					if t1 > time.Now().Unix() { // guard against publish times in the future
						data["publishtime"] = time.Now().Unix()
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					data["comeintime"] = time.Now().Unix()
					// counters
					tmpsp1, b := Allspiders.Load(s.Code)
					if b {
						sp1, ok := tmpsp1.(*Spider)
						if ok {
							atomic.AddInt32(&sp1.LastDowncount, 1)
							atomic.AddInt32(&sp1.TodayDowncount, 1)
							atomic.AddInt32(&sp1.TotalDowncount, 1)
						}
					}
					data["spidercode"] = s.Code
					data["dataging"] = 0
					data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
					Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // downloaded: state=1
					Mgo.Update("spider_highlistdata", query, set, false, false)
				}
				// reload the spider script
				s.LoadScript(s.Code, s.ScriptFile, true)
			} else { // no queued rows
				time.Sleep(2 * time.Minute)
			}
			//s.GetListDataDownloadDetail() // start the next round
		} else {
			logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
			break
		}
	}
}
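// State machine for spider_highlistdata rows, as driven above (and by
// DownloadListDetail below):
//
//	 0 -> queued for detail download
//	 1 -> detail stored (or competehref title already found in data_bak)
//	-1 -> three failed attempts; skipped until the comeintime window rolls over
//
// "times" counts failed attempts and is persisted alongside "state".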
// DownloadListDetail downloads detail pages from list-page rows (queue mode).
func (s *Spider) DownloadListDetail() {
	defer qu.Catch()
	defer func() { // spider finished (or had nothing to download): close and return its Lua state
		s.Stop = true
		if _, b := Allspiders2.Load(s.Code); b {
			Allspiders2.Store(s.Code, s)
		}
		s.L.Close()
		CC2 <- s.L
	}()
	q := map[string]interface{}{
		"spidercode": s.Code,
		"state":      0, // 0: queued; -1: failed; 1: succeeded
		"comeintime": map[string]interface{}{ // only rows from the last DayNum days, so rows that never download do not pile up
			"$gte": GetTime(-util.Config.DayNum),
		},
	}
	o := map[string]interface{}{"_id": -1}
	f := map[string]interface{}{
		"state":      0,
		"comeintime": 0,
		"event":      0,
	}
	UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // detail-page heartbeat for modal=1
	list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
	if list != nil && len(*list) > 0 {
		for _, tmp := range *list {
			_id := tmp["_id"]
			query := map[string]interface{}{"_id": _id}
			competehref := qu.ObjToString(tmp["competehref"])
			if competehref != "" { // third-party site: check whether Jianyu already collected the item
				title := qu.ObjToString(tmp["title"])
				one, _ := Mgo.FindOne("data_bak", map[string]interface{}{"title": title})
				if one != nil && len(*one) > 0 { // already collected: drop this row
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: state=1
					Mgo.Update("spider_highlistdata", query, set, false, false)
					continue
				}
			}
			times := qu.IntAll(tmp["times"])
			success := true // whether the download succeeded
			delete(tmp, "_id")
			delete(tmp, "times")
			href := qu.ObjToString(tmp["href"])
			data := map[string]interface{}{}
			var err interface{}
			for k, v := range tmp {
				data[k] = v
			}
			// download, parse, store
			data, err = s.DownloadDetailPage(tmp, data)
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // data-collected heartbeat for modal=1
			if err != nil || data == nil {
				success = false
				times++
				if err != nil {
					logger.Error(s.Code, err, tmp)
					if len(tmp) > 0 {
						SaveErrorData(s.MUserName, tmp, err) // save the error record
					}
				} /*else if data == nil && times >= 3 { // download problem: open an editor task
					DownloadErrorData(s.Code, tmp)
				}*/
			} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so before and after differ
				log.Println("beforeHref:", href, "afterHref:", tmphref)
				// incremental dedup
				util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
				// full dedup
				db := HexToBigIntMod(href)
				hashHref := HexText(href)
				isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
				if !isExist {
					util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
				}
			}
			if !success { // download failed: update the retry count and state
				ss := map[string]interface{}{"times": times}
				if times >= 3 { // three failures: do not retry today, state=-1
					ss["state"] = -1
				}
				set := map[string]interface{}{"$set": ss}
				Mgo.Update("spider_highlistdata", query, set, false, false)
				continue
			}
			t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
			if t1 > time.Now().Unix() { // guard against publish times in the future
				data["publishtime"] = time.Now().Unix()
			}
			delete(data, "exit")
			delete(data, "checkpublishtime")
			data["comeintime"] = time.Now().Unix()
			// counters
			tmpsp1, b := Allspiders.Load(s.Code)
			if b {
				sp1, ok := tmpsp1.(*Spider)
				if ok {
					atomic.AddInt32(&sp1.LastDowncount, 1)
					atomic.AddInt32(&sp1.TodayDowncount, 1)
					atomic.AddInt32(&sp1.TotalDowncount, 1)
				}
			}
			data["spidercode"] = s.Code
			data["dataging"] = 0
			data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
			Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
			set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // downloaded: state=1
			Mgo.Update("spider_highlistdata", query, set, false, false)
		}
	}
}
// GetRandMath returns a random int in [0, num).
func GetRandMath(num int) int {
	if num <= 0 { // guard: rand.Intn panics on non-positive n (e.g. SleepRand unset)
		return 0
	}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return r.Intn(num)
}
// GetHas1 returns the sha1 digest of data, prefixed with its scheme://host/.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}
// HexToBigIntMod hashes href and returns the hash modulo 16 (used to pick a Redis db).
func HexToBigIntMod(href string) int {
	// hash
	t := sha256.New()
	io.WriteString(t, href)
	hex := fmt.Sprintf("%x", t.Sum(nil))
	// modulo
	n := new(big.Int)
	n, _ = n.SetString(hex[2:], 16)
	return int(n.Mod(n, big.NewInt(16)).Int64())
}
// HexText returns the sha256 hex digest of href.
func HexText(href string) string {
	h := sha256.New()
	h.Write([]byte(href))
	return fmt.Sprintf("%x", h.Sum(nil))
}
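// A minimal sketch (hypothetical href) of how the two helpers above shard the
// full-dedup keys across Redis dbs, mirroring the calls in DownListPageItem:
//
//	href := "https://example.com/notice/1.html"
//	db := HexToBigIntMod(href) // stable shard in [0,16)
//	key := HexText(href)       // sha256 hex digest of the href
//	if isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, key); !isExist {
//		util.PutRedis("title_repeat_fulljudgement", db, key, "", -1) // -1: no expiry
//	}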
//func RedisIsExist(href string) bool {
//	isExist := false
//	if len(href) > 75 { // long href: check existence by its hash
//		hashHref := GetHas1(href)
//		isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+hashHref)
//	}
//	if !isExist { // check existence by the raw href string
//		isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
//	}
//	return isExist
//}

// WithinThreeDays reports whether the publish time falls within the last three days.
//func WithinThreeDays(data *map[string]interface{}) {
//	withinThreeDays := false
//	// tag by publish time (a missing publish time falls back to now)
//	publishtime := util.ParseDate2Int64(qu.ObjToString((*data)["publishtime"]))
//	now := time.Now().Unix()
//	if now-publishtime > 259200 { // older than three days
//		withinThreeDays = false
//	} else {
//		withinThreeDays = true
//	}
//	if withinThreeDays {
//		//qu.Debug("published within three days")
//		(*data)["dataging"] = 0
//	} else {
//		//qu.Debug("published more than three days ago")
//		(*data)["dataging"] = 1
//	}
//}