spider.go

/**
Spider: script interface; still needs extension.
*/
package spider

import (
	"crypto/sha1"
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"math/big"
	"math/rand"
	mu "mfw/util"
	qu "qfw/util"
	es "qfw/util/elastic"
	"regexp"
	util "spiderutil"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"github.com/yuin/gopher-lua"
)
//Spider is one crawler instance.
type Spider struct {
	Script
	Code               string //spider code
	Name               string //spider name
	DownDetail         bool   //whether to download detail pages
	Stop               bool   //stop flag
	Pass               bool   //pause flag
	LastPubshTime      int64  //time of the latest published item
	LastHeartbeat      int64  //time of the last heartbeat
	SpiderRunRate      int64  //run frequency
	ExecuteOkTime      int64  //time the task last completed successfully
	Collection         string //target collection (table) name
	Thread             int64  //number of threads
	LastExecTime       int64  //time of the last execution
	LastDowncount      int32  //downloads in the last run
	TodayDowncount     int32  //downloads today
	YesterdayDowncount int32  //downloads yesterday
	TotalDowncount     int32  //total downloads
	RoundCount         int32  //number of completed rounds
	StoreMode          int    //storage mode
	StoreToMsgEvent    int    //message event type
	CoverAttr          string //attribute used to dedup data on save
	SleepBase          int    //base delay
	SleepRand          int    //random extra delay
	TargetChannelUrl   string //channel (list) page URL

	UpperLimit, LowerLimit          int    //upper/lower bound of the normal download count
	UserName, UserEmail, UploadTime string //developer name, developer email, script upload time
	MUserName, MUserEmail           string //maintainer name, maintainer email
	Index                           int    //array index
	//historical gap filling
	IsHistoricalMend bool //whether this is a historical gap-filling spider
	IsMustDownload   bool //whether download is forced
	IsCompete        bool //distinguishes new spiders from old ones
}
var Es *es.Elastic
var EsIndex string
var EsType string
var UpdataMgoCache = make(chan []map[string]interface{}, 1000) //state updates for data queued for re-download
var SP = make(chan bool, 5)
var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
var DelaySites map[string]int //sites whose collection is delayed
//In high-performance mode, collect detail (third-level) page info on a schedule.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver //wait until all Lua scripts are loaded
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //high-performance mode and not the 7000 node; util.Config.IsHistoryEvent is true only on the 7000 node
		GetListDataDownloadDetail()
	}
}
func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders.Range(func(k, v interface{}) bool {
		go DownloadHighDetail(k.(string))
		time.Sleep(2 * time.Second)
		return true
	})
}
//In high-performance mode, download detail pages based on list-page data; one such loop runs per spider code.
func DownloadHighDetail(code string) {
	defer qu.Catch()
	for {
		//logger.Info("spider code:", s.Code, "offline:", s.Stop)
		//if !s.Stop { //spider is running
		//TODO delayed collection is not implemented yet
		/*
			1. Each round first queries the data downloaded today.
			2. If a query returns nothing, step back one day at a time (batch limit 100).
		*/
		o := map[string]interface{}{"_id": 1} //sort order
		f := map[string]interface{}{ //projection: fields to exclude
			"state":      0,
			"comeintime": 0,
			"event":      0,
		}
		q := map[string]interface{}{
			"spidercode": code,
			"state":      0, //0: pending; -1: download failed; 1: success
		}
		list := &[]map[string]interface{}{} //query result set
		for day := 0; day <= util.Config.DayNum; day++ {
			startTime := GetTime(-day)
			comeintime := map[string]interface{}{"$gte": startTime} //time window of the query
			if day != 0 { //not today: bound the window to that single day
				comeintime["$lt"] = GetTime(-day + 1)
			} else if code == "a_gcy_mcgg" { //for this site, only take today's rows older than 12 hours
				endTime := time.Now().Unix() - 12*3600
				if endTime > startTime {
					comeintime = map[string]interface{}{
						"$gte": startTime,
						"$lt":  endTime,
					}
				} else {
					continue
				}
			}
			q["comeintime"] = comeintime
			list, _ = MgoS.Find("spider_highlistdata", q, o, f, false, 0, 100)
			//logger.Debug("code:", code, "query:", q, "rows found:", len(*list))
			if list != nil && len(*list) > 0 {
				break
			} else {
				time.Sleep(1 * time.Second)
			}
		}
		if list != nil && len(*list) > 0 {
			//a buffered channel of spider instances serves as the worker pool for this code
			spChan := make(chan *Spider, len(AllspidersMap[code]))
			AllspidersMapLock.Lock()
			for _, sp := range AllspidersMap[code] {
				spChan <- sp
			}
			AllspidersMapLock.Unlock()
			wg := &sync.WaitGroup{}
			for _, l := range *list {
				spTmp := <-spChan
				wg.Add(1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp
						wg.Done()
					}()
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					href := qu.ObjToString(tmp["href"])
					//List-page redis dedup is keyed by href+code, so the same href can be
					//collected under several codes. Check the incremental href-only redis
					//set to avoid downloading it again.
					isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
					if isExist {
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //already collected: state set to 1
						MgoS.Update("spider_highlistdata", query, set, false, false)
						return
					}
					if code == "a_gcy_mcgg" { //competitor data: dedup by title in es
						title := qu.ObjToString(tmp["title"])
						eTime := time.Now().Unix()
						sTime := eTime - int64(7*86400)
						esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
						count := Es.Count(EsIndex, EsType, esQuery)
						if count > 0 { //es already has this title: skip it and update the list row's state
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //already collected: state set to 1
							MgoS.Update("spider_highlistdata", query, set, false, false)
							util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
							return
						}
					}
					competehref := qu.ObjToString(tmp["competehref"])
					if competehref != "" { //third-party site: check whether Jianyu has already collected the item
						title := qu.ObjToString(tmp["title"])
						one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title})
						if one != nil && len(*one) > 0 { //already collected by Jianyu: drop this row
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //already collected: state set to 1
							MgoS.Update("spider_highlistdata", query, set, false, false)
							return
						}
					}
  182. times := qu.IntAll(tmp["times"])
  183. success := true //数据是否下载成功的标志
  184. delete(tmp, "_id")
  185. delete(tmp, "times")
  186. data := map[string]interface{}{}
  187. var err interface{}
  188. for k, v := range tmp {
  189. data[k] = v
  190. }
  191. //下载、解析、入库
  192. data, err = sp.DownloadDetailPage(tmp, data)
  193. if err != nil || data == nil {
  194. success = false
  195. times++
  196. if err != nil {
  197. logger.Error(sp.Code, err, tmp)
  198. if len(tmp) > 0 {
  199. SaveErrorData(sp.MUserName, tmp, err) //保存错误信息
  200. }
  201. if errstr, ok := err.(*lua.ApiError); ok {
  202. errText := errstr.Object.String()
  203. logger.Info(errText, errText == "d.nx != 0")
  204. }
  205. } /*else if data == nil && times >= 3 { //下载问题,建editor任务
  206. DownloadErrorData(s.Code, tmp)
  207. }*/
  208. } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
  209. log.Println("beforeHref:", href, "afterHref:", href)
  210. //增量
  211. util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
  212. //全量
  213. db := HexToBigIntMod(tmphref)
  214. hashHref := HexText(href)
  215. isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
  216. if !isExist {
  217. util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
  218. }
  219. }
  220. if !success { //下载失败更新次数和状态
  221. ss := map[string]interface{}{"times": times}
  222. if times >= 3 { //3次下载失败今天不再下载,state置为1
  223. ss["state"] = -1
  224. }
  225. set := map[string]interface{}{"$set": ss}
  226. MgoS.Update("spider_highlistdata", query, set, false, false)
  227. return
  228. } else { //三级页过滤
  229. deleteData := FilterByDetail(href, query, data) //针对列表页无法过滤需要在详情页过滤的数据,进行过滤处理
  230. if deleteData {
  231. return
  232. }
  233. }
  234. t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
  235. if t1 > time.Now().Unix() { //防止发布时间超前
  236. data["publishtime"] = time.Now().Unix()
  237. }
  238. delete(data, "exit")
  239. delete(data, "checkpublishtime")
  240. data["comeintime"] = time.Now().Unix()
  241. //计数
  242. tmpsp1, b := Allspiders.Load(sp.Code)
  243. if b {
  244. sp1, ok := tmpsp1.(*Spider)
  245. if ok {
  246. atomic.AddInt32(&sp1.LastDowncount, 1)
  247. atomic.AddInt32(&sp1.TodayDowncount, 1)
  248. atomic.AddInt32(&sp1.TotalDowncount, 1)
  249. }
  250. }
  251. data["spidercode"] = sp.Code
  252. data["dataging"] = 0
  253. data["iscompete"] = sp.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
  254. Store(sp.StoreMode, sp.StoreToMsgEvent, sp.Collection, sp.CoverAttr, data, true)
  255. set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} //下载成功state置为1
  256. MgoS.Update("spider_highlistdata", query, set, false, false)
  257. }(l, spTmp)
  258. }
  259. wg.Wait()
  260. //一轮次跑完重载脚本
  261. ReloadScript(code)
  262. } else { //没有数据
  263. time.Sleep(2 * time.Minute)
  264. }
  265. }
  266. }
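//FilterByDetail drops rows that can only be judged on the detail page: when the Lua
//script sets a "delete" flag on the parsed result, the href is recorded in both redis
//dedup sets and the list row is closed out so it is never downloaded again.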
func FilterByDetail(href string, query, data map[string]interface{}) bool {
	if data["delete"] != nil {
		//incremental dedup
		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
		//full-volume dedup
		db := HexToBigIntMod(href)
		hashHref := HexText(href)
		util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
		//mark the row in spider_highlistdata as state=1/delete so it is not downloaded again
		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
		MgoS.Update("spider_highlistdata", query, set, false, false)
		return true
	}
	return false
}
//Download and parse a content (detail) page via the spider's Lua script.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer mu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan) //polite delay: base plus jitter
	//copy the list-page row into a Lua table
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	//convert the returned Lua table back into the result map
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = value
				} else if value, ok := v.(*lua.LTable); ok {
					tmp := util.TableToMap(value)
					data[key] = tmp
				}
			}
		})
		return data, err
	} else {
		return nil, err
	}
}
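//For reference, a minimal sketch of the Lua side this method calls: a global function
//named downloadDetailPage that receives the list-page row as a table and returns a
//table of extracted fields. The real scripts are loaded from the DB; the contenthtml
//field below is an assumption for illustration only:
//
//	function downloadDetailPage(item)
//	    -- item carries the list-page fields (href, title, ...)
//	    item["contenthtml"] = "<div>...</div>" -- fetch and parse the page here
//	    -- setting item["delete"] makes FilterByDetail drop the row afterwards
//	    return item
//	end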
//Reload a spider's script from the DB.
func ReloadScript(code string) {
	scriptMap := getSpiderScriptDB(code)
	if codeInfo := scriptMap[code]; codeInfo != nil {
		AllspidersMapLock.Lock()
		for _, sp := range AllspidersMap[code] {
			sp.ScriptFile = codeInfo["script"]
			if codeInfo["createuser"] != "" {
				sp.UserName = codeInfo["createuser"]
			}
			if codeInfo["createuseremail"] != "" {
				sp.UserEmail = codeInfo["createuseremail"]
			}
			sp.MUserName = codeInfo["modifyuser"]
			sp.MUserEmail = codeInfo["modifyemail"]
			sp.LoadScript(code, sp.ScriptFile, true)
		}
		AllspidersMapLock.Unlock()
	}
	// for k, v := range scriptMap {
	// 	if spd, ok := Allspiders.Load(k); ok { //script already loaded: update it
	// 		sp := spd.(*Spider)
	// 		sp.ScriptFile = v["script"]
	// 		if v["createuser"] != "" {
	// 			sp.UserName = v["createuser"]
	// 		}
	// 		if v["createuseremail"] != "" {
	// 			sp.UserEmail = v["createuseremail"]
	// 		}
	// 		sp.MUserName = v["modifyuser"]
	// 		sp.MUserEmail = v["modifyemail"]
	// 		//sp.LoadScript(k, sp.ScriptFile, true) //back online: reload the script
	// 		Allspiders.Store(k, sp)
	// 		logger.Info("script reloaded on relisting", sp.Code)
	// 	}
	// }
}
//Return a random int in [0, num).
func GetRandMath(num int) int {
	if num <= 0 { //rand.Intn panics on a non-positive argument
		return 0
	}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return r.Intn(num)
}
//Build a hash code: the URL's scheme+host prefix plus the sha1 of the whole string.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}
//Hash an href and take it modulo 16 to pick a redis DB shard.
func HexToBigIntMod(href string) int {
	//sha256 hex digest
	t := sha256.New()
	io.WriteString(t, href)
	hex := fmt.Sprintf("%x", t.Sum(nil))
	//interpret the digest (minus its first two hex chars) as a big integer, then mod 16
	n := new(big.Int)
	n, _ = n.SetString(hex[2:], 16)
	return int(n.Mod(n, big.NewInt(16)).Int64())
}
//sha256 hex digest of an href, used as the key in the full-volume dedup set.
func HexText(href string) string {
	h := sha256.New()
	h.Write([]byte(href))
	return fmt.Sprintf("%x", h.Sum(nil))
}
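//Together, HexToBigIntMod and HexText address the full-volume dedup store, as in
//FilterByDetail above: the modulo picks one of 16 redis DBs and the digest is the key.
//A short sketch with a hypothetical href:
//
//	href := "https://example.com/notice/1.html"
//	db := HexToBigIntMod(href) //shard index in [0, 16)
//	key := HexText(href)       //64-char sha256 hex digest
//	util.PutRedis("title_repeat_fulljudgement", db, key, "", -1)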