supplement.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package spider
  2. import (
  3. "flag"
  4. "github.com/cron"
  5. "github.com/donnie4w/go-logger/logger"
  6. "gopkg.in/mgo.v2/bson"
  7. "os"
  8. qu "qfw/util"
  9. "sync"
  10. "time"
  11. )
/*
Key spiders: periodic supplemental (re-crawl) collection.
*/
// Package-level configuration and shared state for supplement mode.
// All of these are populated by InitSupplement from command-line flags
// or derived from them; Supplement_SaveData is additionally mutated by
// the counting/saving routines below.
var (
	Supplement             bool   // whether this node runs as a scheduled re-crawl node (-s flag)
	Supplement_Cycle       string // run cycle: "day" = run daily at fixed times; "week" = run weekly at fixed times (-c flag)
	Supplement_Day         int    // how many days of data to re-collect (-d flag)
	Supplement_Publishtime int64  // minimum publish time of data to re-collect (computed from Supplement_Day)
	Supplement_Publishtime_ZeroTimes = 100 // give-up threshold: number of list-page items without a publish time before collection exits
	Supplement_StartCron   string // cron spec for starting collection
	Supplement_EndCron     string // cron spec for stopping collection
	Supplement_MaxErrorTimes int  // consecutive error count that aborts collection (-e flag)
	Supplement_SaveData    map[string]*SupplementSpider // per-spidercode statistics, keyed by spidercode; guarded by a local mutex in SupplementDataCount
)
// SupplementSpider accumulates the per-spider statistics of one
// supplemental collection run; it is marshalled to BSON and stored in
// the "spider_supplement" collection by SupplementDataSave.
type SupplementSpider struct {
	Site       string `bson:"site"`       // site name
	Channel    string `bson:"channel"`    // channel name
	Spidercode string `bson:"spidercode"` // spider code (map key in Supplement_SaveData)
	Modifyuser string `bson:"modifyuser"` // last modifying user
	Finish     int    `bson:"finish"`     // finished flag/count
	SaveNum    int    `bson:"savenum"`    // number of records counted for this spider
	// NOTE(review): tag reads "endage" — presumably a typo for "endpage".
	// Changing it would rename the stored field and break existing data;
	// confirm against downstream readers before fixing.
	EndPage   int   `bson:"endage"`
	DownNum   int   `bson:"downnum"`   // downloaded count
	RepeatNum int   `bson:"repeatnum"` // duplicate count
	Comeintime int64 `bson:"comeintime"` // record creation timestamp (unix seconds)
	Success   int   `bson:"success"`   // records with state == 1
	Failed    int   `bson:"failed"`    // records with state != 1
	PublishtimeZeroNum int `bson:"ptimezeronum"` // records with empty/zero publish time
}
  41. func InitSupplement() {
  42. flag.BoolVar(&Supplement, "s", false, "是否为补采节点")
  43. flag.StringVar(&Supplement_Cycle, "c", "day", "day:每天定点执行;week:每周定点执行")
  44. flag.IntVar(&Supplement_Day, "d", 1, "补采几天的数据")
  45. flag.IntVar(&Supplement_MaxErrorTimes, "e", 5, "连续几页异常采集中断")
  46. flag.Parse()
  47. logger.Debug("Supplement:", "-s=", Supplement, "-c=", Supplement_Cycle, "-d=", Supplement_Day, "-e=", Supplement_MaxErrorTimes)
  48. if Supplement {
  49. Supplement_SaveData = map[string]*SupplementSpider{}
  50. Supplement_Publishtime = GetTime(-Supplement_Day)
  51. if Supplement_Cycle == "day" {
  52. Supplement_StartCron = "0 0 22 ? * *"
  53. Supplement_EndCron = "0 0 9 ? * *"
  54. //InitSpider()
  55. } else if Supplement_Cycle == "week" {
  56. Supplement_StartCron = "0 0 0 ? * SAT"
  57. Supplement_EndCron = "0 0 0 ? * MON"
  58. }
  59. c := cron.New()
  60. c.Start()
  61. if Supplement_StartCron != "" && Supplement_EndCron != "" {
  62. c.AddFunc(Supplement_StartCron, SupplementStart)
  63. c.AddFunc(Supplement_EndCron, SupplementEnd)
  64. }
  65. }
  66. }
// SupplementStart is the cron job fired at Supplement_StartCron:
// it loads the spiders and begins collection.
func SupplementStart() {
	InitSpider() // load spiders and run the crawl
}
// SupplementEnd is the cron job fired at Supplement_EndCron: it
// aggregates and persists the run's statistics, then terminates the
// process so the node stops collecting.
func SupplementEnd() {
	SupplementDataCount() // aggregate supplemental-collection statistics
	SupplementDataSave()
	os.Exit(-1) // shut down the application (note: -1 is reported as exit status 255 on POSIX)
}
  75. func SupplementDataCount() {
  76. logger.Info("补采数据统计开始...")
  77. sess := MgoS.GetMgoConn()
  78. defer MgoS.DestoryMongoConn(sess)
  79. ch := make(chan bool, 5)
  80. wg := &sync.WaitGroup{}
  81. lock := &sync.Mutex{}
  82. startTime := time.Now().Unix() - 3600*12
  83. query := map[string]interface{}{
  84. "comeintime": map[string]interface{}{
  85. "$gte": startTime,
  86. },
  87. "event": 7001,
  88. }
  89. field := map[string]interface{}{
  90. "state": 1,
  91. "spidercode": 1,
  92. "publishtime": 1,
  93. }
  94. count1 := MgoS.Count("spider_historydata_back", query)
  95. logger.Info("spider_historydata_back count:", count1, startTime)
  96. it := sess.DB(MgoS.DbName).C("spider_historydata_back").Find(&query).Select(&field).Iter()
  97. n := 0
  98. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  99. ch <- true
  100. wg.Add(1)
  101. go func(tmp map[string]interface{}) {
  102. defer func() {
  103. <-ch
  104. wg.Done()
  105. }()
  106. state := qu.IntAll(tmp["state"])
  107. code := qu.ObjToString(tmp["spidercode"])
  108. publishtime := qu.ObjToString(tmp["publishtime"])
  109. lock.Lock()
  110. if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕
  111. ss.SaveNum++
  112. if state == 1 {
  113. ss.Success++
  114. } else {
  115. ss.Failed++
  116. }
  117. if publishtime == "0" || publishtime == "" {
  118. ss.PublishtimeZeroNum++
  119. }
  120. }
  121. lock.Unlock()
  122. }(tmp)
  123. tmp = map[string]interface{}{}
  124. }
  125. count2 := MgoS.Count("spider_historydata", query)
  126. logger.Info("spider_historydata count:", count2)
  127. it1 := sess.DB(MgoS.DbName).C("spider_historydata").Find(&query).Select(&field).Iter()
  128. n1 := 0
  129. for tmp := make(map[string]interface{}); it1.Next(tmp); n1++ {
  130. ch <- true
  131. wg.Add(1)
  132. go func(tmp map[string]interface{}) {
  133. defer func() {
  134. <-ch
  135. wg.Done()
  136. }()
  137. state := qu.IntAll(tmp["state"])
  138. code := qu.ObjToString(tmp["spidercode"])
  139. publishtime := qu.ObjToString(tmp["publishtime"])
  140. lock.Lock()
  141. if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕
  142. ss.SaveNum++
  143. if state == 1 {
  144. ss.Success++
  145. } else {
  146. ss.Failed++
  147. }
  148. if publishtime == "0" || publishtime == "" {
  149. ss.PublishtimeZeroNum++
  150. }
  151. }
  152. lock.Unlock()
  153. }(tmp)
  154. tmp = map[string]interface{}{}
  155. }
  156. wg.Wait()
  157. logger.Info("补采数据统计完毕...")
  158. }
  159. func SupplementDataSave() {
  160. var saveArr []map[string]interface{}
  161. for code, ss := range Supplement_SaveData {
  162. bt, err := bson.Marshal(ss)
  163. if err != nil {
  164. logger.Info("supplement marshal err:", code)
  165. continue
  166. }
  167. save := map[string]interface{}{}
  168. if bson.Unmarshal(bt, &save) == nil {
  169. saveArr = append(saveArr, save)
  170. } else {
  171. logger.Info("supplement unmarshal err:", code)
  172. }
  173. }
  174. if len(saveArr) > 0 {
  175. MgoS.SaveBulk("spider_supplement", saveArr...)
  176. }
  177. }