Selaa lähdekoodia

调整~更新记录表

zhengkun 2 vuotta sitten
vanhempi
commit
ba3967efa2
1 muutettua tiedostoa jossa 68 lisäystä ja 36 poistoa
  1. 68 36
      src/main.go

+ 68 - 36
src/main.go

@@ -34,17 +34,18 @@ var (
 	bid_mgo                               *MongodbSim              //mongodb操作对象
 	udpclient                             mu.UdpClient             //udp对象
 	nextNode                              []map[string]interface{} //节点信息
-	coll_name, qy_coll_name, jy_coll_name string                   //表名
-	check_lock                            sync.Mutex               //更新锁
-	check_thread                          int                      //线程数
-	UpdateTask                            *updateInfo              //更新池
-
-	ProvinceDict map[string][]Province //省份-map
-	CityDict     map[string][]City     //城市-map
-	DistrictDict map[string][]District //区县-map
+	coll_name, qy_coll_name, jy_coll_name string
+	check_lock                            sync.Mutex            //更新锁
+	check_thread                          int                   //线程数
+	UpdateTask                            *updateInfo           //更新池
+	ProvinceDict                          map[string][]Province //省份-map
+	CityDict                              map[string][]City     //城市-map
+	DistrictDict                          map[string][]District //区县-map
 
 	//删除字段
-	unset_dict = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
+	unset_dict          = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
+	udplock, getasklock sync.Mutex
+	taskList            []map[string]interface{}
 )
 
 //初始化城市
@@ -170,7 +171,11 @@ func main() {
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
-	time.Sleep(99999 * time.Hour)
+
+	go getRepeatTask()
+
+	lock := make(chan bool)
+	<-lock
 }
 
 //临时校验
@@ -183,7 +188,6 @@ func mainT() {
 
 //开始审查数据
 func startCheckData(sid, eid string) {
-	log.Println("开始审查数据...")
 	defer qu.Catch()
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -191,11 +195,8 @@ func startCheckData(sid, eid string) {
 			"$lte": StringTOBsonId(eid),
 		},
 	}
-	log.Println("查询条件:", q)
-
 	check_pool := make(chan bool, check_thread)
 	check_wg := &sync.WaitGroup{}
-
 	sess := data_mgo.GetMgoConn()
 	defer data_mgo.DestoryMongoConn(sess)
 	it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
@@ -244,7 +245,9 @@ func startCheckData(sid, eid string) {
 	}
 	check_wg.Wait()
 
-	log.Println("check is over - 总计数量", total, isRepair)
+	log.Println("data_clean is over ", total, "~", isRepair)
+
+	sendNextNode(sid, eid)
 }
 
 //udp监听
@@ -263,27 +266,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				return
 			} else {
 				go udpclient.WriteUdp(data, mu.OP_NOOP, ra)
-				log.Println("udp通知id段-审查数据", sid, "~", eid)
-				startCheckData(sid, eid)
-
-				log.Println("udp通知审查数据完成,下节点响应")
-				for _, m := range nextNode {
-					by, _ := json.Marshal(map[string]interface{}{
-						"gtid":  sid,
-						"lteid": eid,
-						"stype": qu.ObjToString(m["stype"]),
-					})
-					new_err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-						IP:   net.ParseIP(m["addr"].(string)),
-						Port: qu.IntAll(m["port"]),
-					})
-					if new_err != nil {
-						log.Println(err)
-					}
-				}
-
-				//更新记录状态
-				updateProcessUdpIdsInfo(sid, eid)
+				//插入任务
+				udplock.Lock()
+				taskList = append(taskList, map[string]interface{}{
+					"sid": sid,
+					"eid": eid,
+				})
+				log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
+				udplock.Unlock()
 
 			}
 		}
@@ -292,6 +282,28 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
+//发送下阶段节点~
+func sendNextNode(sid string, eid string) {
+	//更新记录状态
+	updateProcessUdpIdsInfo(sid, eid)
+
+	for _, m := range nextNode {
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": qu.ObjToString(m["stype"]),
+		})
+		new_err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(m["addr"].(string)),
+			Port: qu.IntAll(m["port"]),
+		})
+		if new_err != nil {
+			log.Println(new_err)
+		}
+	}
+	log.Println("udp通知审查数据完成,通知下节点")
+}
+
 //更新流程记录id段落
 func updateProcessUdpIdsInfo(sid string, eid string) {
 	query := map[string]interface{}{
@@ -337,3 +349,23 @@ func httpDo(detail string) (e error) {
 	log.Println("put ", string(body))
 	return nil
 }
+
+//监听-获取-分发清洗任务
+func getRepeatTask() {
+	for {
+		if len(taskList) > 0 {
+			getasklock.Lock()
+			mapInfo := taskList[0]
+			if mapInfo != nil {
+				taskList = taskList[1:]
+				log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
+				sid := qu.ObjToString(mapInfo["sid"])
+				eid := qu.ObjToString(mapInfo["eid"])
+				startCheckData(sid, eid)
+			}
+			getasklock.Unlock()
+		} else {
+			time.Sleep(15 * time.Second)
+		}
+	}
+}