spider.go
/*
Crawler: the script interface; extension points are expected here.
*/
package spider

import (
	"crypto/sha1"
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"math/big"
	"math/rand"
	mu "mfw/util"
	qu "qfw/util"
	es "qfw/util/elastic"
	"regexp"
	util "spiderutil"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	lua "github.com/yuin/gopher-lua"
)
// Spider is a crawler instance backed by a Lua script.
type Spider struct {
	Script
	Code                            string // spider code
	Name                            string // spider name
	DownDetail                      bool   // whether to download the detail page
	Stop                            bool   // stop flag
	Pass                            bool   // pause flag
	LastPubshTime                   int64  // last publish time
	LastHeartbeat                   int64  // last heartbeat time
	SpiderRunRate                   int64  // run frequency
	ExecuteOkTime                   int64  // time the task last completed successfully
	Collection                      string // target collection name
	Thread                          int64  // thread count
	LastExecTime                    int64  // last execution time
	LastDowncount                   int32  // download count of the last run
	TodayDowncount                  int32  // download count for today
	YesterdayDowncount              int32  // download count for yesterday
	TotalDowncount                  int32  // total download count
	RoundCount                      int32  // number of completed rounds
	StoreMode                       int    // storage mode
	StoreToMsgEvent                 int    // message type
	CoverAttr                       string // attribute used for deduplication
	SleepBase                       int    // base delay
	SleepRand                       int    // random delay
	TargetChannelUrl                string // channel (list) page URL
	UpperLimit, LowerLimit          int    // upper/lower bounds of the normal range
	UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
	MUserName, MUserEmail           string // maintainer, maintainer email
	Index                           int    // array index
	// historical backfill
	IsHistoricalMend bool // whether this is a historical-backfill spider
	IsMustDownload   bool // whether download is forced
	IsCompete        bool // distinguishes new vs. legacy spiders
	Infoformat       int  // spider type: 1 tender; 2 proposed/approval; 3 property rights
}
var Es *es.Elastic
var EsIndex string
var EsType string
var UpdataMgoCache = make(chan []map[string]interface{}, 1000) // status updates for data that must be re-downloaded
var SP = make(chan bool, 5)
var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
var DelaySiteMap map[string]*DelaySite // sites whose collection is delayed

type DelaySite struct {
	DelayTime int
	Compete   bool
}
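// Illustrative only: a minimal sketch of how DelaySiteMap might be populated.
// The site key and the 12-hour delay below are hypothetical; the real map is
// presumably filled from configuration elsewhere in the project.
func exampleInitDelaySites() {
	DelaySiteMap = map[string]*DelaySite{
		"a_example_site": {DelayTime: 12 * 3600, Compete: true}, // hypothetical site, 12h delay
	}
}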
// DetailData collects detail (third-level) page info on a schedule in high-performance mode.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver // run once all scripts have loaded
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { // high-performance mode and not the 7000 node; util.Config.IsHistoryEvent is true only on the 7000 node
		GetListDataDownloadDetail()
	}
}
func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders.Range(func(k, v interface{}) bool {
		go DownloadHighDetail(k.(string))
		time.Sleep(2 * time.Second) // stagger startup of the per-code workers
		return true
	})
}
// DownloadHighDetail downloads detail pages from list-page data in high-performance mode.
func DownloadHighDetail(code string) {
	defer qu.Catch()
	for {
		//logger.Info("spider code:", s.Code, "delisted:", s.Stop)
		//if !s.Stop { // spider is running
		/*
			1. At the start of each round, query the data downloaded today.
			2. If the query returns nothing, step back one day at a time (tentatively 50 records).
		*/
		o := map[string]interface{}{"_id": 1} // sort
		f := map[string]interface{}{ // projection
			"state":      0,
			"comeintime": 0,
			"event":      0,
		}
		q := map[string]interface{}{
			"spidercode": code,
			"state":      0, // 0: newly stored; -1: collection failed; 1: success
		}
		list := &[]map[string]interface{}{} // query result set
		for day := 0; day <= util.Config.DayNum; day++ {
			startTime := GetTime(-day)
			comeintime := map[string]interface{}{"$gte": startTime} // time window of the query
			if day != 0 { // not today: bound the window
				comeintime["$lt"] = GetTime(-day + 1)
			}
			//} else if code == "a_gcy_mcgg" { // delayed-collection site (delayed sites are not added to the multi-threaded luaspecialcode collection)
			//	endTime := time.Now().Unix() - 12*3600
			//	if endTime > startTime {
			//		comeintime = map[string]interface{}{
			//			"$gte": startTime,
			//			"$lt":  endTime,
			//		}
			//	} else {
			//		continue
			//	}
			//}
			q["comeintime"] = comeintime
			list, _ = MgoS.Find("spider_highlistdata", q, o, f, false, 0, 100)
			//logger.Debug("code:", code, "query:", q, "rows returned:", len(*list))
			if list != nil && len(*list) > 0 {
				break
			} else {
				time.Sleep(1 * time.Second)
			}
		}
		if list != nil && len(*list) > 0 {
			spChan := make(chan *Spider, len(AllspidersMap[code]))
			AllspidersMapLock.Lock()
			for _, sp := range AllspidersMap[code] {
				spChan <- sp
			}
			AllspidersMapLock.Unlock()
			wg := &sync.WaitGroup{}
			for _, l := range *list {
				spTmp := <-spChan
				wg.Add(1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp
						wg.Done()
					}()
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					href := qu.ObjToString(tmp["href"])
					hashHref := HexText(href)
					// List-page Redis dedup currently keys on href+code, so the same href may
					// have been collected under several codes. To avoid duplicate downloads,
					// check against the full Redis set here.
					isExist := util.RedisClusterExists(hashHref)
					if isExist {
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} // already exists: set state to 1
						MgoS.Update("spider_highlistdata", query, set, false, false)
						return
					}
					//if code == "a_gcy_mcgg" { // competitor data: dedup by title in ES
					//	title := qu.ObjToString(tmp["title"])
					//	eTime := time.Now().Unix()
					//	sTime := eTime - int64(7*86400)
					//	esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
					//	count := Es.Count(EsIndex, EsType, esQuery)
					//	if count > 0 { // the title is already in ES: skip collection and update the list row's state
					//		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} // already exists: set state to 1
					//		MgoS.Update("spider_highlistdata", query, set, false, false)
					//		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
					//		return
					//	}
					//}
					//competehref := qu.ObjToString(tmp["competehref"])
					//if competehref != "" { // check whether Jianyu has already collected this third-party record
					//	title := qu.ObjToString(tmp["title"])
					//	one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title})
					//	if one != nil && len(*one) > 0 { // already collected by Jianyu: discard this record
					//		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} // already exists: set state to 1
					//		MgoS.Update("spider_highlistdata", query, set, false, false)
					//		return
					//	}
					//}
					times := qu.IntAll(tmp["times"])
					success := true // whether the download succeeded
					delete(tmp, "_id")
					delete(tmp, "times")
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					// download, parse, store
					data, err = sp.DownloadDetailPage(tmp, data)
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(sp.Code, err, tmp)
							if len(tmp) > 0 {
								SaveErrorData(sp.MUserName, tmp, err) // save the error info
							}
						} /*else if data == nil && times >= 3 { // download problem: create an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced the href, so it differs from the list-page href
						util.RedisClusterSet(hashHref, "", -1)
					}
					if !success { // download failed: update retry count and state
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { // after 3 failed downloads, stop for today: set state to -1
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						MgoS.Update("spider_highlistdata", query, set, false, false)
						return
					} else if data["delete"] != nil { // filtered out by the detail page
						util.RedisClusterSet(hashHref, "", -1) // record filtered data in the full Redis set
						// mark the row in spider_highlistdata with state=1 so it is not downloaded again, and update Redis
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
						MgoS.Update("spider_highlistdata", query, set, false, false)
						return
					}
					// analyze body text and attachments; abnormal records are re-downloaded
					if AnalysisProjectInfo(data) {
						times++
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { // after 3 failed downloads, stop for today: set state to -1
							ss["state"] = -1
							ss["detailfilerr"] = true
						}
						set := map[string]interface{}{"$set": ss}
						MgoS.Update("spider_highlistdata", query, set, false, false)
						return
					}
					t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
					if t1 > time.Now().Unix() { // guard against publish times in the future
						data["publishtime"] = time.Now().Unix()
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					data["comeintime"] = time.Now().Unix()
					data["spidercode"] = sp.Code
					data["dataging"] = 0
					data["iscompete"] = sp.IsCompete   // spiders added after 2021-11-01 no longer show the original link (enforced by the save service)
					data["infoformat"] = sp.Infoformat // spider type
					Store(sp.StoreMode, sp.StoreToMsgEvent, sp.Collection, sp.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} // download succeeded: set state to 1
					MgoS.Update("spider_highlistdata", query, set, false, false)
				}(l, spTmp)
			}
			wg.Wait()
			// reload the script after each full round
			ReloadScript(code)
		} else { // no data
			time.Sleep(2 * time.Minute)
		}
	}
}
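// Illustrative only: DownloadHighDetail bounds its concurrency with a buffered
// channel that acts as a pool of *Spider workers, plus a WaitGroup that ends the
// round. A minimal generic sketch of the same pattern (all names hypothetical):
func examplePoolPattern(workers []*Spider, jobs []map[string]interface{}) {
	pool := make(chan *Spider, len(workers))
	for _, w := range workers {
		pool <- w
	}
	wg := &sync.WaitGroup{}
	for _, job := range jobs {
		w := <-pool // blocks until a worker is free, capping concurrency at len(workers)
		wg.Add(1)
		go func(job map[string]interface{}, w *Spider) {
			defer func() {
				pool <- w // return the worker to the pool
				wg.Done()
			}()
			// ... process job with w ...
		}(job, w)
	}
	wg.Wait() // the round ends only after every job has finished
}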
// AnalysisProjectInfo reports records whose detail is only "详情请访问原网页!" (see the
// original page) and whose attachments failed to upload; such records do not count
// as successful downloads.
func AnalysisProjectInfo(data map[string]interface{}) bool {
	defer qu.Catch()
	detail := qu.ObjToString(data["detail"])
	if detail == "详情请访问原网页!" || detail == "<br/>详情请访问原网页!" { // exact match only: some records are assembled from JSON with missing fields and legitimately append this sentence
		if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
			if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
				fileOk := false
				for _, att := range attachments {
					if d, ok := att.(map[string]interface{}); ok {
						fid := qu.ObjToString(d["fid"])
						if fid != "" { // the attachment was uploaded successfully
							fileOk = true
							break
						}
					}
				}
				return !fileOk
			} else {
				return true
			}
		} else {
			return true
		}
	}
	return false
}
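// Illustrative only: the document shape AnalysisProjectInfo inspects. The field
// names ("detail", "projectinfo", "attachments", "fid") come from the checks
// above; the concrete values are hypothetical.
func exampleAnalysisInput() {
	data := map[string]interface{}{
		"detail": "详情请访问原网页!", // placeholder body text
		"projectinfo": map[string]interface{}{
			"attachments": map[string]interface{}{
				"1": map[string]interface{}{"fid": "abc123"}, // non-empty fid: upload succeeded
			},
		},
	}
	fmt.Println(AnalysisProjectInfo(data)) // false: placeholder text, but an attachment made it through
}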
// DownloadDetailPage downloads and parses a content (detail) page via the spider's Lua script.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer mu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc(time.Duration(s.SleepBase+GetRandMath(s.SleepRand))*time.Millisecond, TimeSleepChan)
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	// assemble the result map
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = value
				} else if value, ok := v.(*lua.LTable); ok {
					data[key] = util.TableToMap(value)
				}
			}
		})
		return data, err
	} else {
		return nil, err
	}
}
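// Illustrative only: the Lua-side contract assumed by DownloadDetailPage. Each
// spider script is expected to define a global downloadDetailPage function that
// receives the record as a table and returns a table of parsed fields. A
// hypothetical minimal script (not from this repository) might look like:
//
//	function downloadDetailPage(tab)
//	    -- fetch tab["href"], parse the page, then return the parsed fields;
//	    -- string, number, and nested-table values are all mapped back into Go
//	    return { detail = "...", publishtime = "2024-01-01 00:00:00" }
//	end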
// ReloadScript reloads a spider code's script from the database.
func ReloadScript(code string) {
	scriptMap := getSpiderScriptDB(code)
	if codeInfo := scriptMap[code]; codeInfo != nil {
		AllspidersMapLock.Lock()
		for _, sp := range AllspidersMap[code] {
			sp.ScriptFile = codeInfo["script"]
			if codeInfo["createuser"] != "" {
				sp.UserName = codeInfo["createuser"]
			}
			if codeInfo["createuseremail"] != "" {
				sp.UserEmail = codeInfo["createuseremail"]
			}
			sp.MUserName = codeInfo["modifyuser"]
			sp.MUserEmail = codeInfo["modifyemail"]
			sp.LoadScript(&sp.Name, code, sp.ScriptFile, true)
		}
		AllspidersMapLock.Unlock()
	}
	// for k, v := range scriptMap {
	// 	if spd, ok := Allspiders.Load(k); ok { // the script is already loaded: update it
	// 		sp := spd.(*Spider)
	// 		sp.ScriptFile = v["script"]
	// 		if v["createuser"] != "" {
	// 			sp.UserName = v["createuser"]
	// 		}
	// 		if v["createuseremail"] != "" {
	// 			sp.UserEmail = v["createuseremail"]
	// 		}
	// 		sp.MUserName = v["modifyuser"]
	// 		sp.MUserEmail = v["modifyemail"]
	// 		//sp.LoadScript(k, sp.ScriptFile, true) // back online: reload the script
	// 		Allspiders.Store(k, sp)
	// 		logger.Info("script re-listed and reloaded", sp.Code)
	// 	}
	// }
}
// GetRandMath returns a random int in [0, num).
func GetRandMath(num int) int {
	if num <= 0 { // rand.Intn panics on a non-positive argument
		return 0
	}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return r.Intn(num)
}
// GetHas1 returns the SHA-1 hash of data, prefixed with the URL host extracted from it.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}
// HexToBigIntMod hashes href with SHA-256 and reduces it modulo 16.
func HexToBigIntMod(href string) int {
	// compute the hash
	t := sha256.New()
	io.WriteString(t, href)
	hex := fmt.Sprintf("%x", t.Sum(nil))
	// take the modulus (the first two hex chars are skipped)
	n := new(big.Int)
	n, _ = n.SetString(hex[2:], 16)
	return int(n.Mod(n, big.NewInt(16)).Int64())
}
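// Illustrative only: a function like HexToBigIntMod is typically used to shard
// work across a fixed number of buckets (16 here). This usage sketch is an
// assumption, not code from this repository.
func exampleShardByHref(href string) int {
	bucket := HexToBigIntMod(href) // stable value in [0, 16) for the same href
	fmt.Printf("%s -> bucket %d\n", href, bucket)
	return bucket
}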
// HexText returns the SHA-256 hash of href as a hex string.
func HexText(href string) string {
	h := sha256.New()
	h.Write([]byte(href))
	return fmt.Sprintf("%x", h.Sum(nil))
}
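// Illustrative only: HexText produces the key used for the full Redis dedup check
// in DownloadHighDetail. A hedged sketch of that check with a hypothetical href:
func exampleDedupCheck() bool {
	hashHref := HexText("http://example.com/notice/1.html") // hypothetical URL
	return util.RedisClusterExists(hashHref)                // true means already collected: skip the download
}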