123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package spider
- import (
- "flag"
- "github.com/cron"
- "github.com/donnie4w/go-logger/logger"
- "gopkg.in/mgo.v2/bson"
- "os"
- qu "qfw/util"
- "sync"
- "time"
- )
/*
Key (high-priority) spiders: periodic supplement (re-crawl) run.
*/
var (
	Supplement       bool   // whether this process runs as a scheduled supplement (re-crawl) node; set by the -s flag
	Supplement_Cycle string // run cycle: "day" = run daily at fixed times; "week" = run weekly at fixed times; set by the -c flag
	Supplement_Day   int    // how many days of data to re-crawl; set by the -d flag
	Supplement_Publishtime int64 // minimum publish time of data to re-crawl; computed as GetTime(-Supplement_Day) in InitSupplement
	Supplement_Publishtime_ZeroTimes = 100 // abort threshold: number of list-page items without a publish time before crawling quits
	Supplement_StartCron string // cron spec that triggers SupplementStart (begin crawling)
	Supplement_EndCron   string // cron spec that triggers SupplementEnd (aggregate stats, then exit)
	Supplement_MaxErrorTimes int // consecutive error count that aborts crawling; set by the -e flag
	Supplement_SaveData map[string]*SupplementSpider // per-spidercode stats for this run, keyed by spidercode; guarded by a local mutex in SupplementDataCount
)
// SupplementSpider accumulates per-spider statistics for one supplement
// (re-crawl) run. The bson tags determine the field names used when records
// are bulk-saved to the "spider_supplement" collection (see SupplementDataSave).
type SupplementSpider struct {
	Site       string `bson:"site"`       // site name — presumably filled when the spider registers; not set in this file, confirm against caller
	Channel    string `bson:"channel"`    // site channel/section — set elsewhere
	Spidercode string `bson:"spidercode"` // spider identifier; key of Supplement_SaveData
	Modifyuser string `bson:"modifyuser"` // last maintainer of the spider — set elsewhere
	Finish     int    `bson:"finish"`     // completion flag — set elsewhere, semantics not visible here
	SaveNum    int    `bson:"savenum"`    // documents counted for this spider in SupplementDataCount
	// NOTE(review): tag "endage" looks like a typo for "endpage", but existing
	// stored documents may already use this field name — confirm before changing.
	EndPage            int   `bson:"endage"`       // last page reached — set elsewhere
	DownNum            int   `bson:"downnum"`      // downloaded-document count — set elsewhere
	RepeatNum          int   `bson:"repeatnum"`    // duplicate-document count — set elsewhere
	Comeintime         int64 `bson:"comeintime"`   // record creation timestamp — set elsewhere
	Success            int   `bson:"success"`      // documents with state == 1 (see SupplementDataCount)
	Failed             int   `bson:"failed"`       // documents with state != 1
	PublishtimeZeroNum int   `bson:"ptimezeronum"` // documents whose publishtime is "" or "0"
	EffectiveNum       int   `bson:"effectivenum"` // documents whose publishtime falls inside the effective window (back collection only)
}
- func InitSupplement() {
- flag.BoolVar(&Supplement, "s", false, "是否为补采节点")
- flag.StringVar(&Supplement_Cycle, "c", "day", "day:每天定点执行;week:每周定点执行")
- flag.IntVar(&Supplement_Day, "d", 1, "补采几天的数据")
- flag.IntVar(&Supplement_MaxErrorTimes, "e", 5, "连续几页异常采集中断")
- flag.Parse()
- logger.Debug("Supplement:", "-s=", Supplement, "-c=", Supplement_Cycle, "-d=", Supplement_Day, "-e=", Supplement_MaxErrorTimes)
- if Supplement {
- Supplement_SaveData = map[string]*SupplementSpider{}
- Supplement_Publishtime = GetTime(-Supplement_Day)
- if Supplement_Cycle == "day" {
- Supplement_StartCron = "0 0 22 ? * *"
- Supplement_EndCron = "0 0 9 ? * *"
- //InitSpider()
- } else if Supplement_Cycle == "week" {
- Supplement_StartCron = "0 0 0 ? * SAT"
- Supplement_EndCron = "0 0 0 ? * MON"
- }
- c := cron.New()
- c.Start()
- if Supplement_StartCron != "" && Supplement_EndCron != "" {
- c.AddFunc(Supplement_StartCron, SupplementStart)
- c.AddFunc(Supplement_EndCron, SupplementEnd)
- }
- }
- }
// SupplementStart is the cron callback that begins a supplement run:
// it loads the spiders and kicks off crawling.
func SupplementStart() {
	InitSpider() // load spiders and start crawling
}
// SupplementEnd is the cron callback that finishes a supplement run:
// it aggregates the run's statistics, persists them, and terminates the
// process so a supervisor can restart it fresh for the next cycle.
func SupplementEnd() {
	SupplementDataCount() // aggregate supplement statistics
	SupplementDataSave()
	// NOTE(review): os.Exit(-1) maps to exit status 255 on Unix; presumably
	// intentional (non-zero so the supervisor restarts the process) — confirm.
	os.Exit(-1) // shut down the application
}
- func SupplementDataCount() {
- logger.Info("补采数据统计开始...")
- timeEnd := GetStrTime(-1)
- timeStart := GetStrTime(-3)
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- startTime := time.Now().Unix() - 3600*12
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": startTime,
- },
- "event": 7001,
- }
- field := map[string]interface{}{
- "state": 1,
- "spidercode": 1,
- "publishtime": 1,
- }
- count1 := MgoS.Count("spider_historydata_back", query)
- logger.Info("spider_historydata_back count:", count1, startTime)
- it := sess.DB(MgoS.DbName).C("spider_historydata_back").Find(&query).Select(&field).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- state := qu.IntAll(tmp["state"])
- code := qu.ObjToString(tmp["spidercode"])
- publishtime := qu.ObjToString(tmp["publishtime"])
- lock.Lock()
- if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕
- ss.SaveNum++
- if state == 1 {
- ss.Success++
- } else {
- ss.Failed++
- }
- if publishtime == "0" || publishtime == "" {
- ss.PublishtimeZeroNum++
- } else if publishtime >= timeStart && publishtime < timeEnd {
- ss.EffectiveNum++
- }
- }
- lock.Unlock()
- }(tmp)
- tmp = map[string]interface{}{}
- }
- count2 := MgoS.Count("spider_historydata", query)
- logger.Info("spider_historydata count:", count2)
- it1 := sess.DB(MgoS.DbName).C("spider_historydata").Find(&query).Select(&field).Iter()
- n1 := 0
- for tmp := make(map[string]interface{}); it1.Next(tmp); n1++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- state := qu.IntAll(tmp["state"])
- code := qu.ObjToString(tmp["spidercode"])
- publishtime := qu.ObjToString(tmp["publishtime"])
- lock.Lock()
- if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕
- ss.SaveNum++
- if state == 1 {
- ss.Success++
- } else {
- ss.Failed++
- }
- if publishtime == "0" || publishtime == "" {
- ss.PublishtimeZeroNum++
- }
- }
- lock.Unlock()
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Info("补采数据统计完毕...")
- }
- func SupplementDataSave() {
- var saveArr []map[string]interface{}
- for code, ss := range Supplement_SaveData {
- bt, err := bson.Marshal(ss)
- if err != nil {
- logger.Info("supplement marshal err:", code)
- continue
- }
- save := map[string]interface{}{}
- if bson.Unmarshal(bt, &save) == nil {
- saveArr = append(saveArr, save)
- } else {
- logger.Info("supplement unmarshal err:", code)
- }
- }
- if len(saveArr) > 0 {
- MgoS.SaveBulk("spider_supplement", saveArr...)
- }
- }
|