apple 5 years ago
parent
commit
3c9500ca4c
3 changed files with 58 additions and 6 deletions
  1. 9 0
      udpfilterdup/src/README.md
  2. 1 1
      udpfilterdup/src/datamap.go
  3. 48 5
      udpfilterdup/src/main.go

+ 9 - 0
udpfilterdup/src/README.md

@@ -1,3 +1,12 @@
+
+mgo = &MongodbSim{
+		MongodbAddr: "172.17.4.187:27083",
+		DbName:      "qfw",
+		Size:        10,
+	}
+mgo.InitPool()
+	return
+	
 func moveTimeoutData()  {
 	log.Println("部署迁移定时任务")
 	c := cron.New()

+ 1 - 1
udpfilterdup/src/datamap.go

@@ -304,7 +304,7 @@ L:
 						reasons = reason
 						break L
 					}
-					if info.href != "" && info.href != v.href {
+					if info.href != "" && info.href != v.href { //待优化
 						if v.title==info.title&&len([]rune(info.title)) >10 && isTheSameDay(info.publishtime,v.publishtime){
 							if !againRepeat(v, info) {//进行同站点二次判断
 								reason = "同站点-href不同-标题相同等"

+ 48 - 5
udpfilterdup/src/main.go

@@ -8,6 +8,8 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"github.com/cron"
+	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -103,6 +105,7 @@ func init() {
 
 
 func main() {
+
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -116,13 +119,13 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-	testRepairData11()
-	return
+
+	//testRepairData11()
+	//return
 
 	if TimingTask {
 		log.Println("新历史任务测试开始")
 		go historyTaskDay()
-		//go timedTaskDay()
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
@@ -473,6 +476,7 @@ func historyTaskDay() {
 					})
 					if len(updateExtract) > 50 {
 						mgo.UpSertBulk(extract, updateExtract...)
+
 						updateExtract = [][]map[string]interface{}{}
 					}
 
@@ -485,6 +489,7 @@ func historyTaskDay() {
 		//批量更新标记
 		if len(updateExtract) > 0 {
 			mgo.UpSertBulk(extract, updateExtract...)
+
 			updateExtract = [][]map[string]interface{}{}
 		}
 
@@ -629,8 +634,6 @@ func historyTaskDay() {
 		}
 		log.Println("继续下一段的历史判重")
 	}
-
-
 }
 
 
@@ -678,6 +681,46 @@ func moveHistoryData(startid string,endid string) {
 
 
 
+// moveTimeoutData creates a cron scheduler, registers moveOnceTimeOut on it
+// and starts the scheduler. It logs a deployment message first.
+func moveTimeoutData() {
+	log.Println("部署迁移定时任务")
+	scheduler := cron.New()
+	// NOTE(review): the spec looks like a six-field (seconds-first) expression,
+	// i.e. presumably once a day at 00:00:00 — confirm against the vendored
+	// cron package. Whatever AddFunc returns is discarded here.
+	scheduler.AddFunc("0 0 0 * * ?", moveOnceTimeOut)
+	scheduler.Start()
+}
+
+// moveOnceTimeOut performs a single migration pass: every document in
+// "result_20200714" whose _id is lower than the ObjectId generated for
+// midnight (local time) two years ago is saved into "result_20200713" and
+// then deleted from "result_20200714" by id.
+//
+// Progress is logged every 10000 documents. When the iterator reports an
+// error on close, that error is logged and the final "ok" message is
+// suppressed.
+func moveOnceTimeOut() {
+	log.Println("执行一次迁移超时数据")
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+
+	// Cut-off: same month/day two years back, truncated to midnight local time.
+	now := time.Now()
+	moveTime := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
+	taskID := util.BsonIdToSId(bson.NewObjectIdWithTime(moveTime))
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$lt": StringTOBsonId(taskID),
+		},
+	}
+
+	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 {
+			log.Println("index", index)
+		}
+		delID := BsonTOStringId(tmp["_id"])
+		// NOTE(review): the outcome of Save is not inspected before the source
+		// document is deleted — confirm whether a failed Save can lose data.
+		mgo.Save("result_20200713", tmp)
+		mgo.DeleteById("result_20200714", delID)
+		// Decode the next document into a fresh map so fields from the
+		// previous document cannot carry over.
+		tmp = map[string]interface{}{}
+	}
+	// Next returns false both at the end of the result set and on failure;
+	// Close releases the iterator and surfaces any such failure.
+	if err := it.Close(); err != nil {
+		log.Println("moveOnceTimeOut iterator error:", err, "index", index)
+		return
+	}
+	log.Println("save and delete", " ok index", index)
+}
+
+