spider.go
/*
Package spider: crawler and script interface; needs extension.
*/
package spider

import (
	"crypto/sha1"
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"math/big"
	"math/rand"
	mu "mfw/util"
	qu "qfw/util"
	"regexp"
	util "spiderutil"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"github.com/yuin/gopher-lua"
)
// Spider is one crawler instance.
type Spider struct {
	Script
	Code                            string // spider code
	Name                            string // name
	DownDetail                      bool   // whether to download detail pages
	Stop                            bool   // stop flag
	Pass                            bool   // pause flag
	LastPubshTime                   int64  // last publish time
	LastHeartbeat                   int64  // last heartbeat time
	SpiderRunRate                   int64  // run frequency
	ExecuteOkTime                   int64  // time the task last completed successfully
	Collection                      string // target collection name
	Thread                          int64  // number of threads
	LastExecTime                    int64  // last execution time
	LastDowncount                   int32  // downloads in the last run
	TodayDowncount                  int32  // downloads today
	YesterdayDowncount              int32  // downloads yesterday
	TotalDowncount                  int32  // total downloads
	RoundCount                      int32  // completed rounds
	StoreMode                       int    // storage mode
	StoreToMsgEvent                 int    // message event type
	CoverAttr                       string // attribute used for de-duplication
	SleepBase                       int    // base delay (ms)
	SleepRand                       int    // random extra delay (ms)
	TargetChannelUrl                string // channel (list) page URL
	UpperLimit, LowerLimit          int    // upper/lower bounds of the normal range
	UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
	MUserName, MUserEmail           string // maintainer name, maintainer email
	Index                           int    // array index
	// Historical gap-filling
	IsHistoricalMend bool // whether this is a historical gap-filling spider
	IsMustDownload   bool // whether download is forced
}
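
// Registration sketch (not part of the original file): how a Spider is
// typically made visible to the detail-download loop below. The field values
// here are hypothetical; Allspiders (a sync.Map), AllspidersMap
// (map of code to []*Spider) and AllspidersMapLock are the package-level
// registries referenced throughout this file.
//
//	sp := &Spider{Code: "example_code", Collection: "data_bak", Thread: 1}
//	Allspiders.Store(sp.Code, sp)
//	AllspidersMapLock.Lock()
//	AllspidersMap[sp.Code] = append(AllspidersMap[sp.Code], sp)
//	AllspidersMapLock.Unlock()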
var TimeChan = make(chan bool, 1)
var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)

// DetailData: in high-performance mode, periodically collect detail (third-level) page data.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver // wait until all scripts are loaded, then run
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { // high-performance mode and not the 7000 node; util.Config.IsHistoryEvent is true only on the 7000 node
		GetListDataDownloadDetail()
	}
}

func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders.Range(func(k, v interface{}) bool {
		go DownloadHighDetail(k.(string))
		time.Sleep(2 * time.Second)
		return true
	})
}
// DownloadHighDetail downloads detail pages from list-page records in high-performance mode.
func DownloadHighDetail(code string) {
	defer qu.Catch()
	for {
		//logger.Info("spider code:", s.Code, "delisted:", s.Stop)
		//if !s.Stop { // spider is running
		/*
			1. At the start of each round, query the data downloaded today.
			2. If the query returns nothing, step back one day at a time
			   (tentatively 50 records per query; the query below fetches up to 100).
		*/
		o := map[string]interface{}{"_id": 1} // sort order
		f := map[string]interface{}{ // projection fields
			"state":      0,
			"comeintime": 0,
			"event":      0,
		}
		q := map[string]interface{}{
			"spidercode": code,
			"state":      0, // 0: stored, pending; -1: download failed; 1: done
		}
		list := &[]map[string]interface{}{} // query result set
		for day := 0; day <= util.Config.DayNum; day++ {
			comeintime := map[string]interface{}{"$gte": GetTime(-day)} // lower bound of the query window
			if day != 0 { // not today: also bound the window from above
				comeintime["$lt"] = GetTime(-day + 1)
			}
			q["comeintime"] = comeintime
			list, _ = MgoS.Find("spider_highlistdata", q, o, f, false, 0, 100)
			//logger.Debug("code:", code, "query:", q, "records found:", len(*list))
			if list != nil && len(*list) > 0 {
				break
			} else {
				time.Sleep(1 * time.Second)
			}
		}
		if list != nil && len(*list) > 0 {
			spChan := make(chan *Spider, len(AllspidersMap[code]))
			AllspidersMapLock.Lock()
			for _, sp := range AllspidersMap[code] {
				spChan <- sp
			}
			AllspidersMapLock.Unlock()
			wg := &sync.WaitGroup{}
			for _, l := range *list {
				spTmp := <-spChan
				wg.Add(1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp
						wg.Done()
					}()
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					competehref := qu.ObjToString(tmp["competehref"])
					if competehref != "" { // third-party site: check whether Jianyu has already collected this record
						title := qu.ObjToString(tmp["title"])
						one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title})
						if one != nil && len(*one) > 0 { // already collected by Jianyu: discard this record
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} // already exists: state set to 1
							MgoS.Update("spider_highlistdata", query, set, false, false)
							return
						}
					}
					times := qu.IntAll(tmp["times"])
					success := true // whether the download succeeded
					delete(tmp, "_id")
					delete(tmp, "times")
					href := qu.ObjToString(tmp["href"])
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					// download, parse, store
					data, err = sp.DownloadDetailPage(tmp, data)
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(sp.Code, err, tmp)
							if len(tmp) > 0 {
								SaveErrorData(sp.MUserName, tmp, err) // save the error info
							}
						} /*else if data == nil && times >= 3 { // download problem: create an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so it differs from the list page's
						log.Println("beforeHref:", href, "afterHref:", tmphref)
						// incremental de-duplication
						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+tmphref, tmphref, 3600*24*30)
						// full de-duplication
						db := HexToBigIntMod(tmphref)
						isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+tmphref)
						if !isExist {
							util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+tmphref, "", -1)
						}
					}
					if !success { // download failed: update retry count and state
						ss := map[string]interface{}{"times": times}
						if times >= 3 { // after 3 failed downloads, stop retrying today: state set to -1
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						MgoS.Update("spider_highlistdata", query, set, false, false)
						return
					}
					t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
					if t1 > time.Now().Unix() { // guard against publish times in the future
						data["publishtime"] = time.Now().Unix()
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					data["comeintime"] = time.Now().Unix()
					// update counters
					tmpsp1, b := Allspiders.Load(sp.Code)
					if b {
						sp1, ok := tmpsp1.(*Spider)
						if ok {
							atomic.AddInt32(&sp1.LastDowncount, 1)
							atomic.AddInt32(&sp1.TodayDowncount, 1)
							atomic.AddInt32(&sp1.TotalDowncount, 1)
						}
					}
					data["spidercode"] = sp.Code
					data["dataging"] = 0
					Store(sp.StoreMode, sp.StoreToMsgEvent, sp.Collection, sp.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} // download succeeded: state set to 1
					MgoS.Update("spider_highlistdata", query, set, false, false)
				}(l, spTmp)
			}
			wg.Wait()
			// reload the script after each full round
			ReloadScript(code)
		} else { // no data
			time.Sleep(2 * time.Minute)
		}
	}
}
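
// Concurrency note (sketch, not part of the original file): the buffered
// spChan above doubles as a semaphore. Each record must first receive a
// *Spider from the channel, so at most len(AllspidersMap[code]) downloads
// run at once, and each goroutine returns its instance in a defer. A
// minimal standalone version of the same pattern, with hypothetical
// workers/jobs:
//
//	pool := make(chan *Spider, len(workers))
//	for _, w := range workers {
//		pool <- w
//	}
//	var wg sync.WaitGroup
//	for _, job := range jobs {
//		w := <-pool // blocks until an instance is free
//		wg.Add(1)
//		go func(job map[string]interface{}, w *Spider) {
//			defer func() { pool <- w; wg.Done() }()
//			process(w, job) // hypothetical per-record work
//		}(job, w)
//	}
//	wg.Wait()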
// DownloadDetailPage downloads and parses a content (detail) page.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer mu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	// assemble the result map
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = value
				} else if value, ok := v.(*lua.LTable); ok {
					tmp := util.TableToMap(value)
					data[key] = tmp
				}
			}
		})
		return data, err
	} else {
		return nil, err
	}
}
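
// Contract sketch (not part of the original file): the loaded Lua script must
// define a global downloadDetailPage(tab) that receives the record as a table
// and returns exactly one table; string, number and nested-table fields of
// that table are merged into data above, and a non-table return yields
// (nil, err). A hypothetical caller, mirroring the call in DownloadHighDetail:
//
//	row := map[string]interface{}{"href": "https://example.com/a.html"} // hypothetical record
//	out := map[string]interface{}{}
//	for k, v := range row {
//		out[k] = v
//	}
//	out, err := sp.DownloadDetailPage(row, out)
//	if err != nil || out == nil {
//		// the Lua call failed or returned a non-table; the caller handles retries
//	}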
// ReloadScript reloads the script for the given spider code.
func ReloadScript(code string) {
	scriptMap := getSpiderScriptDB(code)
	if codeInfo := scriptMap[code]; codeInfo != nil {
		AllspidersMapLock.Lock()
		for _, sp := range AllspidersMap[code] {
			sp.ScriptFile = codeInfo["script"]
			if codeInfo["createuser"] != "" {
				sp.UserName = codeInfo["createuser"]
			}
			if codeInfo["createuseremail"] != "" {
				sp.UserEmail = codeInfo["createuseremail"]
			}
			sp.MUserName = codeInfo["modifyuser"]
			sp.MUserEmail = codeInfo["modifyemail"]
			sp.LoadScript(code, sp.ScriptFile, true)
		}
		AllspidersMapLock.Unlock()
	}
	// for k, v := range scriptMap {
	// 	if spd, ok := Allspiders.Load(k); ok { // script already exists: update it
	// 		sp := spd.(*Spider)
	// 		sp.ScriptFile = v["script"]
	// 		if v["createuser"] != "" {
	// 			sp.UserName = v["createuser"]
	// 		}
	// 		if v["createuseremail"] != "" {
	// 			sp.UserEmail = v["createuseremail"]
	// 		}
	// 		sp.MUserName = v["modifyuser"]
	// 		sp.MUserEmail = v["modifyemail"]
	// 		//sp.LoadScript(k, sp.ScriptFile, true) // re-listed: reload the script
	// 		Allspiders.Store(k, sp)
	// 		logger.Info("reloading script on re-listing", sp.Code)
	// 	}
	// }
}
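
// Shape note (inferred from the usage above, not part of the original file):
// getSpiderScriptDB(code) evidently returns a map keyed by spider code whose
// values are string maps, along the lines of:
//
//	scriptMap := map[string]map[string]string{
//		"example_code": { // hypothetical entry
//			"script":          "-- lua source ...",
//			"createuser":      "dev",
//			"createuseremail": "dev@example.com",
//			"modifyuser":      "maintainer",
//			"modifyemail":     "maintainer@example.com",
//		},
//	}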
// GetRandMath returns a random int in [0, num).
func GetRandMath(num int) int {
	if num <= 0 { // rand.Intn panics on non-positive arguments
		return 0
	}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return r.Intn(num)
}
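
// Usage sketch (not part of the original file): DownloadDetailPage combines
// the spider's base and random delays into its politeness sleep, roughly:
//
//	delay := time.Duration(s.SleepBase+GetRandMath(s.SleepRand)) * time.Millisecond
//	util.TimeSleepFunc(delay, TimeSleepChan)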
// GetHas1 returns the site root of data's URL followed by the sha1 hex digest of data.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}
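
// Example (hypothetical input, not part of the original file): for
// "https://example.com/notice/1.html", Reg extracts the site root
// "https://example.com/", so the result is that root plus the 40-character
// sha1 hex digest of the whole input string:
//
//	key := GetHas1("https://example.com/notice/1.html")
//	// key == "https://example.com/" + "<40 hex chars>"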
// HexToBigIntMod hashes href and returns the hash modulo 16 (a shard index).
func HexToBigIntMod(href string) int {
	// compute the sha256 digest
	t := sha256.New()
	io.WriteString(t, href)
	hex := fmt.Sprintf("%x", t.Sum(nil))
	// interpret the digest (minus its first two hex characters) as a big
	// integer and take it modulo 16
	n := new(big.Int)
	n, _ = n.SetString(hex[2:], 16)
	return int(n.Mod(n, big.NewInt(16)).Int64())
}
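
// Usage sketch (not part of the original file): the return value in [0, 16)
// selects which Redis DB holds the full de-duplication set, mirroring the
// lookup in DownloadHighDetail (the URL is hypothetical):
//
//	href := "https://example.com/notice/1.html"
//	db := HexToBigIntMod(href) // deterministic shard index, 0..15
//	isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+href)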