fengweiqiang 4 سال پیش
والد
کامیت
ddcb8f942d

+ 22 - 26
udpdataclear/udpSensitiveWords/grpc_server/main.go

@@ -28,18 +28,19 @@ var Filter *sensitive.Filter
 var es_type, es_index string
 var Client_Es *elastic.Client
 
-var MixDataMgo  *util.MongodbSim
+var MixDataMgo *util.MongodbSim
 var (
 	// Create a metrics registry.
 	reg = prometheus.NewRegistry()
 
 	// Create some standard server metrics.
-	grpcMetrics = grpc_prometheus.NewServerMetrics()
+	grpcMetrics             = grpc_prometheus.NewServerMetrics()
 	customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "demo_server_search_method_handle_count",
 		Help: "Total number of RPCs handled on the server.",
 	}, []string{"name"})
 )
+
 func init() {
 	yamlFile, err := ioutil.ReadFile(YAMLFILE)
 	if err != nil {
@@ -59,11 +60,9 @@ func init() {
 	}
 	MixDataMgo.InitPool()
 
-	Client_Es ,_= elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
-
-
-	es_type, es_index = "azktest","azktest"
+	Client_Es, _ = elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
 
+	es_type, es_index = "azktest", "azktest"
 
 	//政府采购 - 公共资源   (23 25)
 
@@ -71,28 +70,26 @@ func init() {
 
 }
 
-
-
 func main() {
 
-/*
-	MixDataMgo = &util.MongodbSim{
-		MongodbAddr: "192.168.3.207:27092",
-		Size:        20,
-		DbName:      "zhengkun",
-		UserName:    "",
-		PassWord:    "",
-	}
-	MixDataMgo.InitPool()
+	/*
+		MixDataMgo = &util.MongodbSim{
+			MongodbAddr: "192.168.3.207:27092",
+			Size:        20,
+			DbName:      "zhengkun",
+			UserName:    "",
+			PassWord:    "",
+		}
+		MixDataMgo.InitPool()
 
-	Client_Es ,_= elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
+		Client_Es ,_= elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
 
 
-	es_type, es_index = "azktest","azktest"
+		es_type, es_index = "azktest","azktest"
 
-	temporaryTest()
-	return
-*/
+		temporaryTest()
+		return
+	*/
 	//淡赌跑断
 	if YamlConfig.IsAddTask == 0 {
 		initSensitiveWordsData() //初始化敏感词数据
@@ -108,8 +105,8 @@ func main() {
 	// Create a HTTP server for prometheus.
 	httpServer := &http.Server{
 		Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
-		Addr: fmt.Sprintf("0.0.0.0:%d",
-			2092),
+		Addr: fmt.Sprintf("0.0.0.0:%v",
+			YamlConfig.LogPort),
 	}
 	grpcServer := grpc.NewServer(
 		grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
@@ -158,6 +155,5 @@ type YAMLConfig struct {
 	TaskLteId       string `yaml:"taskLteId"`
 	IsAddTask       int    `yaml:"isAddTask"`
 	Port            string `yaml:"port"`
+	LogPort         string `yaml:"log_port"`
 }
-
-

+ 12 - 8
udpdataclear/udpSensitiveWords/util/config.go

@@ -3,6 +3,7 @@ package util
 import (
 	"context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 	"gopkg.in/olivere/elastic.v1"
 	"log"
 	"net/http"
@@ -28,10 +29,9 @@ func InitC() {
 	}
 	QfwMgo85.InitPool()
 
-	Client_Es ,_= elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
+	Client_Es, _ = elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
 
-
-	es_type, es_index = Config["es_type"].(string),Config["es_index"].(string)
+	es_type, es_index = Config["es_type"].(string), Config["es_index"].(string)
 
 	Fields = Config["fields"].(map[string]interface{})
 	FindBuyerC, FindAgencyC, FindWinnerC = Config["buyer_c"].(string), Config["agency_c"].(string), Config["winner_c"].(string)
@@ -40,18 +40,22 @@ func InitC() {
 	for _, v := range qaddrs {
 		ctx, cancelFunc := context.WithTimeout(context.TODO(), time.Second*5)
 		defer cancelFunc()
-		conn, err := grpc.DialContext(ctx,v.(string),
+		conn, err := grpc.DialContext(ctx, v.(string),
 			grpc.WithInsecure(),
 			grpc.WithBlock(),
+			grpc.WithKeepaliveParams(keepalive.ClientParameters{
+				Time:                10 * time.Second,
+				Timeout:             100 * time.Millisecond,
+				PermitWithoutStream: true}),
 		)
 		if err != nil {
 			log.Fatalf("did not connect: %s %v", v, err)
 		}
 		c := proto_grpc.NewSensitiveWordsClient(conn)
 		_, err = c.Registration(ctx, &proto_grpc.NumberOfRegistrations{Count: 1})
-		if err != nil{
+		if err != nil {
 			log.Fatalln(err)
-		}else {
+		} else {
 			QAddrs = append(QAddrs, &c)
 		}
 	}
@@ -64,5 +68,5 @@ var Collection string
 var Fields map[string]interface{}
 var FindBuyerC, FindAgencyC, FindWinnerC string
 var QAddrs []*proto_grpc.SensitiveWordsClient
-var es_type, es_index	string
-var Client_Es  *elastic.Client
+var es_type, es_index string
+var Client_Es *elastic.Client

+ 4 - 9
udpdataclear/udpSensitiveWords/util/udpdata.go

@@ -11,6 +11,7 @@ import (
 	"strings"
 	"time"
 )
+
 var task chan struct{} = make(chan struct{}, 1)
 var Udpclient UdpClient //udp对象
 var nextNodes []map[string]interface{}
@@ -27,9 +28,6 @@ func ExtractUdp() {
 	QuerySensitiveWords(sid,eid )*/
 }
 
-
-
-
 func QuerySensitiveWords(sid, eid string) {
 	log.Println("QuerySensitiveWords:", sid, eid)
 	objSid, err := primitive.ObjectIDFromHex(sid)
@@ -115,6 +113,7 @@ func query_grpc(enterprise, findC string) string {
 			sensitiveWords, err := (*vc).Search(ctx, &proto_grpc.Request{Text: enterprise, Corpus: CorpusType})
 			if err != nil {
 				log.Println(index, err)
+				c <- map[int]string{index: ""}
 				return
 			}
 			c <- map[int]string{index: sensitiveWords.GetSensitiveWords()}
@@ -128,7 +127,7 @@ func query_grpc(enterprise, findC string) string {
 			if vv == "" {
 				continue
 			}
-			result = append(result,vv)
+			result = append(result, vv)
 		}
 		q++
 		if q >= lenc {
@@ -136,12 +135,9 @@ func query_grpc(enterprise, findC string) string {
 		}
 	}
 
-	return strings.Join(result,",")
+	return strings.Join(result, ",")
 }
 
-
-
-
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	task <- struct{}{}
 	defer func() {
@@ -183,4 +179,3 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		log.Println(string(data))
 	}
 }
-