history.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package spider
  2. import (
  3. "fmt"
  4. "github.com/donnie4w/go-logger/logger"
  5. qu "qfw/util"
  6. sputil "spiderutil"
  7. "sync"
  8. "time"
  9. )
  10. var HistoryAllSpiders sync.Map = sync.Map{}
  11. //历史节点下载详情页
  12. func HistoryEventDownloadDetail() {
  13. defer qu.Catch()
  14. if !sputil.Config.IsHistoryEvent { //不是历史节点return
  15. return
  16. }
  17. GetHistoryDownloadSpider() //定时检测数据集汇总爬虫
  18. }
  19. //执行爬虫
  20. func (s *Spider) StartSpider() {
  21. defer qu.Catch()
  22. for {
  23. logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
  24. if !s.Stop { //爬虫是运行状态
  25. //s.DownloadHistoryDetail()
  26. s.DownloadDetail(true, true)
  27. } else { //爬虫停止运行,删除
  28. s.L.Close()
  29. HistoryAllSpiders.Delete(s.Code)
  30. logger.Info("Delete Code:", s.Code, "Stop:", s.Stop)
  31. break
  32. }
  33. }
  34. }
  35. //定时检测数据集汇总爬虫
  36. func GetHistoryDownloadSpider() {
  37. defer qu.Catch()
  38. logger.Info("定时统计下载历史爬虫开始...")
  39. sess := MgoS.GetMgoConn()
  40. defer MgoS.DestoryMongoConn(sess)
  41. wg := &sync.WaitGroup{}
  42. ch := make(chan bool, 2)
  43. match := map[string]interface{}{ //查询未下载过的数据
  44. "state": 0,
  45. }
  46. group := map[string]interface{}{
  47. "_id": "$spidercode",
  48. "count": map[string]interface{}{
  49. "$sum": 1,
  50. },
  51. }
  52. p := []map[string]interface{}{
  53. map[string]interface{}{"$match": match},
  54. map[string]interface{}{"$group": group},
  55. }
  56. it := sess.DB(MgoS.DbName).C("spider_historydata").Pipe(p).Iter()
  57. n := 0
  58. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  59. wg.Add(1)
  60. ch <- true
  61. go func(tmp map[string]interface{}) {
  62. defer func() {
  63. <-ch
  64. wg.Done()
  65. }()
  66. code := qu.ObjToString(tmp["_id"])
  67. count := qu.IntAll(tmp["count"])
  68. logger.Info("code:", code, " 当前待采集历史信息量:", count)
  69. //查询爬虫信息
  70. lua, _ := MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
  71. if len(*lua) > 0 {
  72. state := qu.IntAll((*lua)["state"])
  73. if state >= 7 || state == 4 { //已作废、无发布、需登录、无法处理、已删除、已上线(python)这些状态爬虫的数据不再下载
  74. MgoS.Update("spider_historydata",
  75. map[string]interface{}{"spidercode": code, "state": 0},
  76. map[string]interface{}{"$set": map[string]interface{}{"luastate": state, "state": -1, "updatetime": time.Now().Unix()}},
  77. false, true,
  78. )
  79. } else {
  80. old := qu.IntAll((*lua)["old_lua"])
  81. script := ""
  82. if old == 1 {
  83. script = fmt.Sprint((*lua)["luacontent"])
  84. } else {
  85. if (*lua)["oldlua"] != nil {
  86. if (*lua)["luacontent"] != nil {
  87. script = (*lua)["luacontent"].(string)
  88. }
  89. } else {
  90. script = GetScriptByTmp((*lua))
  91. }
  92. }
  93. spTmp, b := HistoryAllSpiders.Load(code)
  94. isNew := true
  95. if b { //更新正在运行爬虫信息
  96. sp, ok := spTmp.(*Spider)
  97. if ok {
  98. sp.ScriptFile = script
  99. sp.UserName = qu.ObjToString((*lua)["createuser"])
  100. sp.UserEmail = qu.ObjToString((*lua)["createuseremail"])
  101. sp.MUserName = qu.ObjToString((*lua)["modifyuser"])
  102. sp.MUserEmail = qu.ObjToString((*lua)["next"])
  103. isNew = false
  104. }
  105. }
  106. if isNew {
  107. sp, errstr := CreateSpider(code, script, true, false)
  108. if errstr == "" && sp != nil && sp.Code != "nil" { //脚本加载成功
  109. sp.IsMainThread = true //多线程采集时,判断哪个是主线程,由主线程采集时更新心跳
  110. HistoryAllSpiders.Store(code, sp) //存入集合
  111. logger.Info("start job:", code)
  112. go sp.StartSpider()
  113. } else {
  114. logger.Info(code, "脚本加载失败,请检查!")
  115. nowT := time.Now().Unix()
  116. username := "异常"
  117. if sp != nil {
  118. username = sp.MUserName
  119. }
  120. MgoS.Update("spider_loadfail",
  121. map[string]interface{}{
  122. "code": code,
  123. "modifytime": map[string]interface{}{
  124. "$gte": nowT - 12*3600,
  125. "$lte": nowT + 12*3600,
  126. },
  127. },
  128. map[string]interface{}{
  129. "$set": map[string]interface{}{
  130. "code": code,
  131. "type": "初始化",
  132. "script": script,
  133. "updatetime": nowT,
  134. "modifyuser": username,
  135. "event": sputil.Config.Uploadevent,
  136. "err": errstr,
  137. },
  138. }, true, false)
  139. }
  140. }
  141. }
  142. }
  143. }(tmp)
  144. tmp = map[string]interface{}{}
  145. }
  146. wg.Wait()
  147. //time.AfterFunc(time.Second*30, GetHistoryDownloadSpider)
  148. time.AfterFunc(time.Minute*30, GetHistoryDownloadSpider)
  149. }