|
@@ -17,11 +17,9 @@ import (
|
|
|
)
|
|
|
|
|
|
var Udpclient mu.UdpClient //udp对象
|
|
|
-var nextNodes []map[string]interface{}
|
|
|
|
|
|
//udp通知抽取
|
|
|
func ExtractUdp() {
|
|
|
- nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
|
|
|
Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
|
|
|
Udpclient.Listen(processUdpMsg)
|
|
|
}
|
|
@@ -58,28 +56,13 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
if sid == "" || eid == "" {
|
|
|
log.Debug("err", "sid=", sid, ",eid=", eid)
|
|
|
} else {
|
|
|
- udpinfo, _ := rep["key"].(string)
|
|
|
+ udpinfo, _ := rep["stype"].(string)
|
|
|
if udpinfo == "" {
|
|
|
udpinfo = "udpok"
|
|
|
}
|
|
|
- go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
|
|
|
- log.Debug("udp通知抽取id段", sid, " ", eid)
|
|
|
ExtractByUdp(sid, eid, ra)
|
|
|
- for _, m := range nextNodes {
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": sid,
|
|
|
- "lteid": eid,
|
|
|
- "stype": qu.ObjToString(m["stype"]),
|
|
|
- })
|
|
|
- err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- IP: net.ParseIP(m["addr"].(string)),
|
|
|
- Port: qu.IntAll(m["port"]),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- log.Debug(err)
|
|
|
- }
|
|
|
- }
|
|
|
- log.Debug("udp通知抽取完成,eid=", eid)
|
|
|
+ log.Debug("抽取完成udp通知抽取id段",udpinfo, sid, "~", eid)
|
|
|
+ Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
|
|
|
}
|
|
|
}
|
|
|
}
|