spider.go
/*
Package spider implements the crawler and its script interface; it is meant to be extended.
*/
package spider

import (
    "crypto/sha1"
    "crypto/sha256"
    "fmt"
    "io"
    "log"
    "math/big"
    "math/rand"
    "regexp"
    "strconv"
    "strings"
    "sync/atomic"
    "time"

    mu "mfw/util"
    mgo "mongodb"
    qu "qfw/util"
    es "qfw/util/elastic"
    mgu "qfw/util/mongodbutil"
    //"qfw/util/redis"
    util "spiderutil"

    "github.com/donnie4w/go-logger/logger"
    "github.com/yuin/gopher-lua"
)
// Heart holds the heartbeat timestamps of one spider.
type Heart struct {
    DetailHeart        int64  // heartbeat: detail-page routine is running
    DetailExecuteHeart int64  // heartbeat: detail-page routine collected data
    FindListHeart      int64  // heartbeat: findListHtml execution
    ListHeart          int64  // heartbeat: list-page routine is running
    ModifyUser         string // spider maintainer
    Site               string // site
    Channel            string // channel
}
// Spider is one crawler instance.
type Spider struct {
    Script
    Code                            string // spider code
    Name                            string // site name
    Channel                         string // channel
    DownDetail                      bool   // whether to download detail pages
    Stop                            bool   // stop flag
    Pass                            bool   // pause flag
    LastPubshTime                   int64  // last publish time
    LastHeartbeat                   int64  // last heartbeat time
    SpiderRunRate                   int64  // run frequency (minutes)
    ExecuteOkTime                   int64  // time the round finished successfully
    Collection                      string // collection the data is written to
    Thread                          int64  // thread count
    LastExecTime                    int64  // last execution time
    LastDowncount                   int32  // downloads in the last round
    TodayDowncount                  int32  // downloads today
    YesterdayDowncount              int32  // downloads yesterday
    TotalDowncount                  int32  // downloads in total
    RoundCount                      int32  // round counter
    StoreMode                       int    // storage mode
    StoreToMsgEvent                 int    // message event type
    CoverAttr                       string // attribute used for dedup
    SleepBase                       int    // base delay (ms)
    SleepRand                       int    // random extra delay (ms)
    TargetChannelUrl                string // channel page URL
    UpperLimit, LowerLimit          int    // upper/lower bound of the normal range
    UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
    MUserName, MUserEmail           string // maintainer, maintainer email
    Index                           int    // array index
    // historical gap-filling
    IsHistoricalMend bool // whether this is a historical gap-filling spider
    IsMustDownload   bool // whether download is forced
    IsCompete        bool // distinguishes new from old spiders
}
var Es *es.Elastic
var EsIndex string
var EsType string
var Mgo *mgo.MongodbSim
var UpdataMgoCache = make(chan []map[string]interface{}, 1000)   // updates the state of data queued for re-download
var UpdataHeartCache = make(chan []map[string]interface{}, 1000) // updates spider heartbeat info
var SaveMgoCache = make(chan map[string]interface{}, 1000)       // saves off-site data collected by spiders
var SP = make(chan bool, 5)
var SPH = make(chan bool, 5)
var SPS = make(chan bool, 5)
var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
var DomainNameReg = regexp.MustCompile(`(?://).+?[：:/]`)
var RepDomainNameReg = regexp.MustCompile(`[：:/]+`)
var DelaySites map[string]int // sites whose collection is delayed
// UpdateHeart records a heartbeat of kind t for the spider identified by code.
func UpdateHeart(site, channel, code, user, t string) {
    if htmp, ok := SpiderHeart.Load(code); ok {
        if heart, ok := htmp.(*Heart); ok {
            if t == "list" {
                heart.ListHeart = time.Now().Unix()
            } else if t == "findlist" {
                heart.FindListHeart = time.Now().Unix()
            } else if t == "detail" {
                heart.DetailHeart = time.Now().Unix()
            } else if t == "detailexcute" {
                heart.DetailExecuteHeart = time.Now().Unix()
            }
        }
    } else {
        heart := &Heart{
            ModifyUser: user,
            Site:       site,
            Channel:    channel,
        }
        if t == "list" {
            heart.ListHeart = time.Now().Unix()
        } else if t == "findlist" {
            heart.FindListHeart = time.Now().Unix()
        } else if t == "detail" {
            heart.DetailHeart = time.Now().Unix()
        } else if t == "detailexcute" {
            heart.DetailExecuteHeart = time.Now().Unix()
        }
        SpiderHeart.Store(code, heart)
    }
}
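// Usage note: callers pass the spider's site, channel, code and maintainer,
// e.g. UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") in the list
// loop below. The four kinds used throughout this file are "list", "findlist",
// "detail" and "detailexcute" (sic) — the last is the historical spelling kept
// for the data-collected heartbeat.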
// StartJob starts one crawl job.
func (s *Spider) StartJob() {
    s.Stop = false
    s.Pass = false
    s.RoundCount++
    go s.ExecJob(false)
}
// ExecJob runs a single crawl round.
func (s *Spider) ExecJob(reload bool) {
    defer func() {
        size_ok, size_no := 0, 0
        size_no_index := []interface{}{}
        LoopListPath.Range(func(k, v interface{}) bool {
            if v != nil {
                size_ok++
            } else {
                size_no_index = append(size_no_index, k)
                size_no++
            }
            return true
        })
        logger.Debug("index_", s.Index, ",", s.Code, s.Name, "ok, downloads this round:", s.LastDowncount, ", poll list length:", size_ok, ", discarded:", size_no, ", discarded positions:", size_no_index)
        s.ExecuteOkTime = time.Now().Unix()
        util.TimeSleepFunc(5*time.Second, TimeSleepChan)
        if util.Config.Working == 1 {
            s.Stop = true
            if _, b := Allspiders.Load(s.Code); b {
                Allspiders.Store(s.Code, s)
            }
            s.L.Close()
            CC <- s.L
        }
    }()
    if reload && util.Config.Working == 0 { // high-performance mode: reload the script on each scheduled round
        s.LoadScript(s.Name, s.Channel, s.MUserName, s.Code, s.ScriptFile, true)
    }
    logger.Debug(s.Code, s.Name, "rate:", s.SpiderRunRate, ",", s.Timeout)
    s.LastDowncount = 0
    s.LastExecTime = time.Now().Unix()
    s.LastHeartbeat = time.Now().Unix()
    s.ExecuteOkTime = 0
    err := s.GetLastPublishTime() // fetch the latest publish time, used as the last-update time
    if err != nil {
        logger.Error(s.Code, err)
    }
    err = s.DownListPageItem() // download the list pages
    if err != nil {
        logger.Error(s.Code, err)
    }
    if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // incremental vs. historical spider (currently only node 7000 runs historical spiders)
        UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
        SpiderCodeSendToEditor(s.Code)       // hand it over to the editor
        return
    } else {
        if util.Config.Working == 0 { // high-performance mode
            /*
                for !s.Stop && s.Pass {
                    util.TimeSleepFunc(2*time.Second, TimeSleepChan)
                }
                if s.Stop {
                    return
                }
            */
            //if s.IsMustDownload { // historical download runs only one round
            if s.IsHistoricalMend && util.Config.IsHistoryEvent { // history node 7000, high-performance mode: gap-filling runs only one round
                UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
                b := mgu.Update("luaconfig", "editor", "editor", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
                logger.Info("Delete History Code:", s.Code, b)
            } else {
                if !s.Stop { // still online: schedule the next round
                    util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
                        s.ExecJob(true)
                    }, TimeChan)
                    //util.TimeAfterFunc(30*time.Second, func() {
                    //	s.ExecJob(true)
                    //}, TimeChan)
                } else { // taken offline: the worker goroutine exits
                    return
                }
            }
        } else { // queue mode
            return
        }
    }
}
// GetLastPublishTime asks the script for the latest publish time, which is
// used as the spider's last-update time.
func (s *Spider) GetLastPublishTime() (errs interface{}) {
    defer mu.Catch()
    var lastpublishtime string
    // call the Lua script
    if err := s.L.CallByParam(lua.P{
        Fn:      s.L.GetGlobal("getLastPublishTime"),
        NRet:    1,
        Protect: true,
    }); err != nil {
        //panic(s.Code + "," + err.Error())
        log.Println(s.Code + "," + err.Error())
        errs = err.Error()
        atomic.AddInt32(&s.Script.ErrorNum, 1)
        return errs
    }
    ret := s.L.Get(-1)
    s.L.Pop(1)
    if str, ok := ret.(lua.LString); ok {
        lastpublishtime = string(str)
    }
    if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
        // clamp publish times that lie in the future
        if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
            s.LastPubshTime = time.Now().Unix()
        } else {
            s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
        }
    }
    return nil
}
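// Lua contract assumed by this file: every spider script defines
// getLastPublishTime() returning a date string, downloadAndParseListPage(page)
// returning a table of list items, and downloadDetailPage(item) returning a
// table of parsed detail fields; returned tables are converted on the Go side
// with util.TableToMap.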
// DownListPageItem downloads the list pages.
func (s *Spider) DownListPageItem() (errs interface{}) {
    defer mu.Catch()
    start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page and max page
    tmpMax := max       // remember the configured max page
    repeatAllNum := 0   // duplicates collected over this round's pages
    downloadAllNum := 0 // items collected over this round's pages
    if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: the spider crawls history
        max = s.GetIntVar("spiderHistoryMaxPage")
    }
    downtimes := 0           // retry counter for the current page (capped at 3 attempts)
    repeatPageNum := 0       // page number on which every list link was a duplicate
    repeatPageTimes := 0     // count of consecutive all-duplicate pages (paging stops at the threshold, currently 10)
    isRunRepeatList := false // whether consecutive duplicate checking of list pages is enabled
    if util.Config.Modal == 1 && util.Config.Working == 0 && max > 1 && max < 101 { // 7100/7400: max page in (1,101); these spiders get consecutive duplicate checking on list pages
        isRunRepeatList = true
        max = 100 // raise the max page to 100
    }
    for ; start <= max && !s.Stop; start++ {
        if !s.Stop { // if the spider was taken offline while downloading details, stop recording heartbeats
            UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") // list-page heartbeat for all nodes
        }
        //qu.Debug("repeat page:", repeatPageNum, " configured max:", tmpMax, " final max:", max, " current page:", start, " repeat times:", repeatPageTimes)
        //if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { // more than 5 consecutive duplicate pages: stop paging
        //	break
        //}
        if isRunRepeatList && repeatPageTimes >= 10 { // more than 10 consecutive duplicate pages: stop paging
            break
        }
        if err := s.L.CallByParam(lua.P{
            Fn:      s.L.GetGlobal("downloadAndParseListPage"),
            NRet:    1,
            Protect: true,
        }, lua.LNumber(start)); err != nil {
            //panic(s.Code + "," + err.Error())
            logger.Error("list page collection error", start, s.Code+","+err.Error())
            errs = err.Error()
            atomic.AddInt32(&s.Script.ErrorNum, 1)
            // retry a failed list page; past the retry limit the page counts as collected
            if downtimes < 2 {
                downtimes++
                start--
                //} else if start > tmpMax && isRunRepeatList { // past the retry limit, count the page as a duplicate
            } else if isRunRepeatList { // past the retry limit, count the page as a duplicate
                if repeatPageNum+1 == start {
                    repeatPageTimes++ // one more consecutive duplicate page
                } else {
                    repeatPageTimes = 0 // reset the consecutive counter
                }
                repeatPageNum = start // remember the page number
                downtimes = 0
            }
            continue
        }
        lv := s.L.Get(-1)
        s.L.Pop(1)
        if tbl, ok := lv.(*lua.LTable); ok {
            list := []map[string]interface{}{}
            //qu.Debug("items on this page:", tbl.Len())
            if tabLen := tbl.Len(); tabLen > 0 { // the list page has data; download its detail pages
                repeatListNum := 0 // duplicate links on the current list page
                for i := 1; i <= tabLen; i++ {
                    v := tbl.RawGetInt(i).(*lua.LTable)
                    tmp := util.TableToMap(v)
                    //s.ThisSiteData(tmp) // track whether the downloaded data belongs to this site
                    if !s.IsHistoricalMend { // not historical gap-filling
                        tmp["dataging"] = 0 // mark the data with dataging=0
                        if s.DownDetail {
                            s.DownloadDetailItem(tmp, &repeatListNum)
                        } else {
                            tmp["comeintime"] = time.Now().Unix()
                            atomic.AddInt32(&s.LastDowncount, 1)
                            atomic.AddInt32(&s.TodayDowncount, 1)
                            atomic.AddInt32(&s.TotalDowncount, 1)
                            href := fmt.Sprint(tmp["href"])
                            if len(href) > 5 { // valid data
                                db := HexToBigIntMod(href) // choose the Redis db from the href hash
                                hashHref := HexText(href)
                                // incremental set (Redis db0 by default)
                                util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
                                // full set (check existence first so an id is not overwritten)
                                isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
                                if !isExist {
                                    util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
                                }
                                list = append(list, tmp)
                            }
                        }
                    } else { // historical gap-filling
                        s.HistoricalMendDownloadDetailItem(tmp) // download detail pages for gap-filling
                    }
                }
                //if start <= tmpMax { // tally
                repeatAllNum += repeatListNum
                downloadAllNum += tabLen
                //}
                //if start > tmpMax && isRunRepeatList { // consecutive duplicate checking
                if isRunRepeatList { // consecutive duplicate checking
                    if repeatListNum >= tabLen { // every item on page start was already collected
                        //qu.Debug("repeat page:", repeatPageNum, "current page:", start)
                        if repeatPageNum+1 == start || repeatPageNum == 0 {
                            repeatPageTimes++ // one more consecutive duplicate page
                        } else {
                            repeatPageTimes = 0 // reset the consecutive counter
                        }
                        repeatPageNum = start // remember the page number
                    } else { // page start still had new data
                        repeatPageTimes = 0
                        repeatPageNum = 0
                    }
                }
                if !s.IsHistoricalMend && !s.DownDetail {
                    if len(list) > 0 { // persist the list data
                        StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
                    }
                }
            } else { // the page may be empty due to network problems; re-request the list page
                if downtimes < 2 {
                    downtimes++
                    start--
                    continue
                    //} else if start > tmpMax && isRunRepeatList { // past the retry limit, count the page as a duplicate
                } else if isRunRepeatList { // past the retry limit, count the page as a duplicate
                    if repeatPageNum+1 == start {
                        repeatPageTimes++ // one more consecutive duplicate page
                    } else {
                        repeatPageTimes = 0 // reset the consecutive counter
                    }
                    repeatPageNum = start // remember the page number
                }
            }
        } else { // requesting the current list page failed
            if downtimes < 2 {
                downtimes++
                start--
                continue
                //} else if start > tmpMax && isRunRepeatList { // past the retry limit, count the page as a duplicate
            } else if isRunRepeatList { // past the retry limit, count the page as a duplicate
                if repeatPageNum+1 == start {
                    repeatPageTimes++ // one more consecutive duplicate page
                } else {
                    repeatPageTimes = 0 // reset the consecutive counter
                }
                repeatPageNum = start // remember the page number
            }
        }
        downtimes = 0 // page downloaded cleanly; reset the retry counter
        util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
    }
    nowTime := time.Now()
    sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
    set := map[string]interface{}{
        "site":       s.Name,
        "channel":    s.Channel,
        "spidercode": s.Code,
        "updatetime": nowTime.Unix(),
        "event":      util.Config.Uploadevent,
        "modifyuser": s.MUserName,
        "maxpage":    tmpMax,
        "runrate":    s.SpiderRunRate,
        "endpage":    start,
        "date":       sDate,
    }
    inc := map[string]interface{}{
        "alltimes": 1,
    }
    if downloadAllNum > 0 {
        rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
        rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
        if rate == 1.0 {
            inc["oh_percent"] = 1
        } else if rate >= 0.9 {
            inc["nt_percent"] = 1
        } else if rate >= 0.8 {
            inc["et_percent"] = 1
        } else {
            inc["other_percent"] = 1
        }
    } else {
        inc["zero"] = 1
    }
    query := map[string]interface{}{
        "date":       sDate,
        "spidercode": s.Code,
    }
    logger.Info(s.Code, "list collection summary this round:", downloadAllNum, repeatAllNum, start)
    Mgo.Update("spider_downloadrate", query, map[string]interface{}{
        "$set": set,
        "$inc": inc,
    }, true, false)
    return errs
}
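// Bucketing above, illustrated: with downloadAllNum=50 and repeatAllNum=5 the
// new-data rate is (50-5)/50 = 0.90, so nt_percent is incremented; 1.0 counts
// toward oh_percent, >=0.9 toward nt_percent, >=0.8 toward et_percent,
// anything lower toward other_percent, and rounds without downloads toward zero.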
// ThisSiteData records list items whose domain differs from the spider's
// channel URL, i.e. off-site data.
func (s *Spider) ThisSiteData(tmp map[string]interface{}) {
    defer qu.Catch()
    href := qu.ObjToString(tmp["href"])
    url_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(s.TargetChannelUrl), "")
    href_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(href), "")
    if url_dn != href_dn {
        SaveMgoCache <- map[string]interface{}{
            "site":       s.Name,
            "channel":    s.Channel,
            "spidercode": s.Code,
            "url":        s.TargetChannelUrl,
            "href":       href,
            "modifyuser": s.MUserName,
            "comeintime": time.Now().Unix(),
        }
    }
}
// HistoricalMendDownloadDetailItem downloads a detail page (historical gap-filling).
func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
    //qu.Debug("-------------- historical download -----------------")
    defer mu.Catch()
    var err interface{}
    data := map[string]interface{}{}
    paramdata := p.(map[string]interface{})
    for k, v := range paramdata {
        data[k] = v
    }
    href := qu.ObjToString(data["href"])
    if len(href) <= 5 { // invalid data
        return
    }
    db := HexToBigIntMod(href)
    hashHref := HexText(href)
    id := ""
    SaveListPageData(paramdata, &id, false)                                   // store the collection record
    isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) // check the full-dedup redis
    //log.Println("full href:", href, " isExist:", isExist)
    logger.Debug("full href:", href, " isExist:", isExist)
    if !s.IsMustDownload && isExist { // not forced and already in redis: done
        // mark the spider_listdata record as downloaded
        if id != "" {
            //Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
            Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
        }
        return
    }
    // download, parse, store
    data, err = s.DownloadDetailPage(paramdata, data)
    if err != nil || data == nil { // download failed: done
        if err != nil {
            logger.Error(s.Code, err, paramdata)
            //if len(paramdata) > 0 {
            //	SaveErrorData(paramdata) // save the error record
            //}
        }
        // mark the spider_listdata record as failed
        if id != "" {
            Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
        }
        return
    }
    // mark the spider_listdata record as downloaded
    if id != "" {
        //Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
        Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
    }
    flag := true
    t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) // publishtime
    if s.IsMustDownload { // forced download
        if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() { // in redis and published more than five days ago
            data["dataging"] = 1
            flag = false
        } else {
            data["dataging"] = 0
            //WithinThreeDays(&data) // mark by publish time
        }
    } else { // not forced
        if !isExist { // not in redis yet
            data["dataging"] = 0
            //WithinThreeDays(&data) // mark by publish time
        }
    }
    if t1 > time.Now().Unix() { // clamp publish times in the future
        data["publishtime"] = time.Now().Unix()
    }
    delete(data, "state")
    delete(data, "exit")
    delete(data, "checkpublishtime")
    data["comeintime"] = time.Now().Unix()
    atomic.AddInt32(&s.LastDowncount, 1)
    atomic.AddInt32(&s.TodayDowncount, 1)
    atomic.AddInt32(&s.TotalDowncount, 1)
    data["spidercode"] = s.Code
    data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
}
// DownloadDetailItem downloads a detail page (incremental collection).
func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
    defer mu.Catch()
    var err interface{}
    data := map[string]interface{}{}
    paramdata := p.(map[string]interface{})
    for k, v := range paramdata {
        data[k] = v
    }
    href := qu.ObjToString(data["href"])
    if len(href) <= 5 { // invalid data
        *num++ // count it as collected
        return
    }
    /*
        // check the incremental redis to see whether the item was already downloaded
        isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
        if isExist { // refresh the redis TTL
            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
            *num++ // already collected
            return
        }
        log.Println("href had++:", isExist, href)
    */
    id := "" // id of the record saved in spider_listdata, used to update its state after a successful download
    if util.Config.Modal == 1 { // all nodes except 7000, 7500 and 7700 collect list-page info only
        isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
        if isExist { // refresh the redis TTL
            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
            *num++ // already collected
            return
        }
        SaveHighListPageData(paramdata, s.SCode, href, num)
        return
    } else {
        if !s.Stop {
            UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // detail-page heartbeat for modal=0 (old mode)
        }
        isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
        if isExist { // refresh the redis TTL
            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
            *num++ // already collected
            return
        }
        isEsRepeat := false
        if delayDay := DelaySites[s.Name]; delayDay > 0 { // competitor-like sites: dedup titles against es within 7 days (sequential collection cannot be delayed, so dedup is the only option)
            title := qu.ObjToString(paramdata["title"])
            eTime := time.Now().Unix()
            sTime := eTime - int64(7*86400)
            esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
            if Es.Count(EsIndex, EsType, esQuery) > 0 { // es already has this title: skip and update the list record
                isEsRepeat = true
            }
        }
        SaveListPageData(paramdata, &id, isEsRepeat) // save list data collected by nodes 7000, 7410, 7500 and 7700
        if isEsRepeat { // title-duplicate items go into redis
            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
            return
        }
    }
    // download, parse, store
    data, err = s.DownloadDetailPage(paramdata, data)
    if err != nil || data == nil {
        if err != nil {
            logger.Error(s.Code, err, paramdata)
            if len(paramdata) > 0 {
                SaveErrorData(s.MUserName, paramdata, err) // save the error record
            }
        }
        // mark the spider_listdata record as failed
        if id != "" {
            Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
        }
        return
    } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so it differs from the list href
        log.Println("beforeHref:", href, "afterHref:", tmphref)
        // incremental set
        util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
        // full set
        db := HexToBigIntMod(href)
        hashHref := HexText(href)
        isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
        if !isExist {
            util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
        }
    }
    // mark the spider_listdata record as downloaded
    if id != "" {
        Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
        //Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
    }
    t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
    if t1 > time.Now().Unix() { // clamp publish times in the future
        data["publishtime"] = time.Now().Unix()
    }
    if !s.Stop {
        UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // data-collected heartbeat for modal=0 (old mode)
    }
    delete(data, "state")
    delete(data, "exit")
    delete(data, "checkpublishtime")
    data["comeintime"] = time.Now().Unix()
    atomic.AddInt32(&s.LastDowncount, 1)
    atomic.AddInt32(&s.TodayDowncount, 1)
    atomic.AddInt32(&s.TotalDowncount, 1)
    data["spidercode"] = s.Code
    // temporary data stash, kept for reference:
    //update := []map[string]interface{}{}
    //_id := data["_id"].(string)
    //update = append(update, map[string]interface{}{"_id": qu.StringTOBsonId(_id)})
    //update = append(update, map[string]interface{}{
    //	"$set": map[string]interface{}{
    //		"jsondata": data["jsondata"],
    //	},
    //})
    //UpdataMgoCache <- update
    data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailByNames iterates the directory list and downloads it.
func (s *Spider) DownloadDetailByNames(p interface{}) {
    defer mu.Catch()
    var err interface{}
    /*
        if s.Stop {
            return
        }
        for s.Pass {
            util.TimeSleepFunc(2*time.Second, TimeSleepChan)
        }
    */
    //TODO download the detail page and let the Lua script parse it; if the spider is configured not to download detail pages, stop here and store directly
    data := map[string]interface{}{}
    paramdata := p.(map[string]interface{})
    for k, v := range paramdata {
        data[k] = v
    }
    if s.DownDetail {
        href := qu.ObjToString(data["href"])
        if href == "" || len(href) < 5 { // invalid data
            return
        }
        // download, parse, store
        data, err = s.DownloadDetailPage(paramdata, data)
        if err != nil {
            logger.Error(s.Code, paramdata, err)
            return
        }
    }
    data["comeintime"] = time.Now().Unix()
    atomic.AddInt32(&s.LastDowncount, 1)
    atomic.AddInt32(&s.TodayDowncount, 1)
    atomic.AddInt32(&s.TotalDowncount, 1)
    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailPage downloads and parses a detail (content) page.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
    defer mu.Catch()
    s.LastHeartbeat = time.Now().Unix()
    util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
    // copy the parameters into a Lua table
    tab := s.L.NewTable()
    for k, v := range param {
        if val, ok := v.(string); ok {
            tab.RawSet(lua.LString(k), lua.LString(val))
        } else if val, ok := v.(int64); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(int32); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(float64); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(float32); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(bool); ok {
            tab.RawSet(lua.LString(k), lua.LBool(val))
        }
    }
    var err error
    if err = s.L.CallByParam(lua.P{
        Fn:      s.L.GetGlobal("downloadDetailPage"),
        NRet:    1,
        Protect: true,
    }, tab); err != nil {
        //panic(s.Code + "," + err.Error())
        log.Println(s.Code + "," + err.Error())
        atomic.AddInt32(&s.Script.ErrorNum, 1)
        return data, err
    }
    lv := s.L.Get(-1)
    s.L.Pop(1)
    // assemble the result map
    if v3, ok := lv.(*lua.LTable); ok {
        v3.ForEach(func(k, v lua.LValue) {
            if tmp, ok := k.(lua.LString); ok {
                key := string(tmp)
                if value, ok := v.(lua.LString); ok {
                    data[key] = string(value)
                } else if value, ok := v.(lua.LNumber); ok {
                    data[key] = value
                } else if value, ok := v.(*lua.LTable); ok {
                    tmp := util.TableToMap(value)
                    data[key] = tmp
                }
            }
        })
        return data, err
    } else {
        return nil, err
    }
}
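// Note on the contract above: a nil first return value means the Lua script
// returned something other than a table, and callers treat nil data as a
// failed download. Parameter values of types other than string, int32, int64,
// float32, float64 and bool are silently omitted from the Lua table.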
// DetailData collects detail pages on a schedule in high-performance mode.
func DetailData() {
    defer qu.Catch()
    <-InitAllLuaOver // run once all scripts are loaded
    if util.Config.Working == 0 && !util.Config.IsHistoryEvent { // high-performance mode and not node 7000 (only node 7000 has util.Config.IsHistoryEvent == true)
        GetListDataDownloadDetail()
    }
}
func GetListDataDownloadDetail() {
    defer qu.Catch()
    logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
    Allspiders2.Range(func(k, v interface{}) bool {
        sp := v.(*Spider)
        go sp.DownloadHighDetail()
        time.Sleep(2 * time.Second)
        return true
    })
}
// DownloadHighDetail downloads detail pages from stored list data (high-performance mode).
func (s *Spider) DownloadHighDetail() {
    defer qu.Catch()
    for {
        logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
        if !s.Stop { // the spider is running
            comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} // only data from the last DayNum days, so items that never download do not pile up
            isEsRepeat := false // whether to dedup against es
            if delayDay := DelaySites[s.Name]; delayDay > 0 {
                isEsRepeat = true
                if delayDay <= util.Config.DayNum*24 { // site is configured for delayed collection: postpone items by delayDay hours (7410, 7500 and 7700 collect sequentially and cannot be delayed)
                    //comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
                    comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delayDay)
                }
            }
            q := map[string]interface{}{
                "spidercode": s.Code,
                "state":      0, // 0: new; -1: download failed; 1: success
                "comeintime": comeintimeQuery,
            }
            o := map[string]interface{}{"_id": -1}
            f := map[string]interface{}{
                "state":      0,
                "comeintime": 0,
                "event":      0,
            }
            if !s.Stop { // if the spider was taken offline while downloading details, stop recording heartbeats
                UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // detail-page heartbeat for modal=1
            }
            list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
            if list != nil && len(*list) > 0 {
                for _, tmp := range *list {
                    _id := tmp["_id"]
                    query := map[string]interface{}{"_id": _id}
                    href := qu.ObjToString(tmp["href"])
                    // list-page redis dedup keys on href+code, so the same href may have been saved under several codes;
                    // run an incremental redis check here to avoid downloading it twice
                    isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
                    if isExist {
                        set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: state set to 1
                        Mgo.Update("spider_highlistdata", query, set, false, false)
                        continue
                    }
                    if isEsRepeat { // dedup titles against es
                        title := qu.ObjToString(tmp["title"])
                        eTime := time.Now().Unix()
                        sTime := eTime - int64(7*86400)
                        esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
                        count := Es.Count(EsIndex, EsType, esQuery)
                        if count > 0 { // es already has this title: skip and update the list record
                            set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: state set to 1
                            Mgo.Update("spider_highlistdata", query, set, false, false)
                            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
                            continue
                        }
                    }
                    times := qu.IntAll(tmp["times"])
                    success := true // whether the download succeeded
                    delete(tmp, "_id")
                    delete(tmp, "times")
                    data := map[string]interface{}{}
                    var err interface{}
                    for k, v := range tmp {
                        data[k] = v
                    }
                    // download, parse, store
                    data, err = s.DownloadDetailPage(tmp, data)
                    if !s.Stop { // if the spider was taken offline while downloading details, stop recording heartbeats
                        UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // data-collected heartbeat for modal=1
                    }
                    if err != nil || data == nil {
                        success = false
                        times++
                        if err != nil {
                            logger.Error(s.Code, err, tmp)
                            if len(tmp) > 0 {
                                SaveErrorData(s.MUserName, tmp, err) // save the error record
                            }
                        } /*else if data == nil && times >= 3 { // download problem: create an editor task
                            DownloadErrorData(s.Code, tmp)
                        }*/
                    } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so it differs from the list href
                        log.Println("beforeHref:", href, "afterHref:", tmphref)
                        // incremental set
                        util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
                        // full set
                        db := HexToBigIntMod(href)
                        hashHref := HexText(href)
                        isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
                        if !isExist {
                            util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
                        }
                    }
                    if !success { // download failed: update the retry count and state
                        ss := map[string]interface{}{"times": times}
                        if times >= 3 { // after three failures, skip for the rest of the day; state set to -1
                            ss["state"] = -1
                        }
                        set := map[string]interface{}{"$set": ss}
                        Mgo.Update("spider_highlistdata", query, set, false, false)
                        continue
                    }
                    t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
                    if t1 > time.Now().Unix() { // clamp publish times in the future
                        data["publishtime"] = time.Now().Unix()
                    }
                    delete(data, "exit")
                    delete(data, "checkpublishtime")
                    data["comeintime"] = time.Now().Unix()
                    // update counters
                    tmpsp1, b := Allspiders.Load(s.Code)
                    if b {
                        sp1, ok := tmpsp1.(*Spider)
                        if ok {
                            atomic.AddInt32(&sp1.LastDowncount, 1)
                            atomic.AddInt32(&sp1.TodayDowncount, 1)
                            atomic.AddInt32(&sp1.TotalDowncount, 1)
                        }
                    }
                    data["spidercode"] = s.Code
                    data["dataging"] = 0
                    data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
                    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
                    set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // download succeeded: state set to 1
                    Mgo.Update("spider_highlistdata", query, set, false, false)
                }
                // reload the spider script
                s.LoadScript(s.Name, s.Channel, s.MUserName, s.Code, s.ScriptFile, true)
            } else { // no data
                time.Sleep(2 * time.Minute)
            }
            //s.GetListDataDownloadDetail() // start the next round
        } else {
            logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
            break
        }
    }
}
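// State machine of spider_highlistdata records, as used by the detail
// downloaders above and below: state 0 = newly saved list item; state 1 =
// downloaded, or skipped as a redis/es duplicate (flagged exist:true);
// state -1 = given up after three failed attempts, with "times" counting
// the attempts so far.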
// DownloadListDetail downloads detail pages from stored list data (queue mode).
func (s *Spider) DownloadListDetail() {
    defer qu.Catch()
    defer func() { // once the spider has finished (or had no) detail downloads, close its state
        s.Stop = true
        if _, b := Allspiders2.Load(s.Code); b {
            Allspiders2.Store(s.Code, s)
        }
        s.L.Close()
        CC2 <- s.L
    }()
    comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} // only data from the last DayNum days, so items that never download do not pile up
    isEsRepeat := false // whether to dedup against es
    if delayDay := DelaySites[s.Name]; delayDay > 0 {
        isEsRepeat = true
        if delayDay <= util.Config.DayNum { // site is configured for delayed collection: postpone items by delayDay days (7410, 7500 and 7700 collect sequentially and cannot be delayed)
            //comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
            comeintimeQuery["$lte"] = time.Now().Unix() - int64(86400*delayDay)
        }
    }
    q := map[string]interface{}{
        "spidercode": s.Code,
        "state":      0, // 0: new; -1: download failed; 1: success
        "comeintime": comeintimeQuery,
    }
    o := map[string]interface{}{"_id": -1}
    f := map[string]interface{}{
        "state":      0,
        "comeintime": 0,
        "event":      0,
    }
    if !s.Stop { // if the spider was taken offline while downloading details, stop recording heartbeats
        UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // detail-page heartbeat for modal=1
    }
    list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
    if list != nil && len(*list) > 0 {
        for _, tmp := range *list {
            _id := tmp["_id"]
            query := map[string]interface{}{"_id": _id}
            href := qu.ObjToString(tmp["href"])
            // list-page redis dedup keys on href+code, so the same href may have been saved under several codes;
            // run an incremental redis check here to avoid downloading it twice
            isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
            if isExist {
                set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: state set to 1
                Mgo.Update("spider_highlistdata", query, set, false, false)
                continue
            }
            if isEsRepeat { // dedup titles against es
                title := qu.ObjToString(tmp["title"])
                eTime := time.Now().Unix()
                sTime := eTime - int64(7*86400)
                esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
                if Es.Count(EsIndex, EsType, esQuery) > 0 { // es already has this title: skip and update the list record
                    set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: state set to 1
                    Mgo.Update("spider_highlistdata", query, set, false, false)
                    util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
                    continue
                }
            }
            times := qu.IntAll(tmp["times"])
            success := true // whether the download succeeded
            delete(tmp, "_id")
            delete(tmp, "times")
            data := map[string]interface{}{}
            var err interface{}
            for k, v := range tmp {
                data[k] = v
            }
            // download, parse, store
            data, err = s.DownloadDetailPage(tmp, data)
            if !s.Stop { // if the spider was taken offline while downloading details, stop recording heartbeats
                UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // data-collected heartbeat for modal=1
            }
            if err != nil || data == nil {
                success = false
                times++
                if err != nil {
                    logger.Error(s.Code, err, tmp)
                    if len(tmp) > 0 {
                        SaveErrorData(s.MUserName, tmp, err) // save the error record
                    }
                } /*else if data == nil && times >= 3 { // download problem: create an editor task
                    DownloadErrorData(s.Code, tmp)
                }*/
            } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so it differs from the list href
                log.Println("beforeHref:", href, "afterHref:", tmphref)
                // incremental set
                util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
                // full set
                db := HexToBigIntMod(href)
                hashHref := HexText(href)
                isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
                if !isExist {
                    util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
                }
            }
            if !success { // download failed: update the retry count and state
                ss := map[string]interface{}{"times": times}
                if times >= 3 { // after three failures, skip for the rest of the day; state set to -1
                    ss["state"] = -1
                }
                set := map[string]interface{}{"$set": ss}
                Mgo.Update("spider_highlistdata", query, set, false, false)
                continue
            }
            t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
            if t1 > time.Now().Unix() { // clamp publish times in the future
                data["publishtime"] = time.Now().Unix()
            }
            delete(data, "exit")
            delete(data, "checkpublishtime")
            data["comeintime"] = time.Now().Unix()
            // update counters
            tmpsp1, b := Allspiders.Load(s.Code)
            if b {
                sp1, ok := tmpsp1.(*Spider)
                if ok {
                    atomic.AddInt32(&sp1.LastDowncount, 1)
                    atomic.AddInt32(&sp1.TodayDowncount, 1)
                    atomic.AddInt32(&sp1.TotalDowncount, 1)
                }
            }
            data["spidercode"] = s.Code
            data["dataging"] = 0
            data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
            Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
            set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // download succeeded: state set to 1
            Mgo.Update("spider_highlistdata", query, set, false, false)
        }
    }
}
// GetRandMath returns a random int in [0, num).
func GetRandMath(num int) int {
    if num <= 0 { // rand.Intn panics on non-positive arguments
        return 0
    }
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    return r.Intn(num)
}
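// Jitter usage as in DownloadDetailPage above: sleep SleepBase plus a random
// 0..SleepRand milliseconds between requests, e.g.
//
//	util.TimeSleepFunc(time.Duration(s.SleepBase+GetRandMath(s.SleepRand))*time.Millisecond, TimeSleepChan)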
// GetHas1 returns the sha1 hex digest of data, prefixed with the URL's host
// part (with a trailing slash) extracted by Reg.
func GetHas1(data string) string {
    t := sha1.New()
    io.WriteString(t, data)
    hf := Reg.FindString(data)
    if !strings.HasSuffix(hf, "/") {
        hf = hf + "/"
    }
    return hf + fmt.Sprintf("%x", t.Sum(nil))
}
// HexToBigIntMod hashes an href and reduces it mod 16, picking a Redis db.
func HexToBigIntMod(href string) int {
    // hash the href
    t := sha256.New()
    io.WriteString(t, href)
    hex := fmt.Sprintf("%x", t.Sum(nil))
    // interpret the digest (minus its first two hex chars) as a big integer, then take it mod 16
    n := new(big.Int)
    n, _ = n.SetString(hex[2:], 16)
    return int(n.Mod(n, big.NewInt(16)).Int64())
}
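// Sketch of the sharded full-dedup write this pair of helpers supports (the
// same pattern appears in DownListPageItem above):
//
//	db := HexToBigIntMod(href) // 0..15: which Redis db holds the key
//	hashHref := HexText(href)  // full sha256 hex digest used as the key
//	if isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref); !isExist {
//		util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
//	}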
// HexText returns the sha256 hex digest of href.
func HexText(href string) string {
    h := sha256.New()
    h.Write([]byte(href))
    return fmt.Sprintf("%x", h.Sum(nil))
}
//func RedisIsExist(href string) bool {
//	isExist := false
//	if len(href) > 75 { // long href: check existence by its hash
//		hashHref := GetHas1(href)
//		isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+hashHref)
//	}
//	if !isExist { // check existence by the raw href
//		isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
//	}
//	return isExist
//}

//WithinThreeDays marks data by whether its publish time falls within the last three days
//func WithinThreeDays(data *map[string]interface{}) {
//	withinThreeDays := false
//	// mark by publish time
//	publishtime := util.ParseDate2Int64(qu.ObjToString((*data)["publishtime"])) // missing publish time falls back to now
//	now := time.Now().Unix()
//	if now-publishtime > 259200 { // older than three days
//		withinThreeDays = false
//	} else {
//		withinThreeDays = true
//	}
//	if withinThreeDays {
//		(*data)["dataging"] = 0
//	} else {
//		(*data)["dataging"] = 1
//	}
//}