Przeglądaj źródła

添加udp 转发配置,支持转发其他udp地址

wcc 1 rok temu
rodzic
commit
6581591f83

+ 2 - 0
createEsIndex/common.toml

@@ -2,6 +2,8 @@
     locport = ":17834"
     jyaddr = "127.0.0.1"
     jyport = 11118
+    neaddr = "127.0.0.1" ## 转发下个 es地址,
+    neport = 1784
 
 [db]
 [db.mongoB] ## bidding标讯数据

+ 2 - 0
createEsIndex/config/conf.go

@@ -35,6 +35,8 @@ type udp struct {
 	LocPort string
 	JyAddr  string
 	JyPort  int
+	NeAddr  string
+	NePort  int
 }
 
 type udpNext struct {

+ 12 - 0
createEsIndex/main.go

@@ -32,6 +32,7 @@ var (
 	UdpClient  udp.UdpClient
 	UdpTaskMap = &sync.Map{}
 	JyUdpAddr  *net.UDPAddr
+	NeUdpAddr  *net.UDPAddr
 
 	EsBulkSize        = 50                                        // es批量保存大小
 	updateBiddingPool = make(chan []map[string]interface{}, 5000) //更新bingding数据
@@ -70,6 +71,13 @@ func init() {
 		Port: util.IntAll(config.Conf.Udp.JyPort),
 	}
 
+	if config.Conf.Udp.NeAddr != "" {
+		NeUdpAddr = &net.UDPAddr{
+			IP:   net.ParseIP(config.Conf.Udp.NeAddr),
+			Port: util.IntAll(config.Conf.Udp.NePort),
+		}
+	}
+
 	BiddingLastNodeResponse = time.Now().Unix()
 	ProjectLastNodeResponse = time.Now().Unix()
 
@@ -115,6 +123,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				key = "udpok"
 			}
 			go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
+			//有udp 转发时
+			if config.Conf.Udp.NeAddr != "" {
+				go SendUdpMsg(mapInfo, NeUdpAddr)
+			}
 			tasktype, _ := mapInfo["stype"].(string)
 			switch tasktype {
 			case "index-by-id": //单个索引,更新pici

+ 10 - 0
createEsIndex/utils.go

@@ -1,12 +1,15 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 	"math"
+	"net"
 	"regexp"
 	"sort"
 	"strings"
@@ -416,3 +419,10 @@ func ruleBuyer(input string) (res bool) {
 	return res
 
 }
+
+//SendUdpMsg 通知处理企业新增数据
+func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
+	bytes, _ := json.Marshal(data)
+	UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
+	log.Info("SendUdpMsg", zap.Any("data", data), zap.Any("target", target))
+}