/**
Spider: the script-driven crawler interface; intended to be extended.
*/
package spider

import (
	"crypto/sha1"
	"fmt"
	"io"
	"log"
	mu "mfw/util"
	mgo "mongodb"
	qu "qfw/util"
	"strconv"
	"strings"
	"sync"
	es "qfw/util/elastic"
	"regexp"
	util "spiderutil"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"github.com/yuin/gopher-lua"
)
// Heart tracks crawler heartbeats.
type Heart struct {
	DetailHeart        int64  //last time the detail-page (third-level page) stage ran
	DetailExecuteHeart int64  //last time the detail-page stage actually collected data
	FindListHeart      int64  //last time findListHtml ran
	ListHeart          int64  //last time the list-page stage ran
	ModifyUser         string //spider maintainer
	Site               string //site
	Channel            string //channel
}

// SpiderFlow records traffic.
type SpiderFlow struct {
	Flow       int64  //traffic
	ModifyUser string //spider maintainer
	Site       string //site
	Channel    string //channel
	//Code string
}
// Spider is one crawler instance.
type Spider struct {
	Script
	Code string //spider code
	Name string //site name
	Channel string //channel
	DownDetail bool //whether to download detail pages
	Stop bool //stop flag
	Pass bool //pause flag
	LastPubshTime int64 //last publish time
	LastHeartbeat int64 //last heartbeat time
	SpiderRunRate int64 //run frequency
	ExecuteOkTime int64 //time the job finished successfully
	Collection string //target collection
	Thread int64 //thread count
	LastExecTime int64 //last execution time
	LastDowncount int32 //downloads in the last run
	TodayDowncount int32 //downloads today
	YesterdayDowncount int32 //downloads yesterday
	TotalDowncount int32 //total downloads
	RoundCount int32 //round counter
	StoreMode int //storage mode
	StoreToMsgEvent int //message type
	CoverAttr string //attribute used for dedup
	SleepBase int //base delay
	SleepRand int //random delay
	TargetChannelUrl string //channel page URL
	UpperLimit, LowerLimit int //upper/lower bounds of the normal range
	UserName, UserEmail, UploadTime string //developer name, developer email, script upload time
	MUserName, MUserEmail string //maintainer, maintainer email
	//Index int //array index
	//historical backfill
	IsHistoricalMend bool //whether this is a historical-backfill spider
	IsMustDownload bool //whether download is forced
	IsCompete bool //distinguishes new vs. old spiders
	Infoformat int //spider type: 1 tender; 2 proposed/approval; 3 property rights
	IsMainThread bool //whether this is the main-thread sp (distinguishes the main sp in multi-threaded collection)
}
var Es *es.Elastic
var EsIndex string
var EsType string
var MgoS *mgo.MongodbSim

//var MgoE *mgo.MongodbSim
var MgoEB *mgo.MongodbSim
var UpdataMgoCache = make(chan []map[string]interface{}, 1000)  //queue of status updates for data to be re-downloaded
var UpdataHeartCache = make(chan []map[string]interface{}, 1000) //queue of spider heartbeat updates
var SaveMgoCache = make(chan map[string]interface{}, 1000)       //queue of off-site data collected by spiders
var SP = make(chan bool, 5)
var SPH = make(chan bool, 5)
var SPS = make(chan bool, 5)
var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
var DomainNameReg = regexp.MustCompile(`(?://).+?(?:)[::/]`)
var RepDomainNameReg = regexp.MustCompile(`[::/]+`)
var RestrictAccessReg = regexp.MustCompile(`访问被拒绝`) //matches "访问被拒绝" (access denied)
var Today string
var SpiderFlowMap = sync.Map{} //code:{"2022-05-16":SpiderFlow}
var AllThreadNum int64
var DelaySiteMap map[string]*DelaySite //sites whose collection is delayed

type DelaySite struct {
	DelayTime int
	Compete   bool
}
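// Illustrative sketch: how the delay and flow maps above are shaped. In production they
// are presumably populated from configuration or mongo elsewhere; the entries, code and
// value layout here are made up for illustration (the SpiderFlowMap value type is an
// assumption based on the comment above).
func exampleInitDelayAndFlow() {
	DelaySiteMap = map[string]*DelaySite{
		"example-site": {DelayTime: 24, Compete: true}, //collect this site 24 hours late and dedup by title against es
	}
	flow := map[string]*SpiderFlow{
		"2022-05-16": {Flow: 1024, ModifyUser: "maintainer", Site: "example-site", Channel: "example-channel"},
	}
	SpiderFlowMap.Store("a_example_code", flow) //code -> {date -> flow record}
}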
// UpdateHeart records a heartbeat of type t ("list", "findlist", "detail", "detailexcute") for a spider code.
func UpdateHeart(site, channel, code, user, t string) {
	//sp, spiderOk := LoopListPath.Load(code)
	//if spiderOk && sp != nil {
	if htmp, ok := SpiderHeart.Load(code); ok {
		if heart, ok := htmp.(*Heart); ok {
			if t == "list" {
				heart.ListHeart = time.Now().Unix()
			} else if t == "findlist" {
				heart.FindListHeart = time.Now().Unix()
			} else if t == "detail" {
				heart.DetailHeart = time.Now().Unix()
			} else if t == "detailexcute" {
				heart.DetailExecuteHeart = time.Now().Unix()
			}
		}
	} else {
		heart := &Heart{
			ModifyUser: user,
			Site:       site,
			Channel:    channel,
		}
		if t == "list" {
			heart.ListHeart = time.Now().Unix()
		} else if t == "findlist" {
			heart.FindListHeart = time.Now().Unix()
		} else if t == "detail" {
			heart.DetailHeart = time.Now().Unix()
		} else if t == "detailexcute" {
			heart.DetailExecuteHeart = time.Now().Unix()
		}
		SpiderHeart.Store(code, heart)
	}
	//}
}
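// Illustrative sketch (not called anywhere): how the heartbeat helper above is typically
// exercised. It assumes SpiderHeart, declared elsewhere in this package, behaves like a
// sync.Map keyed by spider code; the code and names are made up.
func exampleUpdateHeartUsage() {
	UpdateHeart("example-site", "example-channel", "a_example_code", "maintainer", "list")   //creates the Heart entry
	UpdateHeart("example-site", "example-channel", "a_example_code", "maintainer", "detail") //updates DetailHeart on the same entry
	if v, ok := SpiderHeart.Load("a_example_code"); ok {
		if h, ok := v.(*Heart); ok {
			logger.Info("list heart:", h.ListHeart, "detail heart:", h.DetailHeart)
		}
	}
}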
// StartJob starts one crawl round.
func (s *Spider) StartJob() {
	s.Stop = false
	s.Pass = false
	s.RoundCount++
	go s.ExecJob(false)
}
// ExecJob runs a single round.
func (s *Spider) ExecJob(reload bool) {
	defer func() {
		size_ok, size_no := 0, 0
		size_no_index := []interface{}{}
		LoopListPath.Range(func(k, v interface{}) bool {
			if v != nil {
				size_ok++
			} else {
				size_no_index = append(size_no_index, k)
				size_no++
			}
			return true
		})
		logger.Debug(s.Code, s.Name, "ok, downloads this round:", s.LastDowncount, ", polling list length:", size_ok, ", offline count:", size_no, ", offline spiders:", size_no_index)
		s.ExecuteOkTime = time.Now().Unix()
		util.TimeSleepFunc(5*time.Second, TimeSleepChan)
		if util.Config.Working == 1 {
			s.Stop = true
			if _, b := Allspiders.Load(s.Code); b {
				Allspiders.Store(s.Code, s)
			}
			s.L.Close()
			CC <- s.L
		}
	}()
	if reload && util.Config.Working == 0 { //high-performance mode: reload the script when rescheduled
		s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
	}
	logger.Debug(s.Code, s.Name, "rate:", s.SpiderRunRate, ",", s.Timeout)
	s.LastDowncount = 0
	s.LastExecTime = time.Now().Unix()
	s.LastHeartbeat = time.Now().Unix()
	s.ExecuteOkTime = 0
	err := s.GetLastPublishTime() //fetch the latest publish time, used as the last update time
	if err != nil {
		logger.Error(s.Code, err)
	}
	err = s.DownListPageItem() //download the list pages
	if err != nil {
		logger.Error(s.Code, err)
	}
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //incremental or historical spider (historical spiders currently only live on node 7000)
		UpdateSpiderByCodeState(s.Code, "6") //take the spider offline on this node
		SpiderCodeSendToEditor(s.Code)       //a history spider turning incremental is sent back to the editor to switch nodes
		return
	} else {
		if util.Config.Working == 0 { //high-performance mode
			/*
				for !s.Stop && s.Pass {
					util.TimeSleepFunc(2*time.Second, TimeSleepChan)
				}
				if s.Stop {
					return
				}
			*/
			//if s.IsMustDownload { //historical download, run one round only
			if s.IsHistoricalMend && util.Config.IsHistoryEvent { //history node 7000, high-performance mode: historical backfill runs one round only
				UpdateSpiderByCodeState(s.Code, "6") //take the spider offline on this node
				b := MgoEB.Update("luaconfig", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
				logger.Info("Delete History Code:", s.Code, b)
			} else {
				if !s.Stop { //still online: schedule the next round
					util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
						s.ExecJob(true)
					}, TimeChan)
					// util.TimeAfterFunc(30*time.Second, func() {
					// 	s.ExecJob(true)
					// }, TimeChan)
				} else { //taken offline: let the worker goroutine exit
					return
				}
			}
		} else { //queue mode
			return
		}
	}
}
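// Illustrative sketch: the self-rescheduling loop ExecJob implements, reduced to the
// standard library. util.TimeAfterFunc is an internal wrapper whose exact semantics are
// not shown here; this assumes it behaves roughly like time.AfterFunc plus a cancel
// channel, and the function/parameter names below are made up.
func exampleReschedule(runRateMinutes int64, stopped func() bool, round func()) {
	var next func()
	next = func() {
		if stopped() { //taken offline: stop rescheduling
			return
		}
		round() //one crawl round
		time.AfterFunc(time.Duration(runRateMinutes)*time.Minute, next)
	}
	next()
}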
// GetLastPublishTime fetches the latest publish time, used as the last update time.
func (s *Spider) GetLastPublishTime() (errs interface{}) {
	defer mu.Catch()
	var lastpublishtime string
	//ask the script for the last publish time
	if err := s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("getLastPublishTime"),
		NRet:    1,
		Protect: true,
	}); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		errs = err.Error()
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return errs
	}
	ret := s.L.Get(-1)
	s.L.Pop(1)
	if str, ok := ret.(lua.LString); ok {
		lastpublishtime = string(str)
	}
	if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
		//guard against publish times in the future
		if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
			s.LastPubshTime = time.Now().Unix()
		} else {
			s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
		}
	}
	return nil
}
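// Illustrative sketch: the gopher-lua call convention used by GetLastPublishTime,
// downloadAndParseListPage and downloadDetailPage, shown against a throwaway lua.LState
// so it can be read in isolation. The Lua body is a stand-in for a real spider script.
func exampleLuaCall() (string, error) {
	L := lua.NewState()
	defer L.Close()
	if err := L.DoString(`function getLastPublishTime() return "2022-05-16 00:00:00" end`); err != nil {
		return "", err
	}
	if err := L.CallByParam(lua.P{
		Fn:      L.GetGlobal("getLastPublishTime"),
		NRet:    1,
		Protect: true,
	}); err != nil {
		return "", err
	}
	ret := L.Get(-1) //the single return value
	L.Pop(1)         //always pop what the call pushed
	if str, ok := ret.(lua.LString); ok {
		return string(str), nil
	}
	return "", nil
}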
// DownListPageItem downloads the list pages.
func (s *Spider) DownListPageItem() (errs interface{}) {
	defer mu.Catch()
	s.AlreadyGetPageHeart = map[int]bool{} //reset the per-page record
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //start page, max page
	s.MaxPage = max
	//tmpMax := max //temporary record of the max page
	repeatAllNum := 0   //total number of duplicate links collected this round
	downloadAllNum := 0 //total number of links collected this round
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //node 7000: spider crawling history
		max = s.GetIntVar("spiderHistoryMaxPage")
	}
	downtimes := 0             //retry counter for the current page (up to 3 attempts)
	repeatPageNum := 0         //page number whose links were all duplicates
	repeatPageTimes := 0       //number of consecutive all-duplicate pages (stop paging once the limit is hit)
	repeatPageTimesLimit := 10 //limit on consecutive all-duplicate pages (10 in high-performance mode, 3 in queue mode)
	isRunRepeatList := false   //whether consecutive-duplicate detection is enabled
	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //all nodes except sequential-collection mode and the history node run consecutive duplicate detection on list pages
		isRunRepeatList = true
		max = 100 //high-performance mode: cap the max page at 100
		if util.Config.Working == 1 { //queue mode
			repeatPageTimesLimit = 3 //3 consecutive duplicate pages
			max = 50                 //queue mode: cap the max page at 50
		}
	}
	for ; start <= max && !s.Stop; start++ {
		if !s.Stop { //if the spider was taken offline while downloading details, stop recording heartbeats
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") //record the list-page heartbeat on every node
		}
		//logger.Info("spider:", s.Code, "repeat page:", repeatPageNum, " configured max:", tmpMax, " final max:", max, " current page:", start, "repeat times:", repeatPageTimes)
		//if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { //more than 5 consecutive duplicate pages: stop paging
		//	break
		//}
		if isRunRepeatList && repeatPageTimes >= repeatPageTimesLimit { //consecutive duplicate pages hit the limit: stop paging
			break
		}
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(start)); err != nil {
			//panic(s.Code + "," + err.Error())
			logger.Error("list page collection error", start, s.Code+","+err.Error())
			errs = err.Error()
			atomic.AddInt32(&s.Script.ErrorNum, 1)
			//retry the failed list page; once retries are exhausted, treat the page as collected
			if downtimes < 2 {
				downtimes++
				start--
				//} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat the page as a duplicate
			} else if isRunRepeatList { //retries exhausted: treat the page as a duplicate
				if repeatPageNum+1 == start {
					repeatPageTimes++ //one more consecutive duplicate page
				} else {
					repeatPageTimes = 0 //reset the consecutive-duplicate counter
				}
				repeatPageNum = start //remember the page number
				downtimes = 0
			}
			continue
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			list := []map[string]interface{}{}
			//qu.Debug("rows on this page:", tbl.Len())
			if tabLen := tbl.Len(); tabLen > 0 { //the list page has rows; download their detail pages
				repeatListNum := 0 //number of duplicate links on the current list page
				for i := 1; i <= tabLen; i++ {
					v := tbl.RawGetInt(i).(*lua.LTable)
					tmp := util.TableToMap(v)
					//s.ThisSiteData(tmp) //check whether the downloaded data belongs to this site
					if !s.IsHistoricalMend { //not historical backfill
						tmp["dataging"] = 0 //mark the data with dataging=0
						if s.DownDetail {
							s.DownloadDetailItem(tmp, &repeatListNum)
						} else {
							tmp["comeintime"] = time.Now().Unix()
							//atomic.AddInt32(&s.LastDowncount, 1)
							//atomic.AddInt32(&s.TodayDowncount, 1)
							//atomic.AddInt32(&s.TotalDowncount, 1)
							href := fmt.Sprint(tmp["href"])
							if len(href) > 5 { //valid data
								hashHref := util.HexText(href)
								util.RedisClusterSet(hashHref, "", -1) //full-volume redis
								list = append(list, tmp)
							}
						}
					} else { //historical backfill
						s.HistoricalMendDownloadDetailItem(tmp) //historical backfill: download the detail page
					}
				}
				//if start <= tmpMax { //accumulate counters
				repeatAllNum += repeatListNum
				downloadAllNum += tabLen
				//}
				//if start > tmpMax && isRunRepeatList { //consecutive-page duplicate detection
				if isRunRepeatList { //consecutive-page duplicate detection
					if repeatListNum >= tabLen { //every row on page start has already been collected
						//qu.Debug("repeat page:", repeatPageNum, "current page:", start)
						if repeatPageNum+1 == start || repeatPageNum == 0 {
							repeatPageTimes++ //one more consecutive duplicate page
						} else {
							repeatPageTimes = 0 //reset the consecutive-duplicate counter
						}
						repeatPageNum = start //remember the page number
					} else { //page start still has uncollected rows
						repeatPageTimes = 0
						repeatPageNum = 0
					}
				}
				if !s.IsHistoricalMend && !s.DownDetail {
					if len(list) > 0 { //persist the rows
						StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
					}
				}
			} else { //the list page came back empty (possibly a network issue): re-request it
				if downtimes < 2 {
					downtimes++
					start--
					continue
					//} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat the page as a duplicate
				} else if isRunRepeatList { //retries exhausted: treat the page as a duplicate
					if repeatPageNum+1 == start {
						repeatPageTimes++ //one more consecutive duplicate page
					} else {
						repeatPageTimes = 0 //reset the consecutive-duplicate counter
					}
					repeatPageNum = start //remember the page number
				}
			}
		} else { //requesting the current list page failed
			if downtimes < 2 {
				downtimes++
				start--
				continue
				//} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat the page as a duplicate
			} else if isRunRepeatList { //retries exhausted: treat the page as a duplicate
				if repeatPageNum+1 == start {
					repeatPageTimes++ //one more consecutive duplicate page
				} else {
					repeatPageTimes = 0 //reset the consecutive-duplicate counter
				}
				repeatPageNum = start //remember the page number
			}
		}
		downtimes = 0 //the current page downloaded fine; reset the retry counter
		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
	}
	logger.Info(s.Code, "list-page collection this round:", downloadAllNum, repeatAllNum, start, s.Stop)
	if !util.Config.IsHistoryEvent && !s.Stop { //non-history nodes record the download rate
		nowTime := time.Now()
		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
		set := map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"updatetime": nowTime.Unix(),
			"event":      util.Config.Uploadevent,
			"modifyuser": s.MUserName,
			"maxpage":    s.MaxPage,
			"runrate":    s.SpiderRunRate,
			"endpage":    start,
			"date":       sDate,
		}
		inc := map[string]interface{}{
			"alltimes": 1,
		}
		//record whether paging succeeded
		if s.PageOneTextHash != "" {
			if s.PageTwoTextHash != "" {
				if s.PageOneTextHash != s.PageTwoTextHash {
					inc["page_success"] = 1
				} else {
					inc["page_fail"] = 1
				}
			} else {
				inc["page_fail"] = 1
			}
		} else if s.PageTwoTextHash != "" {
			inc["page_onefail"] = 1
		}
		if downloadAllNum > 0 {
			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
			if rate == 1.0 {
				inc["oh_percent"] = 1
			} else if rate >= 0.9 {
				inc["nt_percent"] = 1
			} else if rate >= 0.8 {
				inc["et_percent"] = 1
			} else {
				inc["other_percent"] = 1
			}
		} else {
			inc["zero"] = 1
		}
		query := map[string]interface{}{
			"date":       sDate,
			"spidercode": s.Code,
		}
		MgoS.Update("spider_downloadrate", query, map[string]interface{}{
			"$set": set,
			"$inc": inc,
		}, true, false)
	}
	//reset per-round state
	s.RecordedHeartInfo = false
	s.PageOneTextHash = ""
	s.PageTwoTextHash = ""
	return errs
}
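// Illustrative sketch: the consecutive-duplicate-page cutoff used inside DownListPageItem,
// isolated from the Lua plumbing. A page whose links are all duplicates only extends the
// streak when it directly follows the previous all-duplicate page (or is the first one);
// any page with new links resets the streak. The names and parameters are made up.
func exampleRepeatPageCutoff(pageIsAllDuplicates func(page int) bool, maxPage, limit int) int {
	repeatPageNum, repeatPageTimes := 0, 0
	visited := 0
	for page := 1; page <= maxPage; page++ {
		if repeatPageTimes >= limit { //streak hit the limit: stop paging
			break
		}
		if pageIsAllDuplicates(page) {
			if repeatPageNum+1 == page || repeatPageNum == 0 {
				repeatPageTimes++
			} else {
				repeatPageTimes = 0
			}
			repeatPageNum = page
		} else { //the page still had new links
			repeatPageTimes = 0
			repeatPageNum = 0
		}
		visited++
	}
	return visited //number of pages actually fetched
}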
// ThisSiteData records rows whose href does not belong to the spider's own site.
func (s *Spider) ThisSiteData(tmp map[string]interface{}) {
	defer qu.Catch()
	href := qu.ObjToString(tmp["href"])
	url_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(s.TargetChannelUrl), "")
	href_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(href), "")
	if url_dn != href_dn {
		SaveMgoCache <- map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"url":        s.TargetChannelUrl,
			"href":       href,
			"modifyuser": s.MUserName,
			"comeintime": time.Now().Unix(),
		}
	}
}
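// Illustrative sketch: what the two regexps used above produce. DomainNameReg grabs the
// "//host" part up to the first ':', '：' or '/', and RepDomainNameReg strips those
// delimiter characters, leaving the bare host that ThisSiteData compares. The URL is a
// made-up example.
func exampleDomainExtract() {
	raw := "https://www.example.gov.cn/list/index.html"
	host := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(raw), "")
	fmt.Println(host) //www.example.gov.cn
}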
// HistoricalMendDownloadDetailItem takes one list row and downloads its detail page (historical backfill).
func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
	//qu.Debug("--------------historical download-----------------")
	defer mu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { //invalid data
		return
	}
	hashHref := util.HexText(href)
	isExist := util.RedisClusterExists(hashHref) //dedup against the full-volume redis
	//logger.Debug("full href:", href, " isExist:", isExist)
	if !s.IsMustDownload { //download not forced
		if isExist { //the data already exists: return
			return
		} else if util.Config.IsHistoryEvent { //1. backfill on the history node (7000): store into spider_historydata
			num := 0
			SaveHighListPageData(paramdata, s.SCode, hashHref, &num)
			return
		}
	}
	//2. backfill off the history node: collect the detail right after the list, then take the spider offline
	id := ""
	SaveListPageData(paramdata, &id, false) //store the collection record
	//qu.Debug("----------------download, parse, store--------------------")
	//download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil { //download failed: stop
		if err != nil {
			logger.Error(s.Code, err, paramdata)
			// if len(paramdata) > 0 {
			// 	SaveErrorData(paramdata) //save the error info
			// }
		}
		//mark the row in spider_listdata as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
		return
	} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced href, so it differs from the list href
		util.RedisClusterSet(hashHref, "", -1) //store the list-page href in the full-volume redis
	}
	//data filtered out by the detail page
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
	if data["delete"] != nil {
		util.RedisClusterSet(hashHref, "", -1) //filtered data still goes into the full-volume redis
		//rows to delete: set state=1 in mongo so they are not downloaded again, and update redis
		set["delete"] = true
		MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		return
	}
	//mark the row in spider_listdata as downloaded (matched by href; a later successful download may update it again)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	flag := true
	t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) //publishtime
	if s.IsMustDownload { //forced download
		if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() {
			//qu.Debug("forced download, found in redis")
			data["dataging"] = 1 //dataging=1 makes the save service look up the id stored in redis for this href and update that record (redis no longer stores ids, so this has no effect)
			flag = false
		} else {
			//qu.Debug("forced download, not in redis")
			data["dataging"] = 0
		}
	} else { //download not forced
		if !isExist {
			//qu.Debug("not forced, not in redis")
			data["dataging"] = 0
		}
	}
	if t1 > time.Now().Unix() { //guard against publish times in the future
		data["publishtime"] = time.Now().Unix()
	}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	//atomic.AddInt32(&s.LastDowncount, 1)
	//atomic.AddInt32(&s.TodayDowncount, 1)
	//atomic.AddInt32(&s.TotalDowncount, 1)
	data["spidercode"] = s.Code
	//qu.Debug("--------------start saving---------------")
	data["iscompete"] = s.IsCompete //spiders added after 2021-11-01 no longer show the original link (checked by the save service)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
	//qu.Debug("--------------done saving---------------")
}
// DownloadDetailItem takes one list row and downloads its detail page (incremental).
func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
	defer mu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { //invalid data
		*num++ //count it as already collected
		return
	}
	hashHref := util.HexText(href)
	id := "" //id of the row saved in spider_listdata, so its state can be updated after a successful download
	if util.Config.Modal == 1 || (util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history") { //every node except 7410, 7500, 7510 and 7700 only collects list-page info
		isExist := util.RedisClusterExists(hashHref) //already in the full-volume set
		if isExist {
			*num++ //already collected
			return
		}
		SaveHighListPageData(paramdata, s.SCode, hashHref, num)
		return
	} else {
		if !s.Stop {
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //heartbeat for modal=0 (old mode) detail-page collection
		}
		isExist := util.RedisClusterExists(hashHref) //already in the full-volume set
		if isExist {
			*num++ //already collected
			return
		}
		isEsRepeat := false
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
			title := qu.ObjToString(paramdata["title"])
			eTime := time.Now().Unix()
			sTime := eTime - int64(7*86400)
			esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
			if Es.Count(EsIndex, EsType, esQuery) > 0 { //es already holds this title: skip collection and update the list-row state
				isEsRepeat = true
			}
		}
		SaveListPageData(paramdata, &id, isEsRepeat) //save list-page rows collected on nodes 7000, 7410, 7500, 7510, 7520 and 7700
		if isEsRepeat { //competitor-like data deduped by title also goes into redis
			util.RedisClusterSet(hashHref, "", -1) //full-volume entry
			return
		}
	}
	//download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil {
		*num++ //in sequential mode count a failed download as a duplicate (otherwise rows that keep failing would never be deduped and every round would be counted as a full crawl)
		if err != nil {
			logger.Error(s.Code, err, paramdata)
			//if len(paramdata) > 0 {
			//	SaveErrorData(s.MUserName, paramdata, err) //save the error info
			//}
		}
		//mark the row in spider_listdata as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}})
		return
	} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced href, so it differs from the list href
		util.RedisClusterSet(hashHref, "", -1) //store the list-page href in the full-volume redis
	}
	//heartbeat: detail page downloaded successfully
	if !s.Stop {
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //heartbeat for modal=0 (old mode) data collection
	}
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix(), "byid": id}
	//data filtered out by the detail page
	if data["delete"] != nil {
		util.RedisClusterSet(hashHref, "", -1) //filtered data still goes into the full-volume redis
		//rows to delete: set state=1 in mongo so they are not downloaded again, and update redis
		set["delete"] = true
		MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		return
	}
	//mark the row in spider_listdata as downloaded (matched by href; a later successful download may update it again)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
	if t1 > time.Now().Unix() { //guard against publish times in the future
		data["publishtime"] = time.Now().Unix()
	}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	//atomic.AddInt32(&s.LastDowncount, 1)
	//atomic.AddInt32(&s.TodayDowncount, 1)
	//atomic.AddInt32(&s.TotalDowncount, 1)
	data["spidercode"] = s.Code
	data["iscompete"] = s.IsCompete   //spiders added after 2021-11-01 no longer show the original link (checked by the save service)
	data["infoformat"] = s.Infoformat //spider type
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailByNames downloads directory (names list) entries.
func (s *Spider) DownloadDetailByNames(p interface{}) {
	defer mu.Catch()
	var err interface{}
	/*
		if s.Stop {
			return
		}
		for s.Pass {
			util.TimeSleepFunc(2*time.Second, TimeSleepChan)
		}
	*/
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	if s.DownDetail {
		href := qu.ObjToString(data["href"])
		if href == "" || len(href) < 5 { //invalid data
			return
		}
		//download, parse, store
		data, err = s.DownloadDetailPage(paramdata, data)
		if err != nil {
			logger.Error(s.Code, paramdata, err)
			return
		}
	}
	data["comeintime"] = time.Now().Unix()
	//atomic.AddInt32(&s.LastDowncount, 1)
	//atomic.AddInt32(&s.TodayDowncount, 1)
	//atomic.AddInt32(&s.TotalDowncount, 1)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}
// DownloadDetailPage downloads and parses a detail page via the script's downloadDetailPage function.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer mu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc((time.Duration(s.SleepBase+util.GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	//convert the returned table into the result map
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = value
				} else if value, ok := v.(*lua.LTable); ok {
					tmp := util.TableToMap(value)
					data[key] = tmp
				}
			}
		})
		return data, err
	} else {
		return nil, err
	}
}
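// Illustrative sketch: the Go-map -> lua.LTable conversion that DownloadDetailPage
// performs before invoking downloadDetailPage, shown on a throwaway lua.LState so it can
// be read in isolation. Only the scalar types handled above are converted; the sample
// values are made up.
func exampleParamTable() {
	L := lua.NewState()
	defer L.Close()
	param := map[string]interface{}{"href": "https://example.com/1.html", "page": int64(2), "ok": true}
	tab := L.NewTable()
	for k, v := range param {
		switch val := v.(type) {
		case string:
			tab.RawSet(lua.LString(k), lua.LString(val))
		case int64:
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		case float64:
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		case bool:
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	fmt.Println(tab.RawGet(lua.LString("href"))) //https://example.com/1.html
}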
// DetailData periodically collects detail pages in high-performance mode.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver //wait until all scripts are loaded
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //high-performance mode and not node 7000 (only node 7000 has util.Config.IsHistoryEvent set)
		GetListDataDownloadDetail()
	}
}

func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders2.Range(func(k, v interface{}) bool {
		sp := v.(*Spider)
		go sp.DownloadHighDetail(true)
		time.Sleep(1 * time.Second)
		return true
	})
}
// DownloadHighDetail downloads detail pages from stored list data in high-performance mode.
func (s *Spider) DownloadHighDetail(reload bool) {
	defer qu.Catch()
	for {
		logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
		if !s.Stop { //the spider is still running
			s.DownloadDetail(reload, false)
		} else {
			logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
			break
		}
	}
}

// DownloadListDetail downloads detail pages from stored list data in queue mode.
func (s *Spider) DownloadListDetail(reload bool) {
	defer qu.Catch()
	s.DownloadDetail(reload, false)
	//in queue mode the spider is closed once its detail pages are downloaded (or there is nothing to download)
	s.Stop = true
	if _, b := Allspiders2.Load(s.Code); b {
		Allspiders2.Store(s.Code, s)
	}
	s.L.Close()
	CC2 <- s.L
}
// DownloadDetail downloads detail pages for the list rows stored in mongo.
func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
	defer qu.Catch()
	coll := "spider_highlistdata"
	isEsRepeat := false //whether to dedup against es
	q := map[string]interface{}{
		"spidercode": s.Code,
		"state":      0, //0: newly stored; -1: download failed; 1: success
	}
	o := map[string]interface{}{"_id": -1}
	if !isHistory { //not a historical download: restrict by comeintime
		comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //only collect recent data, so rows that never download do not pile up
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil {
			isEsRepeat = delaySite.Compete
			if delaySite.DelayTime <= util.Config.DayNum*24 { //the site is delayed: collect its data DelayTime hours late (7410, 7500 and 7700 collect sequentially and cannot be delayed)
				//comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
				comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delaySite.DelayTime)
			}
		}
		q["comeintime"] = comeintimeQuery
	} else {
		coll = "spider_historydata"
		o["_id"] = 1 //historical data in ascending order
	}
	f := map[string]interface{}{
		"state":      0,
		"comeintime": 0,
		"event":      0,
	}
	if !isHistory && !s.Stop { //if the spider was taken offline while downloading details, stop recording heartbeats
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //heartbeat for modal=1 detail-page collection
	}
	countNum := MgoS.Count(coll, q) //number of rows not yet downloaded within util.Config.DayNum days
	if isHistory && countNum == 0 { //no historical data left: stop manually
		s.Stop = true
		return
	}
	//logger.Info("Thread Info: Code:", s.SCode, " count:", countNum)
	if countNum > 0 {
		threadNum := countNum / util.Config.ThreadBaseNum //thread count
		if threadNum > util.Config.ThreadUpperLimit { //cap the per-spider thread count
			threadNum = util.Config.ThreadUpperLimit
		}
		logger.Info("Thread Info: Code:", s.SCode, " count:", countNum, " thread num:", threadNum)
		list, _ := MgoS.Find(coll, q, o, f, false, 0, 200)
		if list != nil && len(*list) > 0 {
			spChan := make(chan *Spider, threadNum+1) //worker channel (+1 for the base/main thread)
			if threadNum > 1 { //build extra sp instances
				if !isHistory {
					if v, ok := LoopListPath.Load(s.Code); ok && v != nil {
						if info, ok := v.(map[string]string); ok {
							NewSpiderByScript(threadNum, s.Code, info, spChan)
						} else {
							logger.Debug("LoopListPath Not Has Code:", s.Code)
							spChan = make(chan *Spider, 1) //cannot create extra sp instances; only the main thread's sp is used
						}
					} else {
						logger.Debug("LoopListPath Not Has Code:", s.Code)
						spChan = make(chan *Spider, 1) //cannot create extra sp instances; only the main thread's sp is used
					}
				} else {
					info := map[string]string{
						"script":          s.ScriptFile,
						"createuser":      s.UserName,
						"createuseremail": s.UserEmail,
						"modifyuser":      s.MUserName,
						"modifyemail":     s.MUserEmail,
					}
					NewSpiderByScript(threadNum, s.Code, info, spChan)
				}
			}
			spChan <- s //put the main thread's sp into the channel
			wg := &sync.WaitGroup{}
			spLock := &sync.Mutex{}
			updateArr := [][]map[string]interface{}{}
			for _, tmp := range *list {
				spTmp := <-spChan //take an sp from the channel
				wg.Add(1)
				atomic.AddInt64(&AllThreadNum, 1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp //return the sp to the channel when done
						wg.Done()
						atomic.AddInt64(&AllThreadNum, -1)
					}()
					if s.Stop || sp == nil { //do not download if the spider is offline or this sp failed to initialize
						return
					}
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					href := qu.ObjToString(tmp["href"])
					hashHref := util.HexText(href)
					update := []map[string]interface{}{}
					//list-page redis dedup currently keys on href+code, so the same href may exist once per code
					//dedup against the full-volume redis here to avoid downloading it again
					isExist := util.RedisClusterExists(hashHref)
					if isExist {
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "href", "updatetime": time.Now().Unix()}} //already exists: state=1
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					if isEsRepeat { //dedup by title against es
						title := qu.ObjToString(tmp["title"])
						eTime := time.Now().Unix()
						sTime := eTime - int64(7*86400)
						esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
						count := Es.Count(EsIndex, EsType, esQuery)
						if count > 0 { //es already holds this title: skip collection and update the list-row state
							util.RedisClusterSet(hashHref, "", -1)
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "title", "updatetime": time.Now().Unix()}} //already exists: state=1
							update = append(update, query)
							update = append(update, set)
							spLock.Lock()
							updateArr = append(updateArr, update)
							spLock.Unlock()
							return
						}
					}
					times := qu.IntAll(tmp["times"]) //download attempts so far
					success := true                  //whether the download succeeded
					delete(tmp, "_id")
					delete(tmp, "times")
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					//download, parse, store
					data, err = sp.DownloadDetailPage(tmp, data)
					if !isHistory && !sp.Stop && sp.IsMainThread { //if the spider was taken offline while downloading details, stop recording heartbeats
						UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //heartbeat for modal=1 data download
					}
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(s.Code, err, tmp)
							//if len(tmp) > 0 && !isHistory { //do not save errors while downloading historical data
							//	SaveErrorData(s.MUserName, tmp, err) //save the error info
							//}
						} /*else if data == nil && times >= 3 { //download problem: open an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced href, so it differs from the list href
						util.RedisClusterSet(hashHref, "", -1)
					}
					if !success { //download failed: update attempts and state
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { //3 failures: do not retry today, state=-1
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					} else if data["delete"] != nil { //filtered out by the detail page
						util.RedisClusterSet(hashHref, "", -1) //filtered data still goes into the full-volume redis
						//rows to delete: set state=1 in spider_highlistdata so they are not downloaded again, and update redis
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					//analyze detail text and attachments; abnormal rows are re-downloaded
					if r := AnalysisProjectInfo(data); r != "" { //sequential collection does not run this check yet (abnormal rows never reach redis and would be downloaded forever)
						times++
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { //3 failures: do not retry today, state=-1
							ss["state"] = -1
							ss["detailfilerr"] = r
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
					if t1 > time.Now().Unix() { //guard against publish times in the future
						data["publishtime"] = time.Now().Unix()
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					//counters
					//tmpsp1, b := Allspiders.Load(s.Code)
					//if b {
					//	sp1, ok := tmpsp1.(*Spider)
					//	if ok {
					//		atomic.AddInt32(&sp1.LastDowncount, 1)
					//		atomic.AddInt32(&sp1.TodayDowncount, 1)
					//		atomic.AddInt32(&sp1.TotalDowncount, 1)
					//	}
					//}
					data["comeintime"] = time.Now().Unix()
					data["spidercode"] = s.Code
					data["dataging"] = 0
					data["iscompete"] = s.IsCompete   //spiders added after 2021-11-01 no longer show the original link (checked by the save service)
					data["infoformat"] = s.Infoformat //spider type
					Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} //downloaded: state=1
					update = append(update, query)
					update = append(update, set)
					spLock.Lock()
					updateArr = append(updateArr, update)
					spLock.Unlock()
					//the row is fully processed at this point
				}(tmp, spTmp)
			}
			wg.Wait()
			//flush the accumulated updates
			if len(updateArr) > 0 {
				MgoS.UpdateBulk(coll, updateArr...)
				updateArr = [][]map[string]interface{}{}
			}
			close(spChan) //close the channel
			//release the sp instances (keep the main thread's sp, IsMainThread=true)
			for sp := range spChan {
				if sp != nil && !sp.IsMainThread {
					sp.L.Close()
				}
			}
			if !s.Stop && reload { //high-performance mode: reload the main sp after the detail pages are downloaded
				//reload the main thread's sp
				s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
			}
		}
	} else if reload { //high-performance mode with no data: sleep for 2 minutes
		time.Sleep(2 * time.Minute)
	}
}
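// Illustrative sketch: the channel-as-worker-pool pattern that DownloadDetail uses,
// stripped of the mongo/redis/Lua plumbing. Each task borrows a worker object from the
// channel, does its work, and returns the worker, so at most cap(pool) tasks run
// concurrently. The names and parameters are made up.
func examplePoolPattern(tasks []string, workers int, handle func(worker int, task string)) {
	pool := make(chan int, workers)
	for i := 0; i < workers; i++ {
		pool <- i
	}
	wg := &sync.WaitGroup{}
	for _, t := range tasks {
		w := <-pool //borrow a worker
		wg.Add(1)
		go func(task string, worker int) {
			defer func() {
				pool <- worker //return the worker
				wg.Done()
			}()
			handle(worker, task)
		}(t, w)
	}
	wg.Wait()
}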
// NewSpiderByScript initializes num sp instances and pushes them into spChan.
func NewSpiderByScript(num int, code string, info map[string]string, spChan chan *Spider) {
	for i := 1; i <= num; i++ {
		spTmp, errstr := CreateSpider(code, info["script"], true, true)
		if errstr == "" && spTmp != nil { //script loaded successfully
			spTmp.UserName = info["createuser"]
			spTmp.UserEmail = info["createuseremail"]
			spTmp.MUserName = info["modifyuser"]
			spTmp.MUserEmail = info["modifyemail"]
			spChan <- spTmp
		} else {
			spChan <- nil
		}
	}
}
// AnalysisProjectInfo flags rows whose detail text says "详情请访问原网页!" ("see the original
// page for details") but whose attachments did not upload; such rows do not count as
// successful downloads.
func AnalysisProjectInfo(data map[string]interface{}) string {
	defer qu.Catch()
	detail := qu.ObjToString(data["detail"])
	if RestrictAccessReg.MatchString(detail) { //access denied
		return "ip"
	}
	if detail == "详情请访问原网页!" || detail == "<br/>详情请访问原网页!" { //exact match only: some rows are assembled from json with missing fields and legitimately contain the phrase
		if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
			if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
				fileOk := false
				for _, data := range attachments {
					if d, ok := data.(map[string]interface{}); ok {
						fid := qu.ObjToString(d["fid"])
						if fid != "" { //at least one attachment uploaded successfully
							return ""
						}
					}
				}
				if !fileOk {
					return "detail_file"
				}
			} else {
				return "detail_file"
			}
		} else {
			return "detail_file"
		}
	}
	return ""
}
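// Illustrative sketch: the three outcomes of AnalysisProjectInfo on hand-built rows
// (the field values are made up).
func exampleAnalysisProjectInfo() {
	ipBlocked := map[string]interface{}{"detail": "访问被拒绝"}
	noFile := map[string]interface{}{"detail": "详情请访问原网页!"}
	fileUploaded := map[string]interface{}{
		"detail": "详情请访问原网页!",
		"projectinfo": map[string]interface{}{
			"attachments": map[string]interface{}{
				"1": map[string]interface{}{"fid": "abc123"}, //fid present: attachment upload succeeded
			},
		},
	}
	fmt.Println(AnalysisProjectInfo(ipBlocked))    //"ip"
	fmt.Println(AnalysisProjectInfo(noFile))       //"detail_file"
	fmt.Println(AnalysisProjectInfo(fileUploaded)) //""
}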
// AllThreadLog logs the total detail-download thread count once a minute.
func AllThreadLog() {
	logger.Info("Detail Download All Thread:", AllThreadNum)
	time.AfterFunc(1*time.Minute, AllThreadLog)
}

// GetHas1 returns the sha1 hash of data, prefixed with the URL origin found in it.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}