// supplement.go

  1. package spider
  2. import (
  3. "flag"
  4. "github.com/cron"
  5. "github.com/donnie4w/go-logger/logger"
  6. "gopkg.in/mgo.v2/bson"
  7. "os"
  8. qu "qfw/util"
  9. "sync"
  10. "time"
  11. )
/*
Key spiders: scheduled supplementary re-crawl.
*/
var (
	Supplement                       bool   // whether this node runs the scheduled supplementary re-crawl
	Supplement_Cycle                 string // run cycle (day: run at a fixed time every day; week: run at a fixed time every week)
	Supplement_Day                   int    // how many days of data to re-crawl
	Supplement_Publishtime           int64  // minimum publish time of the data to re-crawl
	Supplement_Publishtime_ZeroTimes = 100  // number of list-page entries without a publish time after which crawling exits
	Supplement_StartCron             string // cron spec that starts the re-crawl
	Supplement_EndCron               string // cron spec that shuts the re-crawl down
	Supplement_MaxErrorTimes         int    // number of consecutive errors after which crawling is interrupted
	Supplement_SaveData              map[string]*SupplementSpider
)
// SupplementSpider holds the supplementary-crawl statistics of one spider.
// Entries live in Supplement_SaveData and are persisted by SupplementDataSave
// to the spider_supplement collection using the bson tag names below.
//
// SupplementDataCount fills SaveNum (records seen), Success (state == 1),
// Failed (any other state), PublishtimeZeroNum (publishtime "" or "0") and
// EffectiveNum (publishtime inside the counted window). The remaining fields
// are not written in this file.
type SupplementSpider struct {
	Site               string `bson:"site"`
	Channel            string `bson:"channel"`
	Spidercode         string `bson:"spidercode"`
	Modifyuser         string `bson:"modifyuser"`
	Finish             int    `bson:"finish"`
	SaveNum            int    `bson:"savenum"`
	EndPage            int    `bson:"endage"` // NOTE(review): tag reads "endage", possibly a typo for "endpage" — stored documents use this key, confirm before changing
	DownNum            int    `bson:"downnum"`
	RepeatNum          int    `bson:"repeatnum"`
	Comeintime         int64  `bson:"comeintime"`
	Success            int    `bson:"success"`
	Failed             int    `bson:"failed"`
	PublishtimeZeroNum int    `bson:"ptimezeronum"`
	EffectiveNum       int    `bson:"effectivenum"`
}
  42. func InitSupplement() {
  43. flag.BoolVar(&Supplement, "s", false, "是否为补采节点")
  44. flag.StringVar(&Supplement_Cycle, "c", "day", "day:每天定点执行;week:每周定点执行")
  45. flag.IntVar(&Supplement_Day, "d", 1, "补采几天的数据")
  46. flag.IntVar(&Supplement_MaxErrorTimes, "e", 5, "连续几页异常采集中断")
  47. flag.Parse()
  48. logger.Debug("Supplement:", "-s=", Supplement, "-c=", Supplement_Cycle, "-d=", Supplement_Day, "-e=", Supplement_MaxErrorTimes)
  49. if Supplement {
  50. Supplement_SaveData = map[string]*SupplementSpider{}
  51. Supplement_Publishtime = GetTime(-Supplement_Day)
  52. if Supplement_Cycle == "day" {
  53. Supplement_StartCron = "0 0 22 ? * *"
  54. Supplement_EndCron = "0 0 9 ? * *"
  55. //InitSpider()
  56. } else if Supplement_Cycle == "week" {
  57. Supplement_StartCron = "0 0 0 ? * SAT"
  58. Supplement_EndCron = "0 0 0 ? * MON"
  59. }
  60. c := cron.New()
  61. c.Start()
  62. if Supplement_StartCron != "" && Supplement_EndCron != "" {
  63. c.AddFunc(Supplement_StartCron, SupplementStart)
  64. c.AddFunc(Supplement_EndCron, SupplementEnd)
  65. }
  66. }
  67. }
  68. func SupplementStart() {
  69. InitSpider() //加载爬虫,执行采集
  70. }
  71. func SupplementEnd() {
  72. SupplementDataCount() //补采数据统计,汇总
  73. SupplementDataSave()
  74. os.Exit(-1) //关闭应用
  75. }
  76. func SupplementDataCount() {
  77. logger.Info("补采数据统计开始...")
  78. timeEnd := GetStrTime(-1)
  79. timeStart := GetStrTime(-3)
  80. sess := MgoS.GetMgoConn()
  81. defer MgoS.DestoryMongoConn(sess)
  82. ch := make(chan bool, 5)
  83. wg := &sync.WaitGroup{}
  84. lock := &sync.Mutex{}
  85. startTime := time.Now().Unix() - 3600*12
  86. query := map[string]interface{}{
  87. "comeintime": map[string]interface{}{
  88. "$gte": startTime,
  89. },
  90. "event": 7001,
  91. }
  92. field := map[string]interface{}{
  93. "state": 1,
  94. "spidercode": 1,
  95. "publishtime": 1,
  96. }
  97. count1 := MgoS.Count("spider_historydata_back", query)
  98. logger.Info("spider_historydata_back count:", count1, startTime)
  99. it := sess.DB(MgoS.DbName).C("spider_historydata_back").Find(&query).Select(&field).Iter()
  100. n := 0
  101. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  102. ch <- true
  103. wg.Add(1)
  104. go func(tmp map[string]interface{}) {
  105. defer func() {
  106. <-ch
  107. wg.Done()
  108. }()
  109. state := qu.IntAll(tmp["state"])
  110. code := qu.ObjToString(tmp["spidercode"])
  111. publishtime := qu.ObjToString(tmp["publishtime"])
  112. lock.Lock()
  113. if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕
  114. ss.SaveNum++
  115. if state == 1 {
  116. ss.Success++
  117. } else {
  118. ss.Failed++
  119. }
  120. if publishtime == "0" || publishtime == "" {
  121. ss.PublishtimeZeroNum++
  122. } else if publishtime >= timeStart && publishtime < timeEnd {
  123. ss.EffectiveNum++
  124. }
  125. }
  126. lock.Unlock()
  127. }(tmp)
  128. tmp = map[string]interface{}{}
  129. }
  130. count2 := MgoS.Count("spider_historydata", query)
  131. logger.Info("spider_historydata count:", count2)
  132. it1 := sess.DB(MgoS.DbName).C("spider_historydata").Find(&query).Select(&field).Iter()
  133. n1 := 0
  134. for tmp := make(map[string]interface{}); it1.Next(tmp); n1++ {
  135. ch <- true
  136. wg.Add(1)
  137. go func(tmp map[string]interface{}) {
  138. defer func() {
  139. <-ch
  140. wg.Done()
  141. }()
  142. state := qu.IntAll(tmp["state"])
  143. code := qu.ObjToString(tmp["spidercode"])
  144. publishtime := qu.ObjToString(tmp["publishtime"])
  145. lock.Lock()
  146. if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕
  147. ss.SaveNum++
  148. if state == 1 {
  149. ss.Success++
  150. } else {
  151. ss.Failed++
  152. }
  153. if publishtime == "0" || publishtime == "" {
  154. ss.PublishtimeZeroNum++
  155. }
  156. }
  157. lock.Unlock()
  158. }(tmp)
  159. tmp = map[string]interface{}{}
  160. }
  161. wg.Wait()
  162. logger.Info("补采数据统计完毕...")
  163. }
  164. func SupplementDataSave() {
  165. var saveArr []map[string]interface{}
  166. for code, ss := range Supplement_SaveData {
  167. bt, err := bson.Marshal(ss)
  168. if err != nil {
  169. logger.Info("supplement marshal err:", code)
  170. continue
  171. }
  172. save := map[string]interface{}{}
  173. if bson.Unmarshal(bt, &save) == nil {
  174. saveArr = append(saveArr, save)
  175. } else {
  176. logger.Info("supplement unmarshal err:", code)
  177. }
  178. }
  179. if len(saveArr) > 0 {
  180. MgoS.SaveBulk("spider_supplement", saveArr...)
  181. }
  182. }