// history.go

package spider

import (
	"fmt"
	"github.com/donnie4w/go-logger/logger"
	qu "qfw/util"
	sputil "spiderutil"
	"sync"
	"time"
)

// HistoryAllSpiders tracks the running history spiders, keyed by spider code.
var HistoryAllSpiders sync.Map

// HistoryEventDownloadDetail downloads detail pages on a history node.
func HistoryEventDownloadDetail() {
	defer qu.Catch()
	if !sputil.Config.IsHistoryEvent { // not a history node, nothing to do
		return
	}
	GetHistoryDownloadSpider() // periodically scan the dataset and aggregate pending work per spider
}
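
// A minimal sketch of how this entry point is presumably wired at service
// startup (the call site is not part of this file and is an assumption):
//
//	go HistoryEventDownloadDetail() // returns immediately on non-history nodes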

// StartSpider runs the spider until it is flagged to stop.
func (s *Spider) StartSpider() {
	defer qu.Catch()
	for {
		logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
		if !s.Stop { // spider is running
			//s.DownloadHistoryDetail()
			s.DownloadDetail(true, true)
		} else { // spider has stopped: release the Lua state and drop it from the registry
			s.L.Close()
			HistoryAllSpiders.Delete(s.Code)
			logger.Info("Delete Code:", s.Code, "Stop:", s.Stop)
			break
		}
	}
}
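
// Lifecycle implied by the registry above, shown as an illustrative sketch
// (grounded in GetHistoryDownloadSpider below, not a separate API):
//
//	sp, errstr := CreateSpider(code, script, true, false) // compile the Lua script
//	HistoryAllSpiders.Store(code, sp)                      // register the spider
//	go sp.StartSpider()                                    // run until sp.Stop is set
//	// once sp.Stop is true, the loop closes sp.L and deletes the registry entry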

// DownloadHistoryDetail loads the pending history records for this spider and downloads them.
func (s *Spider) DownloadHistoryDetail() {
	defer qu.Catch()
	q := map[string]interface{}{"spidercode": s.Code, "state": 0}
	o := map[string]interface{}{"_id": 1}
	f := map[string]interface{}{
		"state":      0,
		"comeintime": 0,
		"event":      0,
	}
	//UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // record the detail-page crawling heartbeat
	list, _ := MgoS.Find("spider_historydata", q, o, f, false, 0, 200)
	if len(*list) == 0 { // nothing left to download, retire the spider
		s.Stop = true
		return
	}
	// download (currently single-threaded)
	for _, tmp := range *list {
		id := tmp["_id"]
		href := qu.ObjToString(tmp["href"])
		hashHref := HexText(href)
		isExist := sputil.RedisClusterExists(hashHref) // dedupe against the full href set in redis
		if isExist {
			set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "href", "updatetime": time.Now().Unix()}} // already collected: set state to 1
			MgoS.UpdateById("spider_historydata", id, set)
			return
		}
		success := true    // whether the download succeeded
		delete(tmp, "_id") // drop the unused _id field from the list-page record
		data := map[string]interface{}{}
		for k, v := range tmp {
			data[k] = v
		}
		// download, parse and store
		data, err := s.DownloadDetailPage(tmp, data)
		//UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // download heartbeat
		if err != nil || data == nil {
			success = false
			if err != nil {
				logger.Error(s.Code, err, tmp)
				//if len(tmp) > 0 {
				//	SaveErrorData(s.MUserName, tmp, err) // save the error info
				//}
			} /*else if data == nil && times >= 3 { // download problem: create an editor task
				DownloadErrorData(s.Code, tmp)
			}*/
		} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // the detail page replaced href, so it differs from the list-page href
			sputil.RedisClusterSet(hashHref, "", -1)
		}
		if !success { // download failed
			set := map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}}
			MgoS.UpdateById("spider_historydata", id, set)
			return
		} else if data["delete"] != nil { // filtered out by the detail-page script
			sputil.RedisClusterSet(hashHref, "", -1) // record filtered data in the full redis set
			// mark the record state=1 in spider_historydata so it is not downloaded again, and update redis
			set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
			MgoS.UpdateById("spider_historydata", id, set)
			return
		}
		// analyze body and attachments; records with download anomalies are re-downloaded
		if AnalysisProjectInfo(data) {
			set := map[string]interface{}{"$set": map[string]interface{}{"state": -1, "detailfilerr": true, "updatetime": time.Now().Unix()}}
			MgoS.UpdateById("spider_historydata", id, set)
			return
		}
		t1 := sputil.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
		if t1 > time.Now().Unix() { // guard against a publish time in the future
			data["publishtime"] = time.Now().Unix()
		}
		delete(data, "exit")
		delete(data, "checkpublishtime")
		data["comeintime"] = time.Now().Unix()
		data["spidercode"] = s.Code
		data["dataging"] = 0
		data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
		// send to the save service
		Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} // downloaded successfully: set state to 1
		MgoS.UpdateById("spider_historydata", id, set)
	}
	// reload the script after the batch finishes
	s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
}
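
// Shape of a spider_historydata record as implied by the calls above (the
// field list is inferred from this file, not from a schema definition):
//
//	{
//		"spidercode": "...", // spider code, matches luaconfig.code
//		"href":       "...", // detail-page link still to be downloaded
//		"state":      0,     // 0 pending, 1 done or filtered, -1 failed
//		"event":      0,     // excluded from the query projection above
//		"comeintime": 0,     // excluded from the query projection above
//		"updatetime": 0,     // unix seconds of the last state change
//	}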

// GetHistoryDownloadSpider periodically aggregates the pending history data per
// spider code and starts or updates the corresponding spiders.
func GetHistoryDownloadSpider() {
	defer qu.Catch()
	logger.Info("定时统计下载历史爬虫开始...")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 2) // at most two spider codes are processed concurrently
	match := map[string]interface{}{ // match records not yet downloaded
		"state": 0,
	}
	group := map[string]interface{}{
		"_id": "$spidercode",
		"count": map[string]interface{}{
			"$sum": 1,
		},
	}
	p := []map[string]interface{}{
		map[string]interface{}{"$match": match},
		map[string]interface{}{"$group": group},
	}
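	// Equivalent aggregation in mongo shell terms, for reference:
	//
	//	db.spider_historydata.aggregate([
	//		{ $match: { state: 0 } },
	//		{ $group: { _id: "$spidercode", count: { $sum: 1 } } },
	//	])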
	it := sess.DB(MgoS.DbName).C("spider_historydata").Pipe(p).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"])
			logger.Info("code:", code, " 当前待采集历史信息量:", count)
			// look up the spider configuration
			lua, _ := MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
			if len(*lua) > 0 {
				state := qu.IntAll((*lua)["state"])
				if state >= 7 || state == 4 { // spiders in these states (retired, nothing published, login required, unprocessable, deleted, live on python) are no longer downloaded
					MgoS.Update("spider_historydata",
						map[string]interface{}{"spidercode": code, "state": 0},
						map[string]interface{}{"$set": map[string]interface{}{"luastate": state, "state": -1, "updatetime": time.Now().Unix()}},
						false, true,
					)
				} else {
					old := qu.IntAll((*lua)["old_lua"])
					script := ""
					if old == 1 {
						script = fmt.Sprint((*lua)["luacontent"])
					} else {
						if (*lua)["oldlua"] != nil {
							if (*lua)["luacontent"] != nil {
								script = (*lua)["luacontent"].(string)
							}
						} else {
							script = GetScriptByTmp((*lua))
						}
					}
					spTmp, b := HistoryAllSpiders.Load(code)
					isNew := true
					if b { // refresh the info of a spider that is already running
						sp, ok := spTmp.(*Spider)
						if ok {
							sp.ScriptFile = script
							sp.UserName = qu.ObjToString((*lua)["createuser"])
							sp.UserEmail = qu.ObjToString((*lua)["createuseremail"])
							sp.MUserName = qu.ObjToString((*lua)["modifyuser"])
							sp.MUserEmail = qu.ObjToString((*lua)["next"])
							isNew = false
						}
					}
					if isNew {
						sp, errstr := CreateSpider(code, script, true, false)
						if errstr == "" && sp != nil && sp.Code != "nil" { // script loaded successfully
							sp.IsMainThread = true            // when crawling with multiple threads, mark the main thread; only the main thread updates the heartbeat
							HistoryAllSpiders.Store(code, sp) // register the spider
							logger.Info("start job:", code)
							go sp.StartSpider()
						} else {
							logger.Info(code, "脚本加载失败,请检查!")
							nowT := time.Now().Unix()
							username := "异常"
							if sp != nil {
								username = sp.MUserName
							}
							MgoS.Update("spider_loadfail",
								map[string]interface{}{
									"code": code,
									"modifytime": map[string]interface{}{
										"$gte": nowT - 12*3600,
										"$lte": nowT + 12*3600,
									},
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"code":       code,
										"type":       "初始化",
										"script":     script,
										"updatetime": nowT,
										"modifyuser": username,
										"event":      sputil.Config.Uploadevent,
										"err":        errstr,
									},
								}, true, false)
						}
					}
				}
			}
		}(tmp)
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	//time.AfterFunc(time.Second*30, GetHistoryDownloadSpider)
	time.AfterFunc(time.Minute*30, GetHistoryDownloadSpider) // reschedule the next scan in 30 minutes
}
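
// boundedGo is an illustrative sketch, not part of the original file: it shows
// the bounded-concurrency pattern used in GetHistoryDownloadSpider, where a
// buffered channel caps the goroutines in flight and a WaitGroup waits for all
// of them to finish. The helper name and signature are assumptions for
// illustration only.
func boundedGo(jobs []func(), limit int) {
	wg := &sync.WaitGroup{}
	ch := make(chan bool, limit)
	for _, job := range jobs {
		wg.Add(1)
		ch <- true // blocks once `limit` jobs are already running
		go func(job func()) {
			defer func() {
				<-ch // free a slot
				wg.Done()
			}()
			job()
		}(job)
	}
	wg.Wait()
}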