apple vor 5 Jahren
Ursprung
Commit
bed20b0fbe
3 geänderte Dateien mit 73 neuen und 10 gelöschten Zeilen
  1. 44 2
      udpfilterdup/src/datamap.go
  2. 28 7
      udpfilterdup/src/main.go
  3. 1 1
      udpfilterdup/src/mgo.go

+ 44 - 2
udpfilterdup/src/datamap.go

@@ -63,8 +63,50 @@ type historymap struct {
 	keys   map[string]bool
 }
 
-func TimedTaskDatamap(days int, lastid string) *datamap {
+func TimedTaskDatamap(days int,lasttime int64) *datamap {
+	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
+	if lastid == "" {
+		return dm
+	}
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	query := map[string]interface{}{"publishtime": map[string]interface{}{
+		"$lte": lasttime,
+	}}
+	log.Println("query", query)
+	it := sess.DB(mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
+	n, continuSum := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||qutil.IntAll(tmp["dataging"]) == 1 {
+			continuSum++
+		} else {
+			pt := tmp["publishtime"]
+			pt_time := qutil.Int64All(pt)
+			if pt_time <= 0 {
+				continue
+			}
+			if qutil.Float64All(lasttime-pt_time) < datelimit {
+				info := NewInfo(tmp)
+				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
+				k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+				data := dm.data[k]
+				if data == nil {
+					data = []*Info{}
+				}
+				data = append(data, info)
+				dm.data[k] = data
+				dm.keys[dkey] = true
+			} else {
+				break
+			}
+		}
+		if n%5000 == 0 {
+			log.Println("current n:", n, continuSum)
+		}
+		tmp = make(map[string]interface{})
+	}
+	log.Println("load data:", n)
 	return dm
 }
 
@@ -90,7 +132,7 @@ func NewDatamap(days int, lastid string) *datamap {
 			continuSum++
 		} else {
 			pt := tmp["comeintime"]
-			if Is_Sort||TimingTask {
+			if Is_Sort {
 				pt = tmp["publishtime"]
 			}
 			pt_time := qutil.Int64All(pt)

+ 28 - 7
udpfilterdup/src/main.go

@@ -650,8 +650,13 @@ func timedTaskOnce() {
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
 	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
+	//间隔时间 半年
+	between_time := curTime.Unix()-(86400*180)
 	lastid := task_sid
-	log.Println(task_sid, task_eid)
+	lasttime := int64(0)
+	//
+
+	log.Println(task_sid, task_eid,curTime.Unix(),between_time)
 
 	//ObjectId("5da3f31aa5cb26b9b798d3aa")
 	//ObjectId("5da418c4a5cb26b9b7e3e9a6")
@@ -660,7 +665,7 @@ func timedTaskOnce() {
 
 	defer util.Catch()
 	//区间id
-	q := map[string]interface{}{
+	q_start := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gte": StringTOBsonId(task_sid),
 			"$lte": StringTOBsonId(task_eid),
@@ -668,22 +673,38 @@ func timedTaskOnce() {
 	}
 	sess_start := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess_start)
-	it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
 	startNum := 0
 	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
 		if startNum%10000 == 0 {
 			log.Println("正序遍历:", startNum)
 		}
-		if util.IntAll(tmp_start["dataging"]) == 1 { //取起始id
-			lastid = BsonTOStringId(tmp_start["_id"])
-			break
+		//取-符合-发布时间半年内的数据
+		if util.IntAll(tmp_start["dataging"]) == 1 {
+			pubtime := util.Int64All(tmp_start["publishtime"])
+			if pubtime>0 && pubtime>between_time {
+				lastid = BsonTOStringId(tmp_start["_id"])
+				lasttime = pubtime
+				break
+			}
 		}
 	}
 
-	DM = NewDatamap(dupdays, lastid)
+
+	DM = TimedTaskDatamap(dupdays,lasttime)
 	log.Println("本地数据加载完成,定时任务数据判重开始")
+
+
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
+
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": StringTOBsonId(lastid),
+			"$lte": StringTOBsonId(task_eid),
+		},
+	}
+
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
 	log.Println("线程数:", threadNum)

+ 1 - 1
udpfilterdup/src/mgo.go

@@ -144,7 +144,7 @@ func (m *MongodbSim) InitPool() {
 	opts := options.Client()
 	opts.SetConnectTimeout(3 * time.Second)
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
-	opts.SetMaxPoolSize(uint64(m.Size))
+	opts.SetMaxPoolSize(uint16(m.Size))
 	m.pool = make(chan bool, m.Size)
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)