فهرست منبع

更新 抽取控制台

zhengkun 2 سال پیش
والد
کامیت
e1e681f99a
6فایلهای تغییر یافته به همراه162 افزوده شده و 166 حذف شده
  1. 4 8
      src/jy/extract/extractudp.go
  2. 35 32
      udpcontrol/src/initdata.go
  3. 3 9
      udpcontrol/src/main.go
  4. 108 112
      udpcontrol/src/method.go
  5. 11 4
      udpcontrol/src/updprocess.go
  6. 1 1
      udps/main.go

+ 4 - 8
src/jy/extract/extractudp.go

@@ -89,10 +89,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 						log.Debug("抽取强制中断udp不通知-控制台", udpinfo, sid, "~", eid)
 					}
 
-					//适配重采抽取-发送udp-必须替换
-					//go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
-
-					//发布数据~测试流程
+					//发布数据~重采数据~测试流程
 					//key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"])
 					//go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
 					//
@@ -104,16 +101,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					//		"lteid": eid,
 					//		"stype": qu.ObjToString(m["stype"]),
 					//	})
-					//	err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+					//	err_udp := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
 					//		IP:   net.ParseIP(m["addr"].(string)),
 					//		Port: qu.IntAll(m["port"]),
 					//	})
-					//	if err != nil {
-					//		log.Debug(err)
+					//	if err_udp != nil {
+					//		log.Debug(err_udp)
 					//	}
 					//}
 					//log.Debug("udp通知抽取完成,eid=", eid)
-
 				}
 			}
 		}

+ 35 - 32
udpcontrol/src/initdata.go

@@ -7,37 +7,40 @@ import (
 )
 
 var (
-	sysconfig    			map[string]interface{} 		//配置文件
-	data_mgo,source_mgo     *MongodbSim
-	data_c_name,source_c_name	string
-	using_machine			int
-	lastNodeResponse		int64
+	sysconfig                  map[string]interface{} //配置文件
+	data_mgo, source_mgo       *MongodbSim
+	data_c_name, source_c_name string
+	using_machine              int
+	lastNodeResponse           int64
+	taskList                   []map[string]interface{}
 )
-func initMgo()  {
+
+func initMgo() {
 	sourceconf := sysconfig["source_mgodb"].(map[string]interface{})
-	source_c_name = qu.ObjToString(sourceconf["coll"])//数据源bidding
+	source_c_name = qu.ObjToString(sourceconf["coll"]) //数据源bidding
 	source_mgo = &MongodbSim{
 		MongodbAddr: sourceconf["addr"].(string),
 		DbName:      sourceconf["db"].(string),
 		Size:        3,
-		UserName: "zhengkun",
-		Password: "zk@123123",
+		UserName:    "zhengkun",
+		Password:    "zk@123123",
 	}
 	source_mgo.InitPool()
 
 	dataconf := sysconfig["data_mgodb"].(map[string]interface{})
-	data_c_name = qu.ObjToString(dataconf["coll"])  //机器源center
+	data_c_name = qu.ObjToString(dataconf["coll"]) //机器源center
 	data_mgo = &MongodbSim{
 		MongodbAddr: dataconf["addr"].(string),
 		DbName:      dataconf["db"].(string),
 		Size:        3,
-		UserName: "zhengkun",
-		Password: "zk@123123",
+		UserName:    "zhengkun",
+		Password:    "zk@123123",
 	}
 	data_mgo.InitPool()
 
+	taskList = []map[string]interface{}{}
 }
-func initVarData()  {
+func initVarData() {
 	qu.ReadConfig(&sysconfig)
 	initMgo()
 	using_machine = qu.IntAll(sysconfig["using_machine"])
@@ -46,21 +49,21 @@ func initVarData()  {
 }
 
 //加载抽取
-func initExtractNode()  {
+func initExtractNode() {
 	resetExtNodeArr() //重置抽取节点数组
 	sess := data_mgo.GetMgoConn()
 	defer data_mgo.DestoryMongoConn(sess)
 	q := map[string]interface{}{}
 	it := sess.DB(data_mgo.DbName).C(data_c_name).Find(&q).Iter()
-	for tmp := make(map[string]interface{}); it.Next(&tmp);{
+	for tmp := make(map[string]interface{}); it.Next(&tmp); {
 		isuse := qu.IntAll(tmp["isuse"])
 		if isuse == 0 {
-			invalid_ext_node = append(invalid_ext_node,tmp)
-		}else if isuse == 1 {
-			using_ext_node = append(using_ext_node,tmp)
-		}else if isuse == 2 {
-			standby_ext_node = append(standby_ext_node,tmp)
-		}else {
+			invalid_ext_node = append(invalid_ext_node, tmp)
+		} else if isuse == 1 {
+			using_ext_node = append(using_ext_node, tmp)
+		} else if isuse == 2 {
+			standby_ext_node = append(standby_ext_node, tmp)
+		} else {
 
 		}
 		tmp = make(map[string]interface{})
@@ -68,31 +71,31 @@ func initExtractNode()  {
 	//根据实际情况~把备用节点~与正常节点综合一下
 	for { //可用数量-可变
 		if len(using_ext_node) < using_machine {
-			if len(standby_ext_node)==0 {
+			if len(standby_ext_node) == 0 {
 				break
 			}
 			tmp_node := standby_ext_node[0]
-			using_ext_node = append(using_ext_node,tmp_node)
+			using_ext_node = append(using_ext_node, tmp_node)
 			standby_ext_node = standby_ext_node[1:]
-		}else {
+		} else {
 			break
 		}
 	}
-	if len(using_ext_node)<=0 {
-		sendErrMailApi("抽取控制中心~严重错误","当前无可用机器")
-	}else if len(using_ext_node)<using_machine { //不足预设-通知
-		sendErrMailApi("抽取控制中心~警告","当前可用机器不足预设~请检查")
-	}else {
+	if len(using_ext_node) <= 0 {
+		sendErrMailApi("抽取控制中心~严重错误", "当前无可用机器")
+	} else if len(using_ext_node) < using_machine { //不足预设-通知
+		sendErrMailApi("抽取控制中心~警告", "当前可用机器不足预设~请检查")
+	} else {
 
 	}
 
-	log.Debug("综合后节点~有效~备用~无效",len(using_ext_node),len(standby_ext_node),len(invalid_ext_node))
+	log.Debug("综合后节点~有效~备用~无效", len(using_ext_node), len(standby_ext_node), len(invalid_ext_node))
 }
 
-
 //重置抽取
-func resetExtNodeArr () {
+func resetExtNodeArr() {
 	isAction = false
+	isGetask = false
 	using_ext_node = []map[string]interface{}{}
 	standby_ext_node = []map[string]interface{}{}
 	invalid_ext_node = []map[string]interface{}{}

+ 3 - 9
udpcontrol/src/main.go

@@ -6,26 +6,20 @@ import (
 )
 
 func init() {
-	initVarData() 	//初始化属性
+	initVarData() //初始化属性
 	updport := sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Debug("Udp服务监听", updport)
 }
 
-func main()  {
+func main() {
 	//各种监控等
 	go extractRunningMonitoring()
 	go lastUdpMonitoring()
 	go nextUdpMonitoring()
+	go getRepeatTask()
 
 	lock := make(chan bool)
 	<-lock
 }
-
-
-
-
-
-
-

+ 108 - 112
udpcontrol/src/method.go

@@ -11,52 +11,77 @@ import (
 	"time"
 )
 
-var methodlock 		sync.Mutex
-var responselock 	sync.Mutex
+var methodlock sync.Mutex
+var responselock sync.Mutex
+var getasklock sync.Mutex
 
 //邮件下节点响应
 var udptaskmap = &sync.Map{}
+
 type udpNode struct {
 	data      []byte
 	addr      *net.UDPAddr
 	timestamp int64
 }
 
+//监听-获取-分发抽取任务
+func getRepeatTask() {
+	for {
+		if len(taskList) > 0 && !isGetask {
+			getasklock.Lock()
+			isGetask = true
+			mapInfo := taskList[0]
+			if mapInfo != nil {
+				taskList = taskList[1:]
+				log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
+				sid := qu.ObjToString(mapInfo["sid"])
+				eid := qu.ObjToString(mapInfo["eid"])
+				dealWithExtUdpData(sid, eid)
+			} else {
+				sendErrMailApi("抽取控制中心~任务错误", "获取任务段落异常...跳过段落...")
+				isGetask = false
+			}
+			getasklock.Unlock()
+		} else {
+			time.Sleep(15 * time.Second)
+		}
+	}
+}
 
 //监控~当前抽取段~状态  生命周期
-func extractRunningMonitoring()  {
-	for  {
+func extractRunningMonitoring() {
+	for {
 		if isAction {
 			time_now := time.Now().Unix()
 			isErr := false
 			methodlock.Lock()
-			for k,v := range extractAction {
-				if k=="extract_ids" {
+			for k, v := range extractAction {
+				if k == "extract_ids" {
 					continue
 				}
 				//抽取行为完成~状态
-				action:=qu.IntAll(v["action"])
-				if action==1 {
+				action := qu.IntAll(v["action"])
+				if action == 1 {
 					continue
 				}
 
 				//心跳监测~回应
-				keyArr := strings.Split(k,":")
-				if len(keyArr)==3 {
+				keyArr := strings.Split(k, ":")
+				if len(keyArr) == 3 {
 					by, _ := json.Marshal(map[string]interface{}{
 						"stype": "heart_extract",
-						"skey" :"heart_extract"+k,
+						"skey":  "heart_extract" + k,
 					})
-					sendSingleOtherNode(by,keyArr[0],keyArr[1])
+					sendSingleOtherNode(by, keyArr[0], keyArr[1])
 					heart_num := qu.IntAll(heartAction[k])
-					heartAction[k] = heart_num+1
+					heartAction[k] = heart_num + 1
 				}
-				life:=qu.Int64All(v["life"])
-				if time_now>life || qu.IntAll(heartAction[k]) > 10 {
+				life := qu.Int64All(v["life"])
+				if time_now > life || qu.IntAll(heartAction[k]) > 10 {
 					isErr = true //超时~无响应~认为机器异常
-					data_mgo.UpdateById(data_c_name,qu.ObjToString(v["uid"]), map[string]interface{}{
+					data_mgo.UpdateById(data_c_name, qu.ObjToString(v["uid"]), map[string]interface{}{
 						"$set": map[string]interface{}{
-							"isuse":0,
+							"isuse": 0,
 						},
 					})
 				}
@@ -64,39 +89,38 @@ func extractRunningMonitoring()  {
 			}
 			methodlock.Unlock()
 			if isErr {
-				sid:= qu.ObjToString(extractAction["extract_ids"]["sid"])
-				eid:= qu.ObjToString(extractAction["extract_ids"]["eid"])
+				sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
+				eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
 				isAction = false
 				sendStopExtractNode(using_ext_node) //停止
-				if len(standby_ext_node)==0 {
-					sendErrMailApi("抽取控制中心~异常",fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s",sid,eid))
-					time.Sleep(15*time.Second)
-					sendNextNode(sid,eid)
-				}else {
-					sendErrMailApi("抽取控制中心~异常",fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s",sid,eid))
-					time.Sleep(15*time.Second)
-					dealWithExtUdpData(sid,eid)
+				if len(standby_ext_node) == 0 {
+					sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s", sid, eid))
+					time.Sleep(15 * time.Second)
+					sendNextNode(sid, eid)
+				} else {
+					sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s", sid, eid))
+					time.Sleep(15 * time.Second)
+					dealWithExtUdpData(sid, eid)
 				}
 			}
 		}
-		time.Sleep(15*time.Second)
+		time.Sleep(15 * time.Second)
 	}
 }
 
 //监控~上节点~长时间未响应
-func lastUdpMonitoring()  {
-	for  {
+func lastUdpMonitoring() {
+	for {
 		responselock.Lock()
-		if !isAction && time.Now().Unix()-lastNodeResponse > 1800 {
-			sendErrMailApi("抽取控制中心~流程超时~告警",fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
+		if !isAction && time.Now().Unix()-lastNodeResponse >= 1800 {
+			sendErrMailApi("抽取控制中心~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
 			lastNodeResponse = time.Now().Unix() //重置时间
 		}
 		responselock.Unlock()
-		time.Sleep(600*time.Second)
+		time.Sleep(600 * time.Second)
 	}
 }
 
-
 //监控~
 func nextUdpMonitoring() {
 	for {
@@ -105,7 +129,7 @@ func nextUdpMonitoring() {
 			node, _ := v.(*udpNode)
 			if now-node.timestamp > 120 {
 				udptaskmap.Delete(k)
-				sendErrMailApi("抽取控制中心~下节点未响应~警告",fmt.Sprintf("下节点~数据清洗~未及时响应...请检查..."))
+				sendErrMailApi("抽取控制中心~下节点未响应~警告", fmt.Sprintf("下节点~数据清洗~未及时响应...请检查..."))
 			}
 			return true
 		})
@@ -113,85 +137,78 @@ func nextUdpMonitoring() {
 	}
 }
 
-
-
-
-
-
-
-
-
 //验证抽取是否完毕	不验证-extract_ids~key
-func validExtractFinish() bool  {
-	for k,v :=range extractAction{
-		if k=="extract_ids" {
+func validExtractFinish() bool {
+	for k, v := range extractAction {
+		if k == "extract_ids" {
 			continue
 		}
-		if qu.Int64All(v["action"])==0 {
+		if qu.Int64All(v["action"]) == 0 {
 			return false
 		}
 	}
 	return true
 }
+
 //拆分ID段方法
-func splitIdMethod(sid string,eid string)([]map[string]interface{},[]int64) {
-	dataArr := make([]map[string]interface{},0)
-	lifeArr := make([]int64,0)
-	if sid=="" || eid=="" || len(using_ext_node)==0 {
-		return dataArr,lifeArr
+func splitIdMethod(sid string, eid string) ([]map[string]interface{}, []int64) {
+	dataArr := make([]map[string]interface{}, 0)
+	lifeArr := make([]int64, 0)
+	if sid == "" || eid == "" || len(using_ext_node) == 0 {
+		return dataArr, lifeArr
 	}
 	sess := source_mgo.GetMgoConn()
 	defer source_mgo.DestoryMongoConn(sess)
-	q ,total := map[string]interface{}{
+	q, total := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gt":  StringTOBsonId(sid),
 			"$lte": StringTOBsonId(eid),
 		},
-	},int64(0)
-	count,_ := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Count()
-	log.Debug("查询当前数量:",count)
-	if len(using_ext_node)==1 {
+	}, int64(0)
+	count, _ := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Count()
+	log.Debug("查询当前数量:", count)
+	if len(using_ext_node) == 1 {
 		dataArr = append(dataArr, map[string]interface{}{
-			"sid":sid,
-			"eid":eid,
+			"sid": sid,
+			"eid": eid,
 		})
-		lifeArr = append(lifeArr,calculateLiftime(count))
+		lifeArr = append(lifeArr, calculateLiftime(count))
 	} else {
 		node_num := int64(len(using_ext_node))
-		if count<node_num{ //采用一个节点-多余临时删除
+		if count < node_num { //采用一个节点-多余临时删除
 			log.Debug("数量过少~采用一个节点")
 			tmp_node := using_ext_node[0]
 			using_ext_node = []map[string]interface{}{}
-			using_ext_node = append(using_ext_node,tmp_node)
+			using_ext_node = append(using_ext_node, tmp_node)
 			dataArr = append(dataArr, map[string]interface{}{
-				"sid":sid,
-				"eid":eid,
+				"sid": sid,
+				"eid": eid,
 			})
-			lifeArr = append(lifeArr,calculateLiftime(count))
-		}else {
-			limit := count/node_num
+			lifeArr = append(lifeArr, calculateLiftime(count))
+		} else {
+			limit := count / node_num
 			limit_lifetime := calculateLiftime(limit)
-			tmp_sid:=sid
+			tmp_sid := sid
 			it := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Sort("_id").Select(map[string]interface{}{
-				"_id":1,
+				"_id": 1,
 			}).Iter()
-			for tmp := make(map[string]interface{}); it.Next(&tmp);{
+			for tmp := make(map[string]interface{}); it.Next(&tmp); {
 				total++
-				if total%limit==0 {
-					if total/limit==node_num {
+				if total%limit == 0 {
+					if total/limit == node_num {
 						dataArr = append(dataArr, map[string]interface{}{
-							"sid":tmp_sid,
-							"eid":eid,
+							"sid": tmp_sid,
+							"eid": eid,
 						})
-						lifeArr = append(lifeArr,limit_lifetime)
+						lifeArr = append(lifeArr, limit_lifetime)
 						break
-					}else {
+					} else {
 						dataArr = append(dataArr, map[string]interface{}{
-							"sid":tmp_sid,
-							"eid":BsonTOStringId(tmp["_id"]),
+							"sid": tmp_sid,
+							"eid": BsonTOStringId(tmp["_id"]),
 						})
 						tmp_sid = BsonTOStringId(tmp["_id"])
-						lifeArr = append(lifeArr,limit_lifetime)
+						lifeArr = append(lifeArr, limit_lifetime)
 					}
 				}
 				tmp = make(map[string]interface{})
@@ -199,53 +216,32 @@ func splitIdMethod(sid string,eid string)([]map[string]interface{},[]int64) {
 		}
 	}
 
-	if len(dataArr)!=len(using_ext_node) || len(dataArr)!=len(lifeArr) {
+	if len(dataArr) != len(using_ext_node) || len(dataArr) != len(lifeArr) {
 		log.Debug("划分段落异常~请检查~只能采用唯一节点~")
 		tmp_node := using_ext_node[0]
 		using_ext_node = []map[string]interface{}{}
-		using_ext_node = append(using_ext_node,tmp_node)
+		using_ext_node = append(using_ext_node, tmp_node)
 		dataArr = []map[string]interface{}{}
 		lifeArr = []int64{}
 		dataArr = append(dataArr, map[string]interface{}{
-			"sid":sid,
-			"eid":eid,
+			"sid": sid,
+			"eid": eid,
 		})
-		lifeArr = append(lifeArr,calculateLiftime(count))
+		lifeArr = append(lifeArr, calculateLiftime(count))
 	}
-	return dataArr,lifeArr
+	return dataArr, lifeArr
 }
+
 //计算生命周期
 func calculateLiftime(count int64) int64 {
-	time_one := 1500.0/1000.0//暂定~每千条用时1000秒
-	life_time := int64(time_one*float64(count)*3.0)
-	if life_time<2400 {
+	time_one := 1500.0 / 1000.0 //暂定~每千条用时1000秒
+	life_time := int64(time_one * float64(count) * 3.0)
+	if life_time < 2400 {
 		life_time = 2400
 	}
-	return time.Now().Unix()+life_time
+	return time.Now().Unix() + life_time
 }
 
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
 //暂时弃用
 //func sqlitID(){
 //	if len(using_ext_node)==1 {
@@ -287,4 +283,4 @@ func calculateLiftime(count int64) int64 {
 //func hex2Dec(val string)int64{
 //	n,_ := strconv.ParseInt(val,16,32)
 //	return n
-//}
+//}

+ 11 - 4
udpcontrol/src/updprocess.go

@@ -19,6 +19,7 @@ var (
 	extractAction                                      map[string]map[string]interface{}
 	heartAction                                        map[string]interface{}
 	isAction                                           bool
+	isGetask                                           bool
 	using_ext_node, standby_ext_node, invalid_ext_node []map[string]interface{}
 )
 
@@ -38,8 +39,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				lastNodeResponse = time.Now().Unix()
 				key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
 				go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+
+				//插入任务
 				udplock.Lock()
-				dealWithExtUdpData(sid, eid)
+				taskList = append(taskList, map[string]interface{}{
+					"sid": sid,
+					"eid": eid,
+				})
+				log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
 				udplock.Unlock()
 			}
 		}
@@ -65,7 +72,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 func dealWithExtUdpData(sid, eid string) {
 	//获取最新-抽取节点状态
 	initExtractNode()
-	log.Debug("接收当前段落,udp通知抽取-需拆分", len(using_ext_node), "组", sid, "~~", eid)
+	log.Debug("处理当前段落~~~需拆分", len(using_ext_node), "组", sid, "~~", eid)
 	if len(using_ext_node) > 0 {
 		//拆分段落方法~并附加抽取状态标记~有效期等
 		splitArr, lifeArr := splitIdMethod(sid, eid)
@@ -103,8 +110,6 @@ func dealWithCallBackUdpData(str string) {
 			isAction = false
 			lastNodeResponse = time.Now().Unix()
 			sendNextNode(sid, eid)
-			//更新id段记录状态
-
 		}
 	} else {
 		log.Debug("其他节点回应:", str)
@@ -172,7 +177,9 @@ func sendNextNode(sid string, eid string) {
 		}
 	}
 	log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
+
 	//更新记录状态
+	isGetask = false
 	updateProcessUdpIdsInfo(sid, eid)
 }
 

+ 1 - 1
udps/main.go

@@ -19,7 +19,7 @@ func main() {
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
-	flag.IntVar(&p, "p", 6601, "端口")
+	flag.IntVar(&p, "p", 11109, "端口")
 	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")