spider.go

/*
Package spider: crawler script interface; still needs extension.
*/
package spider

import (
    "crypto/sha1"
    "crypto/sha256"
    "fmt"
    "io"
    "log"
    "math/big"
    "math/rand"
    mu "mfw/util"
    mgo "mongodb"
    qu "qfw/util"
    mgu "qfw/util/mongodbutil"
    "strconv"
    //"qfw/util/redis"
    "regexp"
    util "spiderutil"
    "strings"
    "sync/atomic"
    "time"

    "github.com/donnie4w/go-logger/logger"
    "github.com/yuin/gopher-lua"
)
type Heart struct {
    DetailHeart        int64  // heartbeat: detail-page execution
    DetailExecuteHeart int64  // heartbeat: detail-page data collected
    ListHeart          int64  // heartbeat: list-page execution
    ModifyUser         string // spider maintainer
    Site               string // site
    Channel            string // channel
}
// Spider
type Spider struct {
    Script
    Code                            string // spider code
    Name                            string // name (site name)
    Channel                         string // channel
    DownDetail                      bool   // whether to download detail pages
    Stop                            bool   // stop flag
    Pass                            bool   // pause flag
    LastPubshTime                   int64  // last publish time
    LastHeartbeat                   int64  // last heartbeat time
    SpiderRunRate                   int64  // execution frequency
    ExecuteOkTime                   int64  // time the job finished successfully
    Collection                      string // target collection name
    Thread                          int64  // thread count
    LastExecTime                    int64  // last execution time
    LastDowncount                   int32  // download count of the last round
    TodayDowncount                  int32  // download count today
    YesterdayDowncount              int32  // download count yesterday
    TotalDowncount                  int32  // total download count
    RoundCount                      int32  // number of rounds executed
    StoreMode                       int    // storage mode
    StoreToMsgEvent                 int    // message type
    CoverAttr                       string // attribute used for deduplication
    SleepBase                       int    // base delay
    SleepRand                       int    // random delay
    TargetChannelUrl                string // channel page URL
    UpperLimit, LowerLimit          int    // upper/lower bounds of the normal range
    UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
    MUserName, MUserEmail           string // maintainer, maintainer email
    Index                           int    // array index
    // historical gap-filling
    IsHistoricalMend bool // whether this is a historical gap-filling spider
    IsMustDownload   bool // whether download is forced
    IsCompete        bool // distinguishes new from old spiders
}
var UpdataMgoCache = make(chan []map[string]interface{}, 1000)   // status updates for data to be re-downloaded
var UpdataHeartCache = make(chan []map[string]interface{}, 1000) // spider heartbeat updates
var SP = make(chan bool, 5)
var SPH = make(chan bool, 5)
var Mgo *mgo.MongodbSim
var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
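// Illustrative note (not in the original): Reg captures only the scheme://host/
// prefix of a URL, e.g.
//
//    Reg.FindString("https://www.example.com/a/b") // "https://www.example.com/"
//
// Since \w does not match '-', hyphenated hostnames yield "" here, in which
// case GetHas1 below falls back to a bare "/" prefix.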
// Heartbeat
func UpdateHeart(site, channel, code, user, t string) {
    if htmp, ok := SpiderHeart.Load(code); ok {
        if heart, ok := htmp.(*Heart); ok {
            if t == "list" {
                heart.ListHeart = time.Now().Unix()
            } else if t == "detail" {
                heart.DetailHeart = time.Now().Unix()
            } else if t == "detailexcute" {
                heart.DetailExecuteHeart = time.Now().Unix()
            }
        }
    } else {
        heart := &Heart{
            ModifyUser: user,
            Site:       site,
            Channel:    channel,
        }
        if t == "list" {
            heart.ListHeart = time.Now().Unix()
        } else if t == "detail" {
            heart.DetailHeart = time.Now().Unix()
        } else if t == "detailexcute" {
            heart.DetailExecuteHeart = time.Now().Unix()
        }
        SpiderHeart.Store(code, heart)
    }
}
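// Sketch (not part of the original file): assuming SpiderHeart is a sync.Map
// of spider code -> *Heart, as the Load/Store calls above suggest, a monitor
// could collect the codes whose list-page heartbeat is older than maxAge
// seconds. The function name and threshold are ours, for illustration only.
func StaleListHearts(maxAge int64) []string {
    stale := []string{}
    now := time.Now().Unix()
    SpiderHeart.Range(func(k, v interface{}) bool {
        if heart, ok := v.(*Heart); ok {
            // a zero ListHeart means this spider never ran a list page
            if heart.ListHeart > 0 && now-heart.ListHeart > maxAge {
                if code, ok := k.(string); ok {
                    stale = append(stale, code)
                }
            }
        }
        return true
    })
    return stale
}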
// Start a crawl job
func (s *Spider) StartJob() {
    s.Stop = false
    s.Pass = false
    s.RoundCount++
    go s.ExecJob(false)
}
// Run one round
func (s *Spider) ExecJob(reload bool) {
    defer func() {
        size_ok, size_no := 0, 0
        size_no_index := []interface{}{}
        LoopListPath.Range(func(k, v interface{}) bool {
            if v != nil {
                size_ok++
            } else {
                size_no_index = append(size_no_index, k)
                size_no++
            }
            return true
        })
        logger.Debug("index_", s.Index, ",", s.Code, s.Name, "ok, downloads this round:", s.LastDowncount, ", polling list length:", size_ok, ", discarded:", size_no, ", discarded positions:", size_no_index)
        s.ExecuteOkTime = time.Now().Unix()
        util.TimeSleepFunc(5*time.Second, TimeSleepChan)
        if util.Config.Working == 1 {
            s.Stop = true
            if _, b := Allspiders.Load(s.Code); b {
                Allspiders.Store(s.Code, s)
            }
            s.L.Close()
            CC <- s.L
        }
    }()
    if reload && util.Config.Working == 0 { // high-performance mode: reload the script on every polling round
        s.LoadScript(s.Code, s.ScriptFile, true)
    }
    logger.Debug(s.Code, s.Name, "rate:", s.SpiderRunRate, ",", s.Timeout)
    s.LastDowncount = 0
    s.LastExecTime = time.Now().Unix()
    s.LastHeartbeat = time.Now().Unix()
    s.ExecuteOkTime = 0
    err := s.GetLastPublishTime() // fetch the latest publish time, used as the last-update time
    if err != nil {
        logger.Error(s.Code, err)
    }
    UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") // record the list-page heartbeat on every node
    err = s.DownListPageItem() // download the list pages
    if err != nil {
        logger.Error(s.Code, err)
    }
    if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // incremental vs. historical spider (currently only node 7000 runs historical spiders)
        UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
        SpiderCodeSendToEditor(s.Code)       // hand it over to the editor
        return
    } else {
        if util.Config.Working == 0 { // high-performance mode
            /*
                for !s.Stop && s.Pass {
                    util.TimeSleepFunc(2*time.Second, TimeSleepChan)
                }
                if s.Stop {
                    return
                }
            */
            //if s.IsMustDownload { // historical download runs a single round only
            if s.IsHistoricalMend && util.Config.IsHistoryEvent { // history node 7000, high-performance mode: historical gap-filling runs a single round
                UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
                b := mgu.Update("luaconfig", "editor", "editor", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
                logger.Info("Delete History Code:", s.Code, b)
            } else {
                if !s.Stop { // still online: schedule the next round
                    util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
                        s.ExecJob(true)
                    }, TimeChan)
                    // util.TimeAfterFunc(30*time.Second, func() {
                    //     s.ExecJob(true)
                    // }, TimeChan)
                } else { // taken offline: let the worker goroutine exit
                    return
                }
            }
        } else { // queue mode
            return
        }
    }
}
// Fetch the latest publish time, used as the last-update time
func (s *Spider) GetLastPublishTime() (errs interface{}) {
    defer mu.Catch()
    var lastpublishtime string
    // ask the script for the last publish time
    if err := s.L.CallByParam(lua.P{
        Fn:      s.L.GetGlobal("getLastPublishTime"),
        NRet:    1,
        Protect: true,
    }); err != nil {
        //panic(s.Code + "," + err.Error())
        log.Println(s.Code + "," + err.Error())
        errs = err.Error()
        atomic.AddInt32(&s.Script.ErrorNum, 1)
        return errs
    }
    ret := s.L.Get(-1)
    s.L.Pop(1)
    if str, ok := ret.(lua.LString); ok {
        lastpublishtime = string(str)
    }
    if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
        // guard against publish times in the future
        if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
            s.LastPubshTime = time.Now().Unix()
        } else {
            s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
        }
    }
    return nil
}
// Download the list pages
func (s *Spider) DownListPageItem() (errs interface{}) {
    defer mu.Catch()
    start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page, max page
    tmpMax := max       // remember the configured max page
    repeatAllNum := 0   // total repeated links across the tmpMax pages of this round
    downloadAllNum := 0 // total links across the tmpMax pages of this round
    if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: the spider crawls history
        max = s.GetIntVar("spiderHistoryMaxPage")
    }
    downtimes := 0           // retry counter per page (currently up to 3 attempts)
    repeatPageNum := 0       // page number whose links were all repeats
    repeatPageTimes := 0     // consecutive all-repeat page count (stop paging once it reaches 5)
    isRunRepeatList := false // whether consecutive repeat-page checking is enabled
    if util.Config.Modal == 1 && util.Config.Working == 0 && max > 1 && max < 101 { // nodes 7100/7400: max page greater than 1 and less than 101, so run the 5-consecutive-page repeat check while paging
        isRunRepeatList = true
        max = 100 // raise the effective max page to 100
    }
    for ; start <= max && !s.Stop; start++ {
        //qu.Debug("repeat page:", repeatPageNum, " configured max:", tmpMax, " effective max:", max, " current page:", start, " repeat count:", repeatPageTimes)
        if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { // five consecutive all-repeat pages: stop paging
            break
        }
        if err := s.L.CallByParam(lua.P{
            Fn:      s.L.GetGlobal("downloadAndParseListPage"),
            NRet:    1,
            Protect: true,
        }, lua.LNumber(start)); err != nil {
            //panic(s.Code + "," + err.Error())
            log.Println(s.Code + "," + err.Error())
            errs = err.Error()
            atomic.AddInt32(&s.Script.ErrorNum, 1)
        }
        lv := s.L.Get(-1)
        s.L.Pop(1)
        if tbl, ok := lv.(*lua.LTable); ok {
            list := []map[string]interface{}{}
            //qu.Debug("items on this page:", tbl.Len())
            if tabLen := tbl.Len(); tabLen > 0 { // the list page has data; download its detail pages
                repeatListNum := 0 // repeated links on the current list page
                for i := 1; i <= tabLen; i++ {
                    v := tbl.RawGetInt(i).(*lua.LTable)
                    tmp := util.TableToMap(v)
                    // historical gap-filling
                    if !s.IsHistoricalMend { // not historical gap-filling
                        tmp["dataging"] = 0 // tag the data with dataging=0
                        if s.DownDetail {
                            s.DownloadDetailItem(tmp, &repeatListNum)
                        } else {
                            tmp["comeintime"] = time.Now().Unix()
                            atomic.AddInt32(&s.LastDowncount, 1)
                            atomic.AddInt32(&s.TodayDowncount, 1)
                            atomic.AddInt32(&s.TotalDowncount, 1)
                            href := fmt.Sprint(tmp["href"])
                            if len(href) > 5 { // valid data
                                db := HexToBigIntMod(href) // choose the Redis db from the href hash
                                hashHref := HexText(href)
                                // incremental set (Redis db0 by default)
                                util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
                                // full set (check existence first so an id is not overwritten)
                                isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
                                if !isExist {
                                    util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
                                }
                                list = append(list, tmp)
                            }
                        }
                    } else { // historical gap-filling
                        s.HistoricalMendDownloadDetailItem(tmp) // download detail pages for gap-filling
                    }
                }
                if start <= tmpMax { // accumulate the counters (pages within the configured max only)
                    repeatAllNum += repeatListNum
                    downloadAllNum += tabLen
                }
                if start > tmpMax && isRunRepeatList { // consecutive repeat-page checking
                    if repeatListNum >= tabLen { // every link on page start has already been collected
                        //qu.Debug("repeat page:", repeatPageNum, "current page:", start)
                        if repeatPageNum+1 == start || repeatPageNum == 0 {
                            repeatPageTimes++ // one more consecutive repeat
                        } else {
                            repeatPageTimes = 0 // reset the consecutive counter
                        }
                        repeatPageNum = start // remember the page number
                    } else { // page start still has unseen data
                        repeatPageTimes = 0
                        repeatPageNum = 0
                    }
                }
                if !s.IsHistoricalMend && !s.DownDetail {
                    if len(list) > 0 { // persist the collected items
                        StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
                    }
                }
            } else { // the page came back empty, possibly a network hiccup; retry the download
                if downtimes < 2 {
                    downtimes++
                    start--
                    continue
                } else if start > tmpMax && isRunRepeatList { // retries exhausted; treat the page as all repeats
                    if repeatPageNum+1 == start {
                        repeatPageTimes++ // one more consecutive repeat
                    } else {
                        repeatPageTimes = 0 // reset the consecutive counter
                    }
                    repeatPageNum = start // remember the page number
                }
            }
        } else { // the list-page request failed
            if downtimes < 2 {
                downtimes++
                start--
                continue
            } else if start > tmpMax && isRunRepeatList { // retries exhausted; treat the page as all repeats
                if repeatPageNum+1 == start {
                    repeatPageTimes++ // one more consecutive repeat
                } else {
                    repeatPageTimes = 0 // reset the consecutive counter
                }
                repeatPageNum = start // remember the page number
            }
        }
        downtimes = 0 // this page downloaded fine; reset the retry counter
        util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
        // temporary: bump the start page (attachment download)
        // mgu.Update("luaconfig2", "editor", "editor", `{"code":"`+s.Code+`"}`,
        //     map[string]interface{}{"$inc": map[string]interface{}{"param_common.4": 1}},
        //     true, false)
    }
    nowTime := time.Now()
    sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
    set := map[string]interface{}{
        "site":       s.Name,
        "channel":    s.Channel,
        "spidercode": s.Code,
        "updatetime": nowTime.Unix(),
        "event":      util.Config.Uploadevent,
        "modifyuser": s.MUserName,
        "maxpage":    tmpMax,
        "runrate":    s.SpiderRunRate,
        "endpage":    start,
        "date":       sDate,
    }
    inc := map[string]interface{}{}
    if downloadAllNum > 0 {
        inc["alltimes"] = 1
        rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
        rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
        if rate == 1.0 {
            inc["oh_percent"] = 1
        } else if rate >= 0.9 {
            inc["nt_percent"] = 1
        } else if rate >= 0.8 {
            inc["et_percent"] = 1
        } else {
            inc["other_percent"] = 1
        }
    } else {
        inc["zero"] = 1
    }
    query := map[string]interface{}{
        "date":       sDate,
        "spidercode": s.Code,
    }
    Mgo.Update("spider_downloadrate", query, map[string]interface{}{
        "$set": set,
        "$inc": inc,
    }, true, false)
    return errs
}
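// Sketch (hypothetical helper, not used above): the $inc bucket chosen for
// spider_downloadrate, factored out for readability. It mirrors the
// thresholds in DownListPageItem; note the original rounds the rate to two
// decimals before comparing, so 0.995 counts as 100%.
func rateBucket(downloadAllNum, repeatAllNum int) string {
    if downloadAllNum == 0 {
        return "zero" // nothing downloaded at all this round
    }
    rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
    switch {
    case rate >= 1.0:
        return "oh_percent" // 100% fresh
    case rate >= 0.9:
        return "nt_percent" // at least 90% fresh
    case rate >= 0.8:
        return "et_percent" // at least 80% fresh
    default:
        return "other_percent"
    }
}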
// Iterate and download detail pages (historical gap-filling)
func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
    //qu.Debug("-------------- historical download -----------------")
    defer mu.Catch()
    var err interface{}
    data := map[string]interface{}{}
    paramdata := p.(map[string]interface{})
    for k, v := range paramdata {
        data[k] = v
    }
    href := qu.ObjToString(data["href"])
    if len(href) <= 5 { // invalid data
        return
    }
    db := HexToBigIntMod(href)
    hashHref := HexText(href)
    SaveListPageData(paramdata) // store the collection record
    isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) // check the full-volume Redis set
    //log.Println("full href:", href, " isExist:", isExist)
    logger.Debug("full href:", href, " isExist:", isExist)
    if !s.IsMustDownload && isExist { // not forced and already in Redis: done
        return
    }
    // download, parse, store
    data, err = s.DownloadDetailPage(paramdata, data)
    if err != nil || data == nil { // download failed: done
        if err != nil {
            logger.Error(s.Code, err, paramdata)
            // if len(paramdata) > 0 {
            //     SaveErrorData(paramdata) // save the error details
            // }
        }
        return
    }
    flag := true
    t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) // publishtime
    if s.IsMustDownload { // forced download
        if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() {
            //qu.Debug("forced download, already in Redis")
            data["dataging"] = 1
            flag = false
        } else {
            //qu.Debug("forced download, not in Redis")
            data["dataging"] = 0
            //WithinThreeDays(&data) // tag by publish time
        }
    } else { // not forced
        if !isExist {
            //qu.Debug("not forced, not in Redis")
            data["dataging"] = 0
            //WithinThreeDays(&data) // tag by publish time
        }
    }
    if t1 > time.Now().Unix() { // guard against publish times in the future
        data["publishtime"] = time.Now().Unix()
    }
    delete(data, "exit")
    delete(data, "checkpublishtime")
    data["comeintime"] = time.Now().Unix()
    atomic.AddInt32(&s.LastDowncount, 1)
    atomic.AddInt32(&s.TodayDowncount, 1)
    atomic.AddInt32(&s.TotalDowncount, 1)
    data["spidercode"] = s.Code
    //qu.Debug("-------------- saving -----------------")
    data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer display the original link (decided by the save service)
    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
    //qu.Debug("-------------- saved ------------------")
}
// Iterate and download detail pages (incremental)
func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
    defer mu.Catch()
    var err interface{}
    //TODO download the detail page and let the Lua script parse it; if the spider is configured not to download detail pages, stop here and store directly
    data := map[string]interface{}{}
    paramdata := p.(map[string]interface{})
    for k, v := range paramdata {
        data[k] = v
    }
    href := qu.ObjToString(data["href"])
    if len(href) <= 5 { // invalid data
        *num++ // count it as already collected
        return
    }
    /*
        // check the incremental Redis set for a previous download
        isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
        if isExist { // refresh the Redis TTL
            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
            *num++ // already collected
            return
        }
        log.Println("href had++:", isExist, href)
    */
    if util.Config.Modal == 1 { // every node except 7000, 7500 and 7700 collects list-page data only
        isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
        if isExist { // refresh the Redis TTL
            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
            *num++ // already collected
            return
        }
        SaveHighListPageData(paramdata, href, num)
        return
    } else {
        UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // heartbeat: modal=0 legacy mode collecting detail pages
        isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
        if isExist { // refresh the Redis TTL
            util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
            *num++ // already collected
            return
        }
        SaveListPageData(paramdata) // save list-page data collected on nodes 7000, 7500 and 7700
    }
    // download, parse, store
    data, err = s.DownloadDetailPage(paramdata, data)
    if err != nil || data == nil {
        if err != nil {
            logger.Error(s.Code, err, paramdata)
            if len(paramdata) > 0 {
                SaveErrorData(s.MUserName, paramdata, err) // save the error details
            }
        }
        return
    } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so before/after differ
        log.Println("beforeHref:", href, "afterHref:", tmphref)
        // incremental set
        util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
        // full set
        db := HexToBigIntMod(href)
        hashHref := HexText(href)
        isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
        if !isExist {
            util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
        }
    }
    t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
    if t1 > time.Now().Unix() { // guard against publish times in the future
        data["publishtime"] = time.Now().Unix()
    }
    UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // heartbeat: modal=0 legacy mode collected data
    delete(data, "exit")
    delete(data, "checkpublishtime")
    data["comeintime"] = time.Now().Unix()
    atomic.AddInt32(&s.LastDowncount, 1)
    atomic.AddInt32(&s.TodayDowncount, 1)
    atomic.AddInt32(&s.TotalDowncount, 1)
    data["spidercode"] = s.Code
    //qu.Debug("----- incremental save: start -----")
    // temporary data backup
    // update := []map[string]interface{}{}
    // _id := data["_id"].(string)
    // update = append(update, map[string]interface{}{"_id": qu.StringTOBsonId(_id)})
    // update = append(update, map[string]interface{}{
    //     "$set": map[string]interface{}{
    //         "jsondata": data["jsondata"],
    //     },
    // })
    // UpdataMgoCache <- update
    data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer display the original link (decided by the save service)
    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
    //qu.Debug("----- incremental save: done -----")
}
// Iterate and download by directory names
func (s *Spider) DownloadDetailByNames(p interface{}) {
    defer mu.Catch()
    var err interface{}
    /*
        if s.Stop {
            return
        }
        for s.Pass {
            util.TimeSleepFunc(2*time.Second, TimeSleepChan)
        }
    */
    //TODO download the detail page and let the Lua script parse it; if the spider is configured not to download detail pages, stop here and store directly
    data := map[string]interface{}{}
    paramdata := p.(map[string]interface{})
    for k, v := range paramdata {
        data[k] = v
    }
    if s.DownDetail {
        href := qu.ObjToString(data["href"])
        if href == "" || len(href) < 5 { // invalid data
            return
        }
        // download, parse, store
        data, err = s.DownloadDetailPage(paramdata, data)
        if err != nil {
            logger.Error(s.Code, paramdata, err)
            return
        }
    }
    data["comeintime"] = time.Now().Unix()
    atomic.AddInt32(&s.LastDowncount, 1)
    atomic.AddInt32(&s.TodayDowncount, 1)
    atomic.AddInt32(&s.TotalDowncount, 1)
    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// Download and parse a detail page
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
    defer mu.Catch()
    s.LastHeartbeat = time.Now().Unix()
    util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
    tab := s.L.NewTable()
    for k, v := range param {
        if val, ok := v.(string); ok {
            tab.RawSet(lua.LString(k), lua.LString(val))
        } else if val, ok := v.(int64); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(int32); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(float64); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(float32); ok {
            tab.RawSet(lua.LString(k), lua.LNumber(val))
        } else if val, ok := v.(bool); ok {
            tab.RawSet(lua.LString(k), lua.LBool(val))
        }
    }
    var err error
    if err = s.L.CallByParam(lua.P{
        Fn:      s.L.GetGlobal("downloadDetailPage"),
        NRet:    1,
        Protect: true,
    }, tab); err != nil {
        //panic(s.Code + "," + err.Error())
        log.Println(s.Code + "," + err.Error())
        atomic.AddInt32(&s.Script.ErrorNum, 1)
        return data, err
    }
    lv := s.L.Get(-1)
    s.L.Pop(1)
    // assemble the result map
    if v3, ok := lv.(*lua.LTable); ok {
        v3.ForEach(func(k, v lua.LValue) {
            if tmp, ok := k.(lua.LString); ok {
                key := string(tmp)
                if value, ok := v.(lua.LString); ok {
                    data[key] = string(value)
                } else if value, ok := v.(lua.LNumber); ok {
                    data[key] = value
                } else if value, ok := v.(*lua.LTable); ok {
                    tmp := util.TableToMap(value)
                    data[key] = tmp
                }
            }
        })
        return data, err
    } else {
        return nil, err
    }
}
// High-performance mode: scheduled detail-page collection
func DetailData() {
    defer qu.Catch()
    <-InitAllLuaOver // wait until all scripts are loaded
    if util.Config.Working == 0 && !util.Config.IsHistoryEvent { // high-performance mode on any node but 7000 (only node 7000 has util.Config.IsHistoryEvent set to true)
        GetListDataDownloadDetail()
    }
}

func GetListDataDownloadDetail() {
    defer qu.Catch()
    logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
    Allspiders2.Range(func(k, v interface{}) bool {
        sp := v.(*Spider)
        go sp.DownloadHighDetail()
        time.Sleep(2 * time.Second)
        return true
    })
}
// High-performance mode: download detail pages from stored list-page data
func (s *Spider) DownloadHighDetail() {
    defer qu.Catch()
    for {
        logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
        if !s.Stop { // the spider is running
            q := map[string]interface{}{
                "spidercode": s.Code,
                "state":      0, // 0: newly stored; -1: collection failed; 1: collected
                "comeintime": map[string]interface{}{ // only data from the last util.Config.DayNum days, so items that never succeed do not pile up
                    "$gte": GetTime(-util.Config.DayNum),
                },
            }
            o := map[string]interface{}{"_id": -1}
            f := map[string]interface{}{
                "state":      0,
                "comeintime": 0,
                "event":      0,
            }
            UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // heartbeat: modal=1 collecting detail pages
            list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
            if list != nil && len(*list) > 0 {
                for _, tmp := range *list {
                    _id := tmp["_id"]
                    query := map[string]interface{}{"_id": _id}
                    competehref := qu.ObjToString(tmp["competehref"])
                    if competehref != "" { // third-party site: check whether JianYu has already collected this item
                        title := qu.ObjToString(tmp["title"])
                        one, _ := Mgo.FindOne("data_bak", map[string]interface{}{"title": title})
                        if one != nil && len(*one) > 0 { // already collected by JianYu: discard this item
                            set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // mark existing items with state=1
                            Mgo.Update("spider_highlistdata", query, set, false, false)
                            continue
                        }
                    }
                    times := qu.IntAll(tmp["times"])
                    success := true // whether the download succeeded
                    delete(tmp, "_id")
                    delete(tmp, "times")
                    href := qu.ObjToString(tmp["href"])
                    data := map[string]interface{}{}
                    var err interface{}
                    for k, v := range tmp {
                        data[k] = v
                    }
                    // download, parse, store
                    data, err = s.DownloadDetailPage(tmp, data)
                    UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // heartbeat: modal=1 data downloaded
                    if err != nil || data == nil {
                        success = false
                        times++
                        if err != nil {
                            logger.Error(s.Code, err, tmp)
                            if len(tmp) > 0 {
                                SaveErrorData(s.MUserName, tmp, err) // save the error details
                            }
                        } /*else if data == nil && times >= 3 { // download problem: create an editor task
                            DownloadErrorData(s.Code, tmp)
                        }*/
                    } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so before/after differ
                        log.Println("beforeHref:", href, "afterHref:", tmphref)
                        // incremental set
                        util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
                        // full set
                        db := HexToBigIntMod(href)
                        hashHref := HexText(href)
                        isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
                        if !isExist {
                            util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
                        }
                    }
                    if !success { // download failed: update the retry count and state
                        ss := map[string]interface{}{"times": times}
                        if times >= 3 { // three failures: stop retrying today, set state to -1
                            ss["state"] = -1
                        }
                        set := map[string]interface{}{"$set": ss}
                        Mgo.Update("spider_highlistdata", query, set, false, false)
                        continue
                    }
                    t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
                    if t1 > time.Now().Unix() { // guard against publish times in the future
                        data["publishtime"] = time.Now().Unix()
                    }
                    delete(data, "exit")
                    delete(data, "checkpublishtime")
                    data["comeintime"] = time.Now().Unix()
                    // counters
                    tmpsp1, b := Allspiders.Load(s.Code)
                    if b {
                        sp1, ok := tmpsp1.(*Spider)
                        if ok {
                            atomic.AddInt32(&sp1.LastDowncount, 1)
                            atomic.AddInt32(&sp1.TodayDowncount, 1)
                            atomic.AddInt32(&sp1.TotalDowncount, 1)
                        }
                    }
                    data["spidercode"] = s.Code
                    data["dataging"] = 0
                    data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer display the original link (decided by the save service)
                    Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
                    set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // download succeeded: set state to 1
                    Mgo.Update("spider_highlistdata", query, set, false, false)
                }
                // reload the spider script
                s.LoadScript(s.Code, s.ScriptFile, true)
            } else { // no data
                time.Sleep(2 * time.Minute)
            }
            //s.GetListDataDownloadDetail() // start the next round
        } else {
            logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
            break
        }
    }
}
// Queue mode: download detail pages from stored list-page data
func (s *Spider) DownloadListDetail() {
    defer qu.Catch()
    defer func() { // the spider finished its detail pages (or had none): close the Lua state and hand it back
        s.Stop = true
        if _, b := Allspiders2.Load(s.Code); b {
            Allspiders2.Store(s.Code, s)
        }
        s.L.Close()
        CC2 <- s.L
    }()
    q := map[string]interface{}{
        "spidercode": s.Code,
        "state":      0, // 0: newly stored; -1: collection failed; 1: collected
        "comeintime": map[string]interface{}{ // only data from the last util.Config.DayNum days, so items that never succeed do not pile up
            "$gte": GetTime(-util.Config.DayNum),
        },
    }
    o := map[string]interface{}{"_id": -1}
    f := map[string]interface{}{
        "state":      0,
        "comeintime": 0,
        "event":      0,
    }
    UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // heartbeat: modal=1 collecting detail pages
    list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
    if list != nil && len(*list) > 0 {
        for _, tmp := range *list {
            _id := tmp["_id"]
            query := map[string]interface{}{"_id": _id}
            competehref := qu.ObjToString(tmp["competehref"])
            if competehref != "" { // third-party site: check whether JianYu has already collected this item
                title := qu.ObjToString(tmp["title"])
                one, _ := Mgo.FindOne("data_bak", map[string]interface{}{"title": title})
                if one != nil && len(*one) > 0 { // already collected by JianYu: discard this item
                    set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // mark existing items with state=1
                    Mgo.Update("spider_highlistdata", query, set, false, false)
                    continue
                }
            }
            times := qu.IntAll(tmp["times"])
            success := true // whether the download succeeded
            delete(tmp, "_id")
            delete(tmp, "times")
            href := qu.ObjToString(tmp["href"])
            data := map[string]interface{}{}
            var err interface{}
            for k, v := range tmp {
                data[k] = v
            }
            // download, parse, store
            data, err = s.DownloadDetailPage(tmp, data)
            UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // heartbeat: modal=1 data downloaded
            if err != nil || data == nil {
                success = false
                times++
                if err != nil {
                    logger.Error(s.Code, err, tmp)
                    if len(tmp) > 0 {
                        SaveErrorData(s.MUserName, tmp, err) // save the error details
                    }
                } /*else if data == nil && times >= 3 { // download problem: create an editor task
                    DownloadErrorData(s.Code, tmp)
                }*/
            } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so before/after differ
                log.Println("beforeHref:", href, "afterHref:", tmphref)
                // incremental set
                util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
                // full set
                db := HexToBigIntMod(href)
                hashHref := HexText(href)
                isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
                if !isExist {
                    util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
                }
            }
            if !success { // download failed: update the retry count and state
                ss := map[string]interface{}{"times": times}
                if times >= 3 { // three failures: stop retrying today, set state to -1
                    ss["state"] = -1
                }
                set := map[string]interface{}{"$set": ss}
                Mgo.Update("spider_highlistdata", query, set, false, false)
                continue
            }
            t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
            if t1 > time.Now().Unix() { // guard against publish times in the future
                data["publishtime"] = time.Now().Unix()
            }
            delete(data, "exit")
            delete(data, "checkpublishtime")
            data["comeintime"] = time.Now().Unix()
            // counters
            tmpsp1, b := Allspiders.Load(s.Code)
            if b {
                sp1, ok := tmpsp1.(*Spider)
                if ok {
                    atomic.AddInt32(&sp1.LastDowncount, 1)
                    atomic.AddInt32(&sp1.TodayDowncount, 1)
                    atomic.AddInt32(&sp1.TotalDowncount, 1)
                }
            }
            data["spidercode"] = s.Code
            data["dataging"] = 0
            data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer display the original link (decided by the save service)
            Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
            set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // download succeeded: set state to 1
            Mgo.Update("spider_highlistdata", query, set, false, false)
        }
    }
}
// Get a random number in [0, num)
func GetRandMath(num int) int {
    if num <= 0 { // rand.Intn panics on non-positive n; treat it as "no extra delay"
        return 0
    }
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    return r.Intn(num)
}
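// Note (hypothetical alternative, not part of the original): GetRandMath
// allocates and reseeds a fresh source on every call. The package-level
// rand.Intn draws from a single internally locked source, is safe for
// concurrent use, and only needs a one-time rand.Seed(time.Now().UnixNano())
// at startup, so a cheaper variant could read:
func getRandMathShared(num int) int {
    if num <= 0 {
        return 0
    }
    return rand.Intn(num) // assumes rand.Seed was called once at startup
}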
// Get a hash code: the scheme://host/ prefix plus the sha1 of the full string
func GetHas1(data string) string {
    t := sha1.New()
    io.WriteString(t, data)
    hf := Reg.FindString(data)
    if !strings.HasSuffix(hf, "/") {
        hf = hf + "/"
    }
    return hf + fmt.Sprintf("%x", t.Sum(nil))
}

// Hash an href and take it modulo 16
func HexToBigIntMod(href string) int {
    // hash the href
    t := sha256.New()
    io.WriteString(t, href)
    hex := fmt.Sprintf("%x", t.Sum(nil))
    // take the value modulo 16
    n := new(big.Int)
    n, _ = n.SetString(hex[2:], 16)
    return int(n.Mod(n, big.NewInt(16)).Int64())
}

// sha256 hex digest
func HexText(href string) string {
    h := sha256.New()
    h.Write([]byte(href))
    return fmt.Sprintf("%x", h.Sum(nil))
}
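// Sketch (hypothetical helper, not used above): the full-volume dedup step
// that appears inline in several places in this file, factored out. It relies
// on util.ExistRedis/util.PutRedis exactly as they are called above; a TTL of
// -1 stores the key without expiration.
func markFullHref(href string) bool {
    db := HexToBigIntMod(href) // route to one of 16 Redis DBs by href hash
    hashHref := HexText(href)  // sha256 hex digest of the href as the key
    isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
    if !isExist {
        util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
    }
    return isExist // true when the href had been recorded before
}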
//func RedisIsExist(href string) bool {
//    isExist := false
//    if len(href) > 75 { // long href: check existence by its hash
//        hashHref := GetHas1(href)
//        isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+hashHref)
//    }
//    if !isExist { // check existence by the raw href string
//        isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
//    }
//    return isExist
//}

// Check whether the publish time is within the last three days
//func WithinThreeDays(data *map[string]interface{}) {
//    withinThreeDays := false
//    // tag by publish time (falls back to the current time when absent)
//    publishtime := util.ParseDate2Int64(qu.ObjToString((*data)["publishtime"]))
//    now := time.Now().Unix()
//    if now-publishtime > 259200 { // more than three days old
//        withinThreeDays = false
//    } else {
//        withinThreeDays = true
//    }
//    if withinThreeDays {
//        //qu.Debug("published within three days")
//        (*data)["dataging"] = 0
//    } else {
//        //qu.Debug("published more than three days ago")
//        (*data)["dataging"] = 1
//    }
//}