@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	qu "qfw/util"
+	"regexp"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -22,6 +23,8 @@ const FailedPercentLimit = 0.20
 //Limit on the number of failed records
 const FailedNumLimit = 3
 
+//Regex for extracting the year-month-day date
+var DateReg = regexp.MustCompile("(\\d){4}-(\\d){2}-(\\d){2}")
 var CodeInfoMap map[string]*Spider
 var AllHref map[string]string
 var SameDayHref map[string]string
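
For reference, outside the patch: (\d){4} repeats a one-digit capture group four times, so for FindString it matches the same text as \d{4}. A minimal sketch of what DateReg extracts (the sample publishtime value is an assumption):

package main

import (
	"fmt"
	"regexp"
)

var DateReg = regexp.MustCompile("(\\d){4}-(\\d){2}-(\\d){2}")

func main() {
	// FindString returns the whole match, not the capture groups,
	// so this prints "2021-03-05".
	fmt.Println(DateReg.FindString("2021-03-05 12:30:45"))
}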
@@ -1923,6 +1926,9 @@ func ResetDataState() {
 			}
 			lock.Unlock()
 		}(tmp)
+		if n%100 == 0 {
+			logger.Debug("current:", n)
+		}
 		tmp = map[string]interface{}{}
 	}
 	wg.Wait()
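
Both loops in this function bound their fan-out with a buffered channel used as a semaphore plus a WaitGroup. A minimal self-contained sketch of that pattern (the worker limit of 10 and the standard-library logger are assumptions, stand-ins for values defined elsewhere in the file):

package main

import (
	"log"
	"sync"
)

func main() {
	ch := make(chan bool, 10) // semaphore: at most 10 goroutines in flight
	var wg sync.WaitGroup
	for n := 0; n < 1000; n++ {
		ch <- true // blocks once 10 workers are already running
		wg.Add(1)
		go func(i int) {
			defer func() {
				<-ch // release the slot
				wg.Done()
			}()
			// process record i ...
		}(n)
		if n%100 == 0 {
			log.Println("current:", n) // periodic progress, as in the patch
		}
	}
	wg.Wait()
}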
@@ -1932,7 +1938,76 @@ func ResetDataState() {
 		arr = [][]map[string]interface{}{}
 	}
 	lock.Unlock()
-	logger.Info("-----finished updating data state-----")
+	logger.Info("-----finished updating spider_highlistdata data state-----")
+
+	//For spider_historydata: update record state or migrate records by publish time
+	query = map[string]interface{}{
+		"state": map[string]interface{}{
+			"$ne": 0,
+		},
+	}
+	pTimeStartInt := time.Now().Unix() - int64(util.DayNum*86400)
+	pTimeStartStr := qu.FormatDateByInt64(&pTimeStartInt, qu.Date_Short_Layout)
+	it2 := sess.DB("spider").C("spider_historydata").Find(&query).Iter()
+	count, _ = sess.DB("spider").C("spider_historydata").Find(&query).Count()
+	logger.Info("records pending state update:", count)
+	n2 := 0
+	arr2 := [][]map[string]interface{}{}
+	save := []map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it2.Next(tmp); n2++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			update := []map[string]interface{}{}
+			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+			publishtime := qu.ObjToString(tmp["publishtime"])
+			publishtime = DateReg.FindString(publishtime)
+			state := qu.IntAll(tmp["state"])
+			lock.Lock()
+			if state == 1 { //successfully downloaded records are migrated to spider_historydata_back
+				save = append(save, tmp)
+				update = append(update, map[string]interface{}{"$set": map[string]interface{}{"delete": true}})
+			} else if state == -1 {
+				if pTimeStartStr <= publishtime { //reset records from the last few days that failed to download
+					update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
+				} else { //failed records older than that window are not retried; migrate them instead
+					save = append(save, tmp)
+					update = append(update, map[string]interface{}{"$set": map[string]interface{}{"delete": true}})
+				}
+			}
+			arr2 = append(arr2, update)
+			if len(arr2) > 500 {
+				util.MgoS.UpdateBulk("spider_historydata", arr2...)
+				arr2 = [][]map[string]interface{}{}
+			}
+			if len(save) > 500 {
+				util.MgoS.SaveBulk("spider_historydata_back", save...)
+				save = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n2%1000 == 0 {
+			logger.Debug("current:", n2)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	lock.Lock()
+	if len(arr2) > 0 {
+		util.MgoS.UpdateBulk("spider_historydata", arr2...)
+		arr2 = [][]map[string]interface{}{}
+	}
+	if len(save) > 0 {
+		util.MgoS.SaveBulk("spider_historydata_back", save...)
+		save = []map[string]interface{}{}
+	}
+	lock.Unlock()
+	util.MgoS.Delete("spider_historydata", map[string]interface{}{"delete": true})
+	logger.Info("-----finished updating spider_historydata data state-----")
 }
 
 //Close task
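
The branch added above reduces to a small per-record decision; a hedged sketch, where decide is a hypothetical helper (the patch inlines this logic in the goroutine, and the string comparison works because both sides are zero-padded YYYY-MM-DD dates):

// migrate: copy the record to spider_historydata_back and flag it for deletion;
// reset: set times=0, state=0 so the record is downloaded again.
func decide(state int, publishtime, windowStart string) (migrate, reset bool) {
	switch {
	case state == 1: // downloaded successfully
		return true, false
	case state == -1 && windowStart <= publishtime: // recent failure: retry
		return false, true
	case state == -1: // old failure: give up and migrate
		return true, false
	}
	return false, false // other states get only the bare _id update, a no-op
}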