소스 검색

备份-更新时间

apple 4 년 전
부모
커밋
9b70e4f6c3
3개의 변경된 파일20개의 추가작업 그리고 10개의 파일을 삭제
  1. 2 2
      udpfilterdup/src/config.json
  2. 0 1
      udpfilterdup/src/datamap.go
  3. 18 7
      udpfilterdup/src/main.go

+ 2 - 2
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "extract_kf",
-        "extract": "zk_test",
-        "extract_back": "zk_test",
+        "extract": "zk_repeat_test",
+        "extract_back": "zk_repeat_test",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"

+ 0 - 1
udpfilterdup/src/datamap.go

@@ -269,7 +269,6 @@ L:
 					return false, v, ""
 				}
 
-
 				//buyer 优先级高,有值且不相等过滤
 				if info.buyer!=""&&v.buyer!=""&&info.buyer!=v.buyer {
 					if buyerIsContinue(v,info) {

+ 18 - 7
udpfilterdup/src/main.go

@@ -51,13 +51,14 @@ var (
 	gtid,lastid,gtept,ltept string			//命令输入
 	lteid	string							//历史增量属性
 	IsFull		   bool								//是否全量
-	updatelock 		sync.Mutex         //锁
+	updatelock 		sync.Mutex         //锁4
+
 )
 
 
 
 func init() {
-
+	
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
 	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
@@ -130,12 +131,19 @@ func init() {
 
 
 func main() {
+
+	//exportAllBuyerAlias()
+	////testmain()
+	//return
+
+
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
 	if TimingTask {
+		log.Println("正常历史部署")
 		go historyTaskDay()
 	}else {
 		if gtept!=""&&ltept!="" {
@@ -163,14 +171,12 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-
-	testXiufu24()
-	time.Sleep(99999 * time.Hour)
 	if TimingTask {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)
 	} else {
 		IsFull = true	//全量判重
+
 		sid := "1fffffffffffffffffffffff"
 		eid := "9fffffffffffffffffffffff"
 		mapinfo := map[string]interface{}{}
@@ -420,6 +426,7 @@ func historyTaskDay() {
 						"$set": map[string]interface{}{
 							"repeat": 1,
 							"dataging": 0,
+							"history_updatetime":util.Int64All(time.Now().Unix()),
 							"repeat_reason": "sourcewebsite为1 重复",
 						},
 					},
@@ -458,6 +465,7 @@ func historyTaskDay() {
 						map[string]interface{}{
 							"$set": map[string]interface{}{
 								"dataging": 0,
+								"history_updatetime":util.Int64All(time.Now().Unix()),
 							},
 						},
 					}
@@ -471,7 +479,7 @@ func historyTaskDay() {
 			dayArr = []map[string]interface{}{}
 		}
 
-		log.Println("查询数量:",num,"符合条件:",oknum)
+		log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
 
 		if len(pendAllArr) <= 0 {
 			log.Println("没找到dataging==1的数据")
@@ -556,6 +564,7 @@ func historyTaskDay() {
 									"repeat_reason": reason,
 									"repeat_id":     source.id,
 									"dataging":      0,
+									"history_updatetime":util.Int64All(time.Now().Unix()),
 								},
 							},
 						}
@@ -575,6 +584,7 @@ func historyTaskDay() {
 							map[string]interface{}{
 								"$set": map[string]interface{}{
 									"dataging": 0, //符合条件的都为dataging==0
+									"history_updatetime":util.Int64All(time.Now().Unix()),
 								},
 							},
 						}
@@ -593,7 +603,9 @@ func historyTaskDay() {
 
 		wg.Wait()
 
+		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
 
+		time.Sleep(30 * time.Second)
 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
 		if n >= repeateN && gtid!=lteid{
 			for _, to := range nextNode {
@@ -618,7 +630,6 @@ func historyTaskDay() {
 
 		end:=time.Now().Unix()
 
-		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
 		log.Println(gtid,lteid)
 		if end-start<60*5 {
 			log.Println("睡眠.............")