Browse Source

最新-判重备份

apple 4 years ago
parent
commit
9736774399
2 changed files with 12 additions and 29 deletions
  1. 1 5
      udpfilterdup/src/datamap.go
  2. 11 24
      udpfilterdup/src/main.go

+ 1 - 5
udpfilterdup/src/datamap.go

@@ -72,7 +72,6 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 	it := sess.DB(mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
-		//qutil.IntAll(tmp["dataging"]) == 1
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
 			qutil.IntAll(tmp["dataging"]) == 1 {
 
@@ -114,9 +113,7 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 				break
 			}
 		}
-		//if n%50000 == 0 {
-		//	log.Println("当前第",numIndex,"组数据池构建中:", n, continuSum,tmp["_id"])
-		//}
+
 		tmp = make(map[string]interface{})
 	}
 
@@ -472,7 +469,6 @@ func (d *datamap) update(t int64) {
 	if TimingTask {
 
 	}else {
-
 		if IsFull {
 			d.keymap = d.GetLatelyFiveDay(t)//全量
 		}else {

+ 11 - 24
udpfilterdup/src/main.go

@@ -47,7 +47,7 @@ var (
 	TimingTask     bool                              //是否定时任务
 	timingSpanDay  int64                             //时间跨度
 	timingPubScope int64                             //发布时间周期
-	gtid,lteid,lastid,gtept string
+	gtid,lteid,lastid,gtept,ltept string			//命令输入
 	IdType         bool   							 //默认object类型
 	IsFull		   bool
 	updatelock 		sync.Mutex         //锁
@@ -57,13 +57,12 @@ var (
 
 func init() {
 
-	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
-	flag.StringVar(&gtid, "gtid", "", "历史的起始id")
-	flag.StringVar(&gtept, "pt", "", "全量发布时间")
-
+	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //以小于等于此id开始加载最近几天的数据
+	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")
+	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")
+	flag.StringVar(&gtept, "ltept", "", "全量lte发布时间")
 	flag.Parse()
 
-	//172.17.145.163:27080
 	util.ReadConfig(&Sysconfig)
 
 	task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
@@ -125,7 +124,6 @@ func init() {
 
 
 func main() {
-
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -139,7 +137,6 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-
 	if TimingTask {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)
@@ -174,14 +171,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		if err != nil {
 			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
 		} else if mapInfo != nil {
-			taskType := util.ObjToString(mapInfo["stype"])
-			if taskType == "normalTask" {
-				//判重流程
-				go task(data, mapInfo)
-			} else {
-				//其他
-				go task(data, mapInfo)
-			}
+			go task(data, mapInfo)
 			key, _ := mapInfo["key"].(string)
 			if key == "" {
 				key = "udpok"
@@ -218,11 +208,13 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			},
 		}
 	}
-
-	if IsFull && gtept!="" {
+	//增量
+	if IsFull && gtept!="" && ltept!=""{
+		log.Println("执行分段模式")
 		q = map[string]interface{}{
 			"publishtime": map[string]interface{}{
-				"$gte": util.Int64All(gtept)-864000,
+				"$gte": util.Int64All(gtept),
+				"$lte": util.Int64All(ltept),
 			},
 		}
 	}
@@ -256,7 +248,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				},
 			})
 			if len(updateExtract) >= 500 {
-				log.Println("批量-更新-sourcewebsite")
 				mgo.UpSertBulk(extract, updateExtract...)
 				updateExtract = [][]map[string]interface{}{}
 			}
@@ -289,7 +280,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				if IdType {
 					updateID["_id"] = info.id
 				}
-
 				repeat_ids:=source.repeat_ids
 				repeat_ids =  append(repeat_ids,info.id)
 				source.repeat_ids = repeat_ids
@@ -305,8 +295,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						},
 					},
 				})
-
-
 				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
 					updateID,
 					map[string]interface{}{
@@ -321,7 +309,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		}(tmp)
 		updatelock.Lock()
 		if len(updateExtract) >=500 {
-			log.Println("批量-更新")
 			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}