瀏覽代碼

控制中心调整~可变

zhengkun 3 年之前
父節點
當前提交
3a23e38c02

+ 1 - 1
src/config.json

@@ -34,7 +34,7 @@
     "pricenumber":true,
     "pricenumber":true,
     "udptaskid": "60b493c2e138234cb4adb640",
     "udptaskid": "60b493c2e138234cb4adb640",
     "nextNode": [],
     "nextNode": [],
-    "udpport": "6601",
+    "udpport": "1784",
     "esconfig": {
     "esconfig": {
         "available": false,
         "available": false,
         "AccessID": "",
         "AccessID": "",

+ 1 - 1
src/jy/extract/extractInit.go

@@ -1020,7 +1020,7 @@ func InitProvincesx() []map[string]interface{} {
 func InitSite() []map[string]interface{} {
 func InitSite() []map[string]interface{} {
 	defer qu.Catch()
 	defer qu.Catch()
 	query := map[string]interface{}{
 	query := map[string]interface{}{
-		"depttype": map[string]interface{}{
+		"site_type": map[string]interface{}{
 			"$ne": "代理机构",
 			"$ne": "代理机构",
 		},
 		},
 	}
 	}

+ 37 - 31
src/jy/extract/extractudp.go

@@ -18,11 +18,12 @@ import (
 
 
 var Udpclient mu.UdpClient //udp对象
 var Udpclient mu.UdpClient //udp对象
 var nextNodes []map[string]interface{}
 var nextNodes []map[string]interface{}
-
+var IsExtStop bool
 //udp通知抽取
 //udp通知抽取
 func ExtractUdp() {
 func ExtractUdp() {
 	nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
 	nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
-	Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
+	//Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
+	Udpclient = mu.UdpClient{Local: ":6601", BufSize: 1024}
 	Udpclient.Listen(processUdpMsg)
 	Udpclient.Listen(processUdpMsg)
 }
 }
 
 
@@ -52,6 +53,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 						},
 						},
 					}, true, false)
 					}, true, false)
 				log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
 				log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
+			}else if stype == "stop_extract"{
+				IsExtStop = true
+			}else if stype == "heart_extract"{
+				skey, _ := rep["skey"].(string)
+				Udpclient.WriteUdp([]byte(skey), mu.OP_NOOP, ra)
 			} else {
 			} else {
 				sid, _ := rep["gtid"].(string)
 				sid, _ := rep["gtid"].(string)
 				eid, _ := rep["lteid"].(string)
 				eid, _ := rep["lteid"].(string)
@@ -62,33 +68,35 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					if udpinfo == "" {
 					if udpinfo == "" {
 						udpinfo = "udpok"
 						udpinfo = "udpok"
 					}
 					}
-
+					IsExtStop = false
 					//新版本控制抽取
 					//新版本控制抽取
-					//ExtractByUdp(sid, eid, ra)
-					//log.Debug("抽取完成udp通知抽取id段-控制台",udpinfo, sid, "~", eid)
-					//Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
-
-
-
-					//适配重采抽取-发送udp-必须替换
-					go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
-					log.Debug("udp通知抽取id段", sid, " ", eid)
 					ExtractByUdp(sid, eid, ra)
 					ExtractByUdp(sid, eid, ra)
-					for _, m := range nextNodes {
-						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":  sid,
-							"lteid": eid,
-							"stype": qu.ObjToString(m["stype"]),
-						})
-						err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-							IP:   net.ParseIP(m["addr"].(string)),
-							Port: qu.IntAll(m["port"]),
-						})
-						if err != nil {
-							log.Debug(err)
-						}
+					if !IsExtStop {
+						log.Debug("抽取完成udp通知抽取id段-控制台",udpinfo, sid, "~", eid)
+						Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
+					}else {
+						log.Debug("抽取强制中断udp不通知-控制台",udpinfo, sid, "~", eid)
 					}
 					}
-					log.Debug("udp通知抽取完成,eid=", eid)
+
+					//适配重采抽取-发送udp-必须替换
+					//go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
+					//log.Debug("udp通知抽取id段", sid, " ", eid)
+					//ExtractByUdp(sid, eid, ra)
+					//for _, m := range nextNodes {
+					//	by, _ := json.Marshal(map[string]interface{}{
+					//		"gtid":  sid,
+					//		"lteid": eid,
+					//		"stype": qu.ObjToString(m["stype"]),
+					//	})
+					//	err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+					//		IP:   net.ParseIP(m["addr"].(string)),
+					//		Port: qu.IntAll(m["port"]),
+					//	})
+					//	if err != nil {
+					//		log.Debug(err)
+					//	}
+					//}
+					//log.Debug("udp通知抽取完成,eid=", eid)
 				}
 				}
 			}
 			}
 		}
 		}
@@ -254,15 +262,13 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 			fmt.Printf("page=%d,query=%v\n", i+1, query)
 			fmt.Printf("page=%d,query=%v\n", i+1, query)
 			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
 			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
 			for _, v := range *list {
 			for _, v := range *list {
-				//if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
-				//	log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
-				//	continue
-				//}
+				if IsExtStop {
+					break
+				}
 				if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
 				if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
 					log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
 					log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
 					continue
 					continue
 				}
 				}
-
 				_id := qu.BsonIdToSId(v["_id"])
 				_id := qu.BsonIdToSId(v["_id"])
 				var j, jf *ju.Job
 				var j, jf *ju.Job
 				var isSite bool
 				var isSite bool

+ 16 - 4
src/jy/extract/newextractcity.go

@@ -186,6 +186,7 @@ func (e *ExtractTask) NewExtractCity(j *ju.Job, resulttmp *map[string]interface{
 	}
 	}
 
 
 
 
+
 	//如果-仅有省份-敏感词-校验核对方法
 	//如果-仅有省份-敏感词-校验核对方法
 	if arearesult!="全国" && cityresult=="" {
 	if arearesult!="全国" && cityresult=="" {
 		sensitive_city := e.SensitiveCityData(qu.ObjToString((*j.Data)["detail"]),arearesult)
 		sensitive_city := e.SensitiveCityData(qu.ObjToString((*j.Data)["detail"]),arearesult)
@@ -455,7 +456,7 @@ func (e *ExtractTask) NewGetCityByOthers(j *ju.Job, sm *SortMap, pscore, cscore,
 		p_full, c_full, d_full, p_sim, c_sim, d_sim := "", "", "", "", "", "" //每个字段抽取的时候重新定义该字段抽取的province,city,district
 		p_full, c_full, d_full, p_sim, c_sim, d_sim := "", "", "", "", "", "" //每个字段抽取的时候重新定义该字段抽取的province,city,district
 		str, _ := sm.Map[from].(string)
 		str, _ := sm.Map[from].(string)
 		jbText := e.Seg_SV.Cut(str, true)
 		jbText := e.Seg_SV.Cut(str, true)
-		for _, text := range jbText {
+		for jb_index, text := range jbText {
 			if len([]rune(text)) == 1 {
 			if len([]rune(text)) == 1 {
 				continue
 				continue
 			}
 			}
@@ -602,7 +603,7 @@ func (e *ExtractTask) NewGetCityByOthers(j *ju.Job, sm *SortMap, pscore, cscore,
 										PCDScore(j, "city", c.Name, 2+ts, false)
 										PCDScore(j, "city", c.Name, 2+ts, false)
 									}
 									}
 									PCDScore(j, "province", tmpPbrief, 2+ts, false) //
 									PCDScore(j, "province", tmpPbrief, 2+ts, false) //
-								} else if p_sim == "" {
+								} else if p_sim == "" { //暂未匹配到省
 									if !repeatDb[dfull] {
 									if !repeatDb[dfull] {
 										PCDScoreByDistrictSim("d", dfull, 1+ts, pscore, cscore, dscore)
 										PCDScoreByDistrictSim("d", dfull, 1+ts, pscore, cscore, dscore)
 										repeatDb[dfull] = true
 										repeatDb[dfull] = true
@@ -617,6 +618,14 @@ func (e *ExtractTask) NewGetCityByOthers(j *ju.Job, sm *SortMap, pscore, cscore,
 										}
 										}
 										PCDScoreByDistrictSim("c", c.Name, 1+ts, pscore, cscore, dscore)
 										PCDScoreByDistrictSim("c", c.Name, 1+ts, pscore, cscore, dscore)
 									}
 									}
+
+									//新增~特殊组情况下~津市高新区管委会~切词首"津市"~均未匹配到情况下
+									if jb_index==0 && len(dfull_citys) == 1 && len(j.FullAreaScore)==0 && len(j.SimAreaScore)==0{
+										PCDScore(j, "district", dfull, 0, false)
+										PCDScore(j, "city", c.Name, 0, false)
+										PCDScore(j, "province", tmpPbrief, 0, false) //
+									}
+
 								} else if p_sim != "" && p_sim != tmpPbrief {
 								} else if p_sim != "" && p_sim != tmpPbrief {
 									if !repeatPb[tmpPbrief] {
 									if !repeatPb[tmpPbrief] {
 										PCDScoreByDistrictSim("p", tmpPbrief, ts, pscore, cscore, dscore)
 										PCDScoreByDistrictSim("p", tmpPbrief, ts, pscore, cscore, dscore)
@@ -980,8 +989,11 @@ func(e *ExtractTask) CheckingXjbtCity(buyer string) (new_a,new_c,new_d string,ok
 
 
 //敏感词识别~~~
 //敏感词识别~~~
 func(e *ExtractTask) SensitiveCityData(detail string, area string) string{
 func(e *ExtractTask) SensitiveCityData(detail string, area string) string{
-	//全程组
+	//采用正文
 	detail = sensitiveReg.ReplaceAllString(detail,"")
 	detail = sensitiveReg.ReplaceAllString(detail,"")
+	//删除表格相关-文本
+	detail = TextAfterRemoveTable(detail)
+
 	sim_arr := e.SensitiveSimCity.FindAll(detail)
 	sim_arr := e.SensitiveSimCity.FindAll(detail)
 	full_arr := e.SensitiveFullCity.FindAll(detail)
 	full_arr := e.SensitiveFullCity.FindAll(detail)
 	if len(full_arr)<3 {
 	if len(full_arr)<3 {
@@ -996,7 +1008,7 @@ func(e *ExtractTask) SensitiveCityData(detail string, area string) string{
 	if len(sim_arr)<3 {
 	if len(sim_arr)<3 {
 		for _,v := range sim_arr{
 		for _,v := range sim_arr{
 			if cityMap := e.CityBriefMap[v]; cityMap != nil {
 			if cityMap := e.CityBriefMap[v]; cityMap != nil {
-				if cityMap.P.Brief==area {
+				if cityMap.P.Brief==area && !strings.Contains(area,v) {
 					return cityMap.Name
 					return cityMap.Name
 				}
 				}
 			}
 			}

+ 12 - 11
src/main.go

@@ -19,9 +19,10 @@ import (
 )
 )
 
 
 func init() {
 func init() {
-	log.SetConsole(false)
-	log.SetLevel(log.DEBUG)
-	log.SetRollingDaily("./", "out.log")
+	//log.SetConsole(false)
+	//log.SetLevel(log.DEBUG)
+	//log.SetRollingDaily("./", "out.log")
+
 	qu.ReadConfig(&u.Config)
 	qu.ReadConfig(&u.Config)
 	//抽取price和number相关
 	//抽取price和number相关
 	qu.ReadConfig("./res/pricenumber.json", &u.PriceNumberConfig)
 	qu.ReadConfig("./res/pricenumber.json", &u.PriceNumberConfig)
@@ -57,23 +58,23 @@ func init() {
 
 
 func main() {
 func main() {
 	extract.ExtractUdp() 		//udp通知抽取
 	extract.ExtractUdp() 		//udp通知抽取
-	//extract.ClearUdp()   		//udp通知清理
-	go extract.Export()
-	//go heart.HeartMonitor()	//心跳监测
+	//go extract.Export()			//导出任务
+	//extract.ClearUdp()   			//udp通知清理
+	//go heart.HeartMonitor()		//心跳监测
+
 	go Router.Run(":" + qu.ObjToString(u.Config["port"]))
 	go Router.Run(":" + qu.ObjToString(u.Config["port"]))
 	go log.Debug("启动..", qu.ObjToString(u.Config["port"]))
 	go log.Debug("启动..", qu.ObjToString(u.Config["port"]))
-
 	go func() {
 	go func() {
 		http.ListenAndServe("localhost:10000", nil)
 		http.ListenAndServe("localhost:10000", nil)
 	}()
 	}()
-
-	//临时调试
-	//testMain()
-
 	lock := make(chan bool)
 	lock := make(chan bool)
 	<-lock
 	<-lock
 }
 }
 
 
+
+
+
+
 //验证规则
 //验证规则
 func testMain()  {
 func testMain()  {
 
 

+ 7 - 24
udpcontrol/src/config.json

@@ -4,30 +4,13 @@
         "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
         "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
         "api": "http://172.17.145.179:19281/_send/_mail"
         "api": "http://172.17.145.179:19281/_send/_mail"
     },
     },
-    "extractNode": [
-        {
-            "addr": "127.0.0.1",
-            "port": 6601,
-            "stype": "extract_1"
-        },
-        {
-            "addr": "127.0.0.1",
-            "port": 6602,
-            "stype": "extract_2"
-        }
-    ],
+    "data_mgodb": {
+        "addr": "127.0.0.1:27017",
+        "db": "zhengkun",
+        "coll": "extract_control_center",
+        "s_coll": "zktest_city_field_test"
+    },
     "nextNode": [
     "nextNode": [
-        {
-            "addr": "127.0.0.1",
-            "port": 1799,
-            "stype": "",
-            "memo": "生城市"
-        },
-        {
-            "addr": "127.0.0.1",
-            "port": 1762,
-            "stype": "",
-            "memo": "敏感词清理"
-        }
+
     ]
     ]
 }
 }

+ 84 - 0
udpcontrol/src/initdata.go

@@ -0,0 +1,84 @@
+package main
+
+import (
+	log "github.com/donnie4w/go-logger/logger"
+	qu "qfw/util"
+)
+
+var (
+	sysconfig    	map[string]interface{} 		//配置文件
+	data_mgo        *MongodbSim
+	data_c_name,data_s_name		string
+)
+func initMgo()  {
+	dataconf := sysconfig["data_mgodb"].(map[string]interface{})
+	data_c_name = qu.ObjToString(dataconf["coll"])  //机器源center
+	data_s_name = qu.ObjToString(dataconf["s_coll"])//数据源bidding
+	data_mgo = &MongodbSim{
+		MongodbAddr: dataconf["addr"].(string),
+		DbName:      dataconf["db"].(string),
+		Size:        5,
+	}
+	data_mgo.InitPool()
+}
+func initVarData()  {
+	qu.ReadConfig(&sysconfig)
+	initMgo()
+	nextNode = qu.ObjArrToMapArr(sysconfig["nextNode"].([]interface{}))
+}
+
+//加载抽取
+func initExtractNode()  {
+	resetExtNodeArr() //重置抽取节点数组
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	q := map[string]interface{}{}
+	it := sess.DB(data_mgo.DbName).C(data_c_name).Find(&q).Iter()
+	for tmp := make(map[string]interface{}); it.Next(&tmp);{
+		isuse := qu.IntAll(tmp["isuse"])
+		if isuse == 0 {
+			invalid_ext_node = append(invalid_ext_node,tmp)
+		}else if isuse == 1 {
+			using_ext_node = append(using_ext_node,tmp)
+		}else if isuse == 2 {
+			standby_ext_node = append(standby_ext_node,tmp)
+		}else {
+
+		}
+		tmp = make(map[string]interface{})
+	}
+	//根据实际情况~把备用节点~与正常节点综合一下
+	for {
+		if len(using_ext_node) < 3 {
+			if len(standby_ext_node)==0 {
+				break
+			}
+			tmp_node := standby_ext_node[0]
+			using_ext_node = append(using_ext_node,tmp_node)
+			standby_ext_node = standby_ext_node[1:]
+		}else {
+			break
+		}
+	}
+	if len(using_ext_node)<=0 {
+		sendErrMailApi("抽取控制程序停止~严重错误","当前无可用机器......")
+	}else if len(using_ext_node)==1 {
+		//sendErrMailApi("抽取控制中心~警告","当前可用机器...仅有一个...请检查...")
+	}else {
+
+	}
+	log.Debug("综合后节点~有效~备用~无效",len(using_ext_node),len(standby_ext_node),len(invalid_ext_node))
+}
+
+
+//重置抽取
+func resetExtNodeArr () {
+	isAction = false
+	using_ext_node = []map[string]interface{}{}
+	standby_ext_node = []map[string]interface{}{}
+	invalid_ext_node = []map[string]interface{}{}
+
+	extractAction = map[string]map[string]interface{}{}
+	heartAction = map[string]interface{}{}
+
+}

+ 12 - 184
udpcontrol/src/main.go

@@ -1,201 +1,29 @@
 package main
 package main
 
 
 import (
 import (
-	"encoding/json"
-	"fmt"
-	"log"
+	log "github.com/donnie4w/go-logger/logger"
 	mu "mfw/util"
 	mu "mfw/util"
-	"net"
-	qu "qfw/util"
-	"strconv"
-	"sync"
-	"time"
-)
-var (
-	Config    		map[string]interface{} 		//配置文件
-	nextNode     	[]map[string]interface{} 	//下节点数组
-	extractNode     []map[string]interface{} 	//抽取节点数组
-	udpclient    	mu.UdpClient             	//udp对象
-	extractLevel    map[string]interface{} 		//抽取节点状态
-	udplock 		sync.Mutex         			//锁
 )
 )
+
 func init() {
 func init() {
-	qu.ReadConfig(&Config)
-	nextNode = qu.ObjArrToMapArr(Config["nextNode"].([]interface{}))
-	extractNode = qu.ObjArrToMapArr(Config["extractNode"].([]interface{}))
-	resetExtractLevel()
-}
-//重置抽取状态
-func resetExtractLevel()  {
-	extractLevel = make(map[string]interface{},0)
-	for _,v:=range extractNode{
-		key := fmt.Sprintf("%s",qu.ObjToString(v["stype"]))
-		extractLevel[key] = 0
-	}
-}
-func main()  {
-	go checkMailJob()
-	updport := Config["udpport"].(string)
+	initVarData() 	//初始化属性
+	updport := sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	udpclient.Listen(processUdpMsg)
-	log.Println("Udp服务监听", updport)
-	time.Sleep(99999 * time.Hour)
+	log.Debug("Udp服务监听", updport)
 }
 }
 
 
-//udp接收
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	switch act {
-	case mu.OP_TYPE_DATA:
-		var mapInfo map[string]interface{}
-		err := json.Unmarshal(data, &mapInfo)
-		if err != nil {
-			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
-		} else if mapInfo != nil {
-			sid, _ := mapInfo["gtid"].(string)
-			eid, _ := mapInfo["lteid"].(string)
-			if sid == "" || eid == "" {
-				log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
-			} else {
-				udpinfo, _ := mapInfo["key"].(string)
-				if udpinfo == "" {
-					udpinfo = "udpok"
-				}
-				go udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
-				log.Println("")
-				log.Println("接收当前段落,udp通知抽取-需拆分",len(extractNode),"组", sid, "~~", eid)
-				udplock.Lock()
-				resetExtractLevel() //重置状态
-				extractLevel["sid"]=sid
-				extractLevel["eid"]=eid
-				udplock.Unlock()
+func main()  {
+	go extractRunningMonitoring() //监控
 
 
-				//拆分段落方法
-				splitArr:=splitIdMethod(sid,eid)
-				if len(splitArr)!=len(extractNode){//直接发送整段
-					log.Println("段落划分异常...请检查程序...")
-				}
 
 
-				key:=fmt.Sprintf("%s~%s",sid,eid)
-				node := &udpNode{time.Now().Unix()}
-				udptaskmap.Store(key, node)
-				sendExtractNode(splitArr) //通知抽取
-			}
-		}
-	case mu.OP_NOOP: //下个节点回应
-		//抽取多节点
-		udplock.Lock()
-		str := string(data)
-		if extractLevel[str] != nil {
-			extractLevel[str] = 1
-			log.Println("抽取节点回应:",str)
-			f := validExtractFinish() //验证段落是否均抽取完毕
-			if f {//发送下节点整体udp,补城市,敏感词等
-				sid := qu.ObjToString(extractLevel["sid"])
-				eid := qu.ObjToString(extractLevel["eid"])
-				if sid != ""&&eid != "" {
-					key:=fmt.Sprintf("%s~%s",sid,eid)
-					udptaskmap.Delete(key)
-					sendNextNode(sid,eid)
-				}
-			}
-		}else {
-			log.Println("其他节点回应:",str)
-		}
-		udplock.Unlock()
-	}
+	lock := make(chan bool)
+	<-lock
 }
 }
 
 
 
 
 
 
-//验证抽取是否完毕	不验证-sid eid key
-func validExtractFinish() bool  {
-	for k,v :=range extractLevel{
-		if k=="sid" || k=="eid" {
-			continue
-		}
-		if qu.Int64All(v)==0 {
-			return false
-		}
-	}
-	return true
-}
-//拆分ID段方法
-func splitIdMethod(sid string,eid string)([]map[string]interface{}) {
-	dataArr := make([]map[string]interface{},0)
-	if len(extractNode)==1 {
-		dataArr = append(dataArr, map[string]interface{}{
-			"sid":sid,
-			"eid":eid,
-		})
-	}else {
-		interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8]))
-		num := interval/int64(len(extractNode))
-		tmp_time :=  hex2Dec(string(sid[:8]))+num
-		for  i:=0;i<len(extractNode);i++ {
-			if i==0 {
-				tmp_eid := fmt.Sprintf("%x",tmp_time)
-				dataArr = append(dataArr, map[string]interface{}{
-					"sid":sid,
-					"eid":tmp_eid+"0000000000000000",
-				})
-			}else if i==len(extractNode)-1 {
-				tmp_sid := fmt.Sprintf("%x",tmp_time)
-				dataArr = append(dataArr, map[string]interface{}{
-					"sid":tmp_sid+"0000000000000000",
-					"eid":eid,
-				})
-			}else {
-				tmp_sid := fmt.Sprintf("%x",tmp_time)
-				tmp_time = tmp_time+num
-				tmp_eid := fmt.Sprintf("%x",tmp_time)
-				dataArr = append(dataArr, map[string]interface{}{
-					"sid":tmp_sid+"0000000000000000",
-					"eid":tmp_eid+"0000000000000000",
-				})
-			}
-		}
-	}
-	return dataArr
-}
-func hex2Dec(val string)int64{
-	n,_ := strconv.ParseInt(val,16,32)
-	return n
-}
-//发送抽取
-func sendExtractNode(splitArr []map[string]interface{})  {
-	for index, node := range extractNode {
-		tmp:=splitArr[index]
-		by, _ := json.Marshal(map[string]interface{}{
-			"gtid":  qu.ObjToString(tmp["sid"]),
-			"lteid": qu.ObjToString(tmp["eid"]),
-			"stype": qu.ObjToString(node["stype"]),
-		})
-		err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-			IP:   net.ParseIP(node["addr"].(string)),
-			Port: qu.IntAll(node["port"]),
-		})
-		if err != nil {
-			log.Println("发送段落异常:",node,tmp,err)
-		}
-	}
-	log.Println("通知抽取udp...等待抽取...回应...")
-}
 
 
-//发送其他
-func sendNextNode(sid string,eid string)  {
-	for _, node := range nextNode {
-		by, _ := json.Marshal(map[string]interface{}{
-			"gtid":  sid,
-			"lteid": eid,
-			"stype": qu.ObjToString(node["stype"]),
-		})
-		err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-			IP:   net.ParseIP(node["addr"].(string)),
-			Port: qu.IntAll(node["port"]),
-		})
-		if err != nil {
-			log.Println(err)
-		}
-	}
-	log.Println("udp通知抽取完成...通知下阶段udp-敏感词,补城市",sid,"~",eid)
-}
+
+
+

+ 241 - 0
udpcontrol/src/method.go

@@ -0,0 +1,241 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	log "github.com/donnie4w/go-logger/logger"
+	qu "qfw/util"
+	"strings"
+	"sync"
+	"time"
+)
+
+var methodlock 		sync.Mutex
+
+//监控~当前抽取段~状态  生命周期
+func extractRunningMonitoring()  {
+	for  {
+		if isAction {
+			log.Debug("检测一次...")
+			time_now := time.Now().Unix()
+			isErr := false
+			methodlock.Lock()
+			for k,v := range extractAction {
+				if k=="extract_ids" {
+					continue
+				}
+				//抽取行为完成~状态
+				action:=qu.IntAll(v["action"])
+				if action==1 {
+					continue
+				}
+
+				//心跳监测~回应
+				keyArr := strings.Split(k,":")
+				if len(keyArr)==3 {
+					by, _ := json.Marshal(map[string]interface{}{
+						"stype": "heart_extract",
+						"skey" :"heart_extract"+k,
+					})
+					sendSingleOtherNode(by,keyArr[0],keyArr[1])
+					heart_num := qu.IntAll(heartAction[k])
+					heartAction[k] = heart_num+1
+				}
+				life:=qu.Int64All(v["life"])
+				if time_now>life || qu.IntAll(heartAction[k]) > 10 {
+					isErr = true //超时~无响应~认为机器异常
+					data_mgo.UpdateById(data_c_name,qu.ObjToString(v["uid"]), map[string]interface{}{
+						"$set": map[string]interface{}{
+							"isuse":0,
+						},
+					})
+				}
+
+			}
+			methodlock.Unlock()
+			if isErr {
+				sid:= qu.ObjToString(extractAction["extract_ids"]["sid"])
+				eid:= qu.ObjToString(extractAction["extract_ids"]["eid"])
+				isAction = false
+				sendStopExtractNode(using_ext_node)
+				if len(standby_ext_node)==0 {
+					sendErrMailApi("抽取控制告警~机器异常~无备用机器",fmt.Sprintf("此段落需要过滤~%s~%s",sid,eid))
+					time.Sleep(15*time.Second)
+					sendNextNode(sid,eid)
+				}else {
+					sendErrMailApi("抽取控制告警~机器异常~有备用机器",fmt.Sprintf("启用备用机器~%s~%s",sid,eid))
+					time.Sleep(15*time.Second)
+					dealWithExtUdpData(sid,eid)
+				}
+			}
+		}
+		time.Sleep(15*time.Second)
+	}
+}
+
+
+//验证抽取是否完毕	不验证-extract_ids~key
+func validExtractFinish() bool  {
+	for k,v :=range extractAction{
+		if k=="extract_ids" {
+			continue
+		}
+		if qu.Int64All(v["action"])==0 {
+			return false
+		}
+	}
+	return true
+}
+//拆分ID段方法
+func splitIdMethod(sid string,eid string)([]map[string]interface{},[]int64) {
+	dataArr := make([]map[string]interface{},0)
+	lifeArr := make([]int64,0)
+	if sid=="" || eid=="" || len(using_ext_node)==0 {
+		return dataArr,lifeArr
+	}
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	q ,total := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(sid),
+			"$lte": StringTOBsonId(eid),
+		},
+	},int64(0)
+	count,_ := sess.DB(data_mgo.DbName).C(data_s_name).Find(&q).Count()
+	if len(using_ext_node)==1 {
+		dataArr = append(dataArr, map[string]interface{}{
+			"sid":sid,
+			"eid":eid,
+		})
+		lifeArr = append(lifeArr,calculateLiftime(count))
+	} else {
+		node_num := int64(len(using_ext_node))
+		if count<node_num{ //采用一个节点-多余临时删除
+			tmp_node := using_ext_node[0]
+			using_ext_node = []map[string]interface{}{}
+			using_ext_node = append(using_ext_node,tmp_node)
+			dataArr = append(dataArr, map[string]interface{}{
+				"sid":sid,
+				"eid":eid,
+			})
+			lifeArr = append(lifeArr,calculateLiftime(count))
+		}else {
+			limit := count/node_num
+			limit_lifetime := calculateLiftime(limit)
+			tmp_sid:=sid
+			it := sess.DB(data_mgo.DbName).C(data_s_name).Find(&q).Sort("_id").Select(map[string]interface{}{
+				"_id":1,
+			}).Iter()
+			for tmp := make(map[string]interface{}); it.Next(&tmp);{
+				total++
+				if total%limit==0 {
+					if total/limit==node_num {
+						dataArr = append(dataArr, map[string]interface{}{
+							"sid":tmp_sid,
+							"eid":eid,
+						})
+						lifeArr = append(lifeArr,limit_lifetime)
+						break
+					}else {
+						dataArr = append(dataArr, map[string]interface{}{
+							"sid":tmp_sid,
+							"eid":BsonTOStringId(tmp["_id"]),
+						})
+						tmp_sid = BsonTOStringId(tmp["_id"])
+						lifeArr = append(lifeArr,limit_lifetime)
+					}
+				}
+				tmp = make(map[string]interface{})
+			}
+		}
+	}
+
+	if len(dataArr)!=len(using_ext_node) || len(dataArr)!=len(lifeArr) {
+		log.Debug("划分段落异常~请检查~只能采用唯一节点~")
+		tmp_node := using_ext_node[0]
+		using_ext_node = []map[string]interface{}{}
+		using_ext_node = append(using_ext_node,tmp_node)
+		dataArr = []map[string]interface{}{}
+		lifeArr = []int64{}
+		dataArr = append(dataArr, map[string]interface{}{
+			"sid":sid,
+			"eid":eid,
+		})
+		lifeArr = append(lifeArr,calculateLiftime(count))
+	}
+	return dataArr,lifeArr
+}
+//计算生命周期
+func calculateLiftime(count int64) int64 {
+	time_one := 1000.0/1000.0//暂定~每千条用时1000秒
+	life_time := int64(time_one*float64(count)*3.0)
+	if life_time<2000 {
+		life_time = 2000
+	}
+	return time.Now().Unix()+life_time
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+//暂时弃用
+//func sqlitID(){
+//	if len(using_ext_node)==1 {
+//		dataArr = append(dataArr, map[string]interface{}{
+//			"sid":sid,
+//			"eid":eid,
+//		})
+//
+//	}else {
+//		interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8]))
+//		num := interval/int64(len(using_ext_node))
+//		tmp_time :=  hex2Dec(string(sid[:8]))+num
+//		for  i:=0;i<len(using_ext_node);i++ {
+//			if i==0 {
+//				tmp_eid := fmt.Sprintf("%x",tmp_time)
+//				dataArr = append(dataArr, map[string]interface{}{
+//					"sid":sid,
+//					"eid":tmp_eid+"0000000000000000",
+//				})
+//			}else if i==len(using_ext_node)-1 {
+//				tmp_sid := fmt.Sprintf("%x",tmp_time)
+//				dataArr = append(dataArr, map[string]interface{}{
+//					"sid":tmp_sid+"0000000000000000",
+//					"eid":eid,
+//				})
+//			}else {
+//				tmp_sid := fmt.Sprintf("%x",tmp_time)
+//				tmp_time = tmp_time+num
+//				tmp_eid := fmt.Sprintf("%x",tmp_time)
+//				dataArr = append(dataArr, map[string]interface{}{
+//					"sid":tmp_sid+"0000000000000000",
+//					"eid":tmp_eid+"0000000000000000",
+//				})
+//			}
+//		}
+//	}
+//}
+//
+//func hex2Dec(val string)int64{
+//	n,_ := strconv.ParseInt(val,16,32)
+//	return n
+//}

+ 349 - 0
udpcontrol/src/mgo.go

@@ -0,0 +1,349 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Count() (int64, error) {
+	if ms.Query != nil {
+		return ms.M.C.Database(ms.Db).Collection(ms.Coll).CountDocuments(ms.M.Ctx, ms.Query)
+	}
+	return ms.M.C.Database(ms.Db).Collection(ms.Coll).EstimatedDocumentCount(ms.M.Ctx)
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+func (m *MongodbSim) UpdateStrId(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": id}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 29 - 0
udpcontrol/src/sendmail.go

@@ -0,0 +1,29 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+)
+var tomail string
+var api string
+
+func sendErrMailApi(title,body string)  {
+	log.Println(title,body)
+	return
+	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println(tomail,api)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Println("邮件发送成功:", string(read), err)
+	}else {
+		log.Println("邮件发送失败:", err)
+	}
+}

+ 0 - 48
udpcontrol/src/udptaskmail.go

@@ -1,48 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"io/ioutil"
-	"log"
-	"net/http"
-	"sync"
-	"time"
-)
-
-var udptaskmap = &sync.Map{}
-var tomail string
-var api string
-
-type udpNode struct {
-	timestamp int64
-}
-
-func checkMailJob() {
-
-	//阿里云内网无法发送邮件
-	jkmail, _ := Config["jkmail"].(map[string]interface{})
-	if jkmail != nil {
-		tomail, _ = jkmail["to"].(string)
-		api, _ = jkmail["api"].(string)
-	}
-	log.Println("start check mail Job", tomail, Config["jkmail"])
-	for {
-		udptaskmap.Range(func(k, v interface{}) bool {
-			now := time.Now().Unix()
-			node, _ := v.(*udpNode)
-			if now-node.timestamp >= 600 {
-				udptaskmap.Delete(k)
-				res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "extract_control-warning",k.(string)))
-				if err == nil {
-					defer res.Body.Close()
-					read, err := ioutil.ReadAll(res.Body)
-					log.Println("控制中心-邮件发送成功:", string(read), err)
-				}else {
-					log.Println("控制中心-邮件发送异常:", err)
-				}
-			}
-			return true
-		})
-		time.Sleep(60 * time.Second)
-	}
-}

+ 161 - 0
udpcontrol/src/updprocess.go

@@ -0,0 +1,161 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	log "github.com/donnie4w/go-logger/logger"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"strings"
+	"sync"
+)
+var (
+	nextNode     		[]map[string]interface{}
+	udpclient    		mu.UdpClient
+	udplock 			sync.Mutex
+	extractAction    	map[string]map[string]interface{}
+	heartAction			map[string]interface{}
+	isAction			bool
+	using_ext_node,standby_ext_node,invalid_ext_node	[]map[string]interface{}
+)
+//udp接收
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		if err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			sid ,eid:= qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
+			if sid == "" || eid == "" {
+				log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
+			} else {
+				go udpclient.WriteUdp([]byte("ok"), mu.OP_NOOP, ra)
+				udplock.Lock()
+				dealWithExtUdpData(sid,eid)
+				udplock.Unlock()
+			}
+		}
+	case mu.OP_NOOP: //下个节点回应
+		//抽取多节点
+		udplock.Lock()
+		str := string(data)
+		if strings.Contains(str,"heart_extract") {
+			dealWithHeartBackUdpData(strings.ReplaceAll(str,"heart_extract",""))
+		}else {
+			dealWithCallBackUdpData(str)
+		}
+		udplock.Unlock()
+	}
+}
+
+//处理~新接收抽取段~
+func dealWithExtUdpData(sid,eid string) {
+	//获取最新-抽取节点状态
+	initExtractNode()
+	log.Debug("接收当前段落,udp通知抽取-需拆分",len(using_ext_node),"组", sid, "~~", eid)
+	if len(using_ext_node)>0 {
+		//拆分段落方法~并附加抽取状态标记~有效期等
+		splitArr,lifeArr:=splitIdMethod(sid,eid)
+		extractAction = map[string]map[string]interface{}{}
+		heartAction = map[string]interface{}{}
+		for k,v:=range using_ext_node{
+			skey := fmt.Sprintf("%s:%d:%s",v["addr"],v["port"],v["stype"])
+			extractAction[skey] = map[string]interface{}{
+				"life":lifeArr[k],
+				"action":0,
+				"uid":BsonTOStringId(v["_id"]),
+			}
+			heartAction[skey] = 0
+		}
+		extractAction["extract_ids"] = map[string]interface{}{
+			"sid":sid,
+			"eid":eid,
+		}
+		sendRunExtractNode(splitArr) //通知抽取
+	}else {
+		log.Debug("无有效机器抽取...程序停止于此...")
+	}
+}
+
+//处理回调udp~相关数据
+func dealWithCallBackUdpData(str string) {
+	if extractAction[str] != nil {
+		extractAction[str]["action"] = 1
+		log.Debug("抽取节点回应:",str)
+		f := validExtractFinish()
+		if f {
+			sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
+			eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
+			isAction = false
+			sendNextNode(sid,eid)
+		}
+	}else {
+		log.Debug("其他节点回应:",str)
+	}
+}
+//处理-心跳回调
+func dealWithHeartBackUdpData(str string) {
+	if heartAction[str] != nil {
+		heartAction[str] = 0
+	}
+}
+
+
+//通知所有节点~进行抽取~
+func sendRunExtractNode(splitArr []map[string]interface{})  {
+	for index, node := range using_ext_node {
+		tmp:=splitArr[index]
+		skey := fmt.Sprintf("%s:%d:%s",node["addr"],node["port"],node["stype"])
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  qu.ObjToString(tmp["sid"]),
+			"lteid": qu.ObjToString(tmp["eid"]),
+			"stype": skey,
+		})
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(node["addr"].(string)),
+			Port: qu.IntAll(node["port"]),
+		})
+	}
+	isAction = true
+	log.Debug("通知抽取udp...等待抽取...回应...")
+}
+
+//通知所有抽取节点~结束抽取
+func sendStopExtractNode(splitArr []map[string]interface{})  {
+	for _, node := range using_ext_node {
+		by, _ := json.Marshal(map[string]interface{}{
+			"stype": "stop_extract",
+		})
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(node["addr"].(string)),
+			Port: qu.IntAll(node["port"]),
+		})
+	}
+}
+
+//发送下阶段节点~
+func sendNextNode(sid string,eid string)  {
+	for _, node := range nextNode {
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": qu.ObjToString(node["stype"]),
+		})
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(node["addr"].(string)),
+			Port: qu.IntAll(node["port"]),
+		})
+	}
+	log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市",sid,"~",eid)
+}
+
+//发送单节点~
+func sendSingleOtherNode(by []byte,addr string,port string )  {
+	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+		IP:   net.ParseIP(addr),
+		Port: qu.IntAll(port),
+	})
+}

+ 9 - 1
udpcontrol/src/mark → udpcontrol/src/zhengkun

@@ -43,4 +43,12 @@
             "memo": "敏感词清理"
             "memo": "敏感词清理"
         }
         }
     ]
     ]
-}
+}
+
+
+
+
+
+
+
+

+ 53 - 0
udpcontrol/src/zk_taskmail.go

@@ -0,0 +1,53 @@
+package main
+//
+//import (
+//	"fmt"
+//	"io/ioutil"
+//	"log"
+//	"net/http"
+//	"sync"
+//	"time"
+//)
+//
+//var udptaskmap = &sync.Map{}
+//var tomail string
+//var api string
+//
+//type udpNode struct {
+//	timestamp int64
+//}
+//
+//func checkMailJob() {
+//
+//	//阿里云内网无法发送邮件
+//	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
+//	if jkmail != nil {
+//		tomail, _ = jkmail["to"].(string)
+//		api, _ = jkmail["api"].(string)
+//	}
+//	for {
+//		udptaskmap.Range(func(k, v interface{}) bool {
+//			now := time.Now().Unix()
+//			node, _ := v.(*udpNode)
+//			if now-node.timestamp >= 600 {
+//				udptaskmap.Delete(k)
+//				res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "警告:抽取十分钟段落未抽完",k.(string)))
+//				if err == nil {
+//					defer res.Body.Close()
+//					read, err := ioutil.ReadAll(res.Body)
+//					log.Println("控制中心-邮件发送成功:", string(read), err)
+//				}else {
+//					log.Println("控制中心-邮件发送异常:", err)
+//				}
+//			}
+//			return true
+//		})
+//		time.Sleep(60 * time.Second)
+//	}
+//}
+//
+///*
+//key:=fmt.Sprintf("%s~%s",sid,eid)
+//node := &udpNode{time.Now().Unix()}
+//udptaskmap.Store(key, node)
+//*/

+ 1 - 1
udps/main.go

@@ -19,7 +19,7 @@ func main() {
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
-	flag.IntVar(&p, "p", 6601, "端口")
+	flag.IntVar(&p, "p", 1784, "端口")
 	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
 	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")