Browse Source

任务池-判重

zhengkun 4 years ago
parent
commit
570cdb5268
1 changed files with 125 additions and 71 deletions
  1. 125 71
      udpfilterdup/src/main.go

+ 125 - 71
udpfilterdup/src/main.go

@@ -22,17 +22,17 @@ import (
 
 
 var (
-	Sysconfig    map[string]interface{} //配置文件
-	mconf        map[string]interface{} //mongodb配置信息
-	mgo          *MongodbSim            //mongodb操作对象
-	task_mgo     *MongodbSim            //mongodb操作对象
+	Sysconfig    map[string]interface{} 	//配置文件
+	mconf        map[string]interface{} 	//mongodb配置信息
+	mgo          *MongodbSim            	//mongodb操作对象
+	task_mgo     *MongodbSim            	//mongodb操作对象
 	task_collName	string
 	extract      string
 	extract_back string
-	udpclient    mu.UdpClient             //udp对象
-	nextNode     []map[string]interface{} //下节点数组
-	dupdays      = 7                      //初始化判重范围
-	DM           *datamap                 //
+	udpclient    mu.UdpClient             	//udp对象
+	nextNode     []map[string]interface{} 	//下节点数组
+	dupdays      = 7                      	//初始化判重范围
+	DM           *datamap                 	//
 	Update		 *updateInfo
 	//正则筛选相关
 	FilterRegTitle   = regexp.MustCompile("^_$")
@@ -47,15 +47,17 @@ var (
 	TimingTask     bool                              //是否定时任务
 	timingSpanDay  int64                             //时间跨度
 	timingPubScope int64                             //发布时间周期
-	gtid,lastid,gtept,ltept string			//命令输入
-	lteid	string							//历史增量属性
-	IsFull		   bool								//是否全量
-	updatelock 		sync.Mutex         //锁4
-	userName,passWord 	string				//mongo -用户密码
-
+	gtid,lastid,gtept,ltept string					 //命令输入
+	lteid	string									 //历史增量属性
+	IsFull		   bool								 //是否全量
+	updatelock 		sync.Mutex         				 //锁4
+	userName,passWord 	string						 //mongo -用户密码
+	taskList		[]map[string]interface{}		 //任务池
+	isRunningRepeat	bool
 )
-var udptask chan struct{} = make(chan struct{}, 1)
 
+//udp通道
+var udptask chan struct{} = make(chan struct{}, 1)
 
 
 func init() {
@@ -136,8 +138,6 @@ func init() {
 	}
 	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
 }
-
-
 func main() {
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
@@ -161,16 +161,16 @@ func main() {
 			mapinfo["gtid"] = sid
 			mapinfo["lteid"] = eid
 			mapinfo["stop"] = "true"
-			task([]byte{}, mapinfo)
+			taskRepeat(mapinfo)
 			time.Sleep(99999 * time.Hour)
 		}else {
 			//正常增量
-			log.Println("正常增量部署")
+			log.Println("正常增量部署,监听任务")
+			go getRepeatTask()
 		}
 	}
 	time.Sleep(99999 * time.Hour)
 }
-
 //测试组人员使用
 func mainT() {
 	if TimingTask {
@@ -190,66 +190,121 @@ func mainT() {
 		mapinfo["stop"] = "true"
 
 		log.Println("测试:全量判重-准备开始")
-		task([]byte{}, mapinfo)
+		taskRepeat(mapinfo)
 		
 		time.Sleep(99999 * time.Hour)
 	}
 }
-//upd接收
+//udp接收
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	select {
-	case udptask <- struct{}{}:
-		log.Println("...接收段落,通道正常...")
-		switch act {
-		case mu.OP_TYPE_DATA: //上个节点的数据
-			var mapInfo map[string]interface{}
-			err := json.Unmarshal(data, &mapInfo)
-			if err != nil {
-				log.Println("error data:", err)
-				udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
-			} else if mapInfo != nil {
-				key, _ := mapInfo["key"].(string)
-				if key == "" {
-					key = "udpok"
-				}
-				log.Println("当前段落,需要判重...",mapInfo)
-				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-				task(data, mapInfo)
-			}
-			log.Println("此段任务结束...",err,mapInfo)
-			<-udptask
-		case mu.OP_NOOP: //下个节点回应
-			ok := string(data)
-			if ok != "" {
-				log.Println("下节点回应-ok:", ok)
-				udptaskmap.Delete(ok)
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		if err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			key, _ := mapInfo["key"].(string)
+			if key == "" {
+				key = "udpok"
 			}
-			<-udptask
+			udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+
+			//插入任务-判断任务-是否存在
+			updatelock.Lock()
+			taskList = append(taskList,mapInfo)
+			log.Println("udp收到任务...数量:",len(taskList))
+			updatelock.Unlock()
 		}
-	case <-time.After(2 * time.Second):
-		switch act {
-		case mu.OP_TYPE_DATA: //上个节点的数据
-			log.Println("通道堵塞中...上节点")
-			udpclient.WriteUdp([]byte("repeat_busy"), mu.OP_NOOP, ra)
-		case mu.OP_NOOP: //下个节点回应
-			log.Println("通道堵塞中...下节点")
-			ok := string(data)
-			if ok != "" {
-				log.Println("下节点回应-ok:", ok)
-				udptaskmap.Delete(ok)
-			}
+	case mu.OP_NOOP: //下个节点回应
+		ok := string(data)
+		if ok != "" {
+			log.Println("ok:", ok)
+			udptaskmap.Delete(ok)
 		}
 	}
+}
+
 
-	//udptask <- struct{}{}
-	//defer func() {
-	//	<-udptask
-	//}()
+//upd接收
+//func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+//	select {
+//	case udptask <- struct{}{}:
+//		log.Println("...接收段落,通道正常...")
+//		switch act {
+//		case mu.OP_TYPE_DATA: //上个节点的数据
+//			var mapInfo map[string]interface{}
+//			err := json.Unmarshal(data, &mapInfo)
+//			if err != nil {
+//				log.Println("error data:", err)
+//				udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+//			} else if mapInfo != nil {
+//				key, _ := mapInfo["key"].(string)
+//				if key == "" {
+//					key = "udpok"
+//				}
+//				log.Println("当前段落,需要判重...",mapInfo)
+//				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+//				task(data, mapInfo)
+//			}
+//			log.Println("此段任务结束...",err,mapInfo)
+//			<-udptask
+//		case mu.OP_NOOP: //下个节点回应
+//			ok := string(data)
+//			if ok != "" {
+//				log.Println("下节点回应-ok:", ok)
+//				udptaskmap.Delete(ok)
+//			}
+//			<-udptask
+//		}
+//	case <-time.After(2 * time.Second):
+//		switch act {
+//		case mu.OP_TYPE_DATA: //上个节点的数据
+//			log.Println("通道堵塞中...上节点")
+//			udpclient.WriteUdp([]byte("repeat_busy"), mu.OP_NOOP, ra)
+//		case mu.OP_NOOP: //下个节点回应
+//			log.Println("通道堵塞中...下节点")
+//			ok := string(data)
+//			if ok != "" {
+//				log.Println("下节点回应-ok:", ok)
+//				udptaskmap.Delete(ok)
+//			}
+//		}
+//	}
+//
+//	//udptask <- struct{}{}
+//	//defer func() {
+//	//	<-udptask
+//	//}()
+//}
+
+//监听-获取-分发判重任务
+func getRepeatTask()  {
+	for  {
+		if len(taskList)>0 {
+			updatelock.Lock()
+			log.Println("准备执行判重任务...")
+			mapInfo := taskList[0]
+			if mapInfo != nil  {
+				taskRepeat(mapInfo) //判重方法
+			}
+			taskList = taskList[1:]
+			log.Println("当前任务池...",len(taskList),taskList)
+			updatelock.Unlock()
+		}else {
+			log.Println("无任务...睡眠15s")
+			time.Sleep(15 * time.Second)
+		}
+	}
 }
 
+
+
+
+
+
 //开始判重程序
-func task(data []byte, mapInfo map[string]interface{}) {
-	log.Println("开始数据判重")
+func taskRepeat(mapInfo map[string]interface{}) {
 	defer util.Catch()
 	//区间id
 	q := map[string]interface{}{
@@ -269,10 +324,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			},
 		}
 	}
-
-
-	log.Println("查询条件:",mgo.DbName, extract, q)
-
+	log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
@@ -342,6 +394,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
+
 	log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
 	//log.Println("当前数据池的数量:",DM.currentTotalCount())
 	//睡眠时间30s 目的是让数据池更新所有数据...
@@ -370,6 +423,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
 		}
 	}
+
 }
 
 func updateOcrFileData(cur_lteid string)  {