package spider import ( "flag" "github.com/cron" "github.com/donnie4w/go-logger/logger" "gopkg.in/mgo.v2/bson" "os" qu "qfw/util" "sync" "time" ) /* 重点爬虫,定期补采 */ var ( Supplement bool //是否为定时重采 Supplement_Cycle string //运行周期(day:每天定点执行;week:每周定点执行) Supplement_Day int //补采多少天的数据 Supplement_Publishtime int64 //补采数据最小的发布时间 Supplement_Publishtime_ZeroTimes = 100 //列表页无发布时间采集退出次数 Supplement_StartCron string //开始 Supplement_EndCron string //关闭 Supplement_MaxErrorTimes int //连续异常次数,中断采集 Supplement_SaveData map[string]*SupplementSpider ) type SupplementSpider struct { Site string `bson:"site"` Channel string `bson:"channel"` Spidercode string `bson:"spidercode"` Modifyuser string `bson:"modifyuser"` Finish int `bson:"finish"` SaveNum int `bson:"savenum"` EndPage int `bson:"endage"` DownNum int `bson:"downnum"` RepeatNum int `bson:"repeatnum"` Comeintime int64 `bson:"comeintime"` Success int `bson:"success"` Failed int `bson:"failed"` PublishtimeZeroNum int `bson:"ptimezeronum"` EffectiveNum int `bson:"effectivenum"` } func InitSupplement() { flag.BoolVar(&Supplement, "s", false, "是否为补采节点") flag.StringVar(&Supplement_Cycle, "c", "day", "day:每天定点执行;week:每周定点执行") flag.IntVar(&Supplement_Day, "d", 1, "补采几天的数据") flag.IntVar(&Supplement_MaxErrorTimes, "e", 5, "连续几页异常采集中断") flag.Parse() logger.Debug("Supplement:", "-s=", Supplement, "-c=", Supplement_Cycle, "-d=", Supplement_Day, "-e=", Supplement_MaxErrorTimes) if Supplement { Supplement_SaveData = map[string]*SupplementSpider{} Supplement_Publishtime = GetTime(-Supplement_Day) if Supplement_Cycle == "day" { Supplement_StartCron = "0 0 22 ? * *" Supplement_EndCron = "0 0 9 ? * *" //InitSpider() } else if Supplement_Cycle == "week" { Supplement_StartCron = "0 0 0 ? * SAT" Supplement_EndCron = "0 0 0 ? * MON" } c := cron.New() c.Start() if Supplement_StartCron != "" && Supplement_EndCron != "" { c.AddFunc(Supplement_StartCron, SupplementStart) c.AddFunc(Supplement_EndCron, SupplementEnd) } } } func SupplementStart() { InitSpider() //加载爬虫,执行采集 } func SupplementEnd() { SupplementDataCount() //补采数据统计,汇总 SupplementDataSave() os.Exit(-1) //关闭应用 } func SupplementDataCount() { logger.Info("补采数据统计开始...") timeEnd := GetStrTime(-1) timeStart := GetStrTime(-3) sess := MgoS.GetMgoConn() defer MgoS.DestoryMongoConn(sess) ch := make(chan bool, 5) wg := &sync.WaitGroup{} lock := &sync.Mutex{} startTime := time.Now().Unix() - 3600*12 query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": startTime, }, "event": 7001, } field := map[string]interface{}{ "state": 1, "spidercode": 1, "publishtime": 1, } count1 := MgoS.Count("spider_historydata_back", query) logger.Info("spider_historydata_back count:", count1, startTime) it := sess.DB(MgoS.DbName).C("spider_historydata_back").Find(&query).Select(&field).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() state := qu.IntAll(tmp["state"]) code := qu.ObjToString(tmp["spidercode"]) publishtime := qu.ObjToString(tmp["publishtime"]) lock.Lock() if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕 ss.SaveNum++ if state == 1 { ss.Success++ } else { ss.Failed++ } if publishtime == "0" || publishtime == "" { ss.PublishtimeZeroNum++ } else if publishtime >= timeStart && publishtime < timeEnd { ss.EffectiveNum++ } } lock.Unlock() }(tmp) tmp = map[string]interface{}{} } count2 := MgoS.Count("spider_historydata", query) logger.Info("spider_historydata count:", count2) it1 := sess.DB(MgoS.DbName).C("spider_historydata").Find(&query).Select(&field).Iter() n1 := 0 for tmp := make(map[string]interface{}); it1.Next(tmp); n1++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() state := qu.IntAll(tmp["state"]) code := qu.ObjToString(tmp["spidercode"]) publishtime := qu.ObjToString(tmp["publishtime"]) lock.Lock() if ss := Supplement_SaveData[code]; ss != nil { //爬虫执行完毕 ss.SaveNum++ if state == 1 { ss.Success++ } else { ss.Failed++ } if publishtime == "0" || publishtime == "" { ss.PublishtimeZeroNum++ } } lock.Unlock() }(tmp) tmp = map[string]interface{}{} } wg.Wait() logger.Info("补采数据统计完毕...") } func SupplementDataSave() { var saveArr []map[string]interface{} for code, ss := range Supplement_SaveData { bt, err := bson.Marshal(ss) if err != nil { logger.Info("supplement marshal err:", code) continue } save := map[string]interface{}{} if bson.Unmarshal(bt, &save) == nil { saveArr = append(saveArr, save) } else { logger.Info("supplement unmarshal err:", code) } } if len(saveArr) > 0 { MgoS.SaveBulk("spider_supplement", saveArr...) } }