Jelajahi Sumber

敏感词库-服务端 , udp监听 - 抽取结果-分词相关处理

apple 4 tahun lalu
induk
melakukan
e71a6a0a35

+ 3 - 6
udpdataclear/udpSensitiveWords/config.json

@@ -23,10 +23,7 @@
   ],
   "userName": "",
   "passWord": "",
-  "winner_es_type": "azktest",
-  "winner_es_index": "azktest",
-  "buyer_es_type": "azktest",
-  "buyer_es_index": "azktest",
-  "agency_es_type": "azktest",
-  "agency_es_index": "azktest"
+  "es_type": "azktest",
+  "es_index": "azktest"
+
 }

+ 2 - 0
udpdataclear/udpSensitiveWords/go.mod

@@ -10,6 +10,8 @@ require (
 	google.golang.org/grpc v1.36.1
 	google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 // indirect
 	google.golang.org/protobuf v1.26.0
+	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
 	gopkg.in/olivere/elastic.v1 v1.0.1
+	gopkg.in/yaml.v2 v2.2.8
 	gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
 )

+ 3 - 0
udpdataclear/udpSensitiveWords/go.sum

@@ -195,9 +195,12 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/olivere/elastic.v1 v1.0.1 h1:ZoJwTKCI0jJdVptoGB0QEFt/4bDUs6A5Pjrmn/Zb+5g=
 gopkg.in/olivere/elastic.v1 v1.0.1/go.mod h1:sMIrW2Y2hS8bEAqdTvdcrNN/KV21XXOfjdi4tHxwVnI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 194 - 0
udpdataclear/udpSensitiveWords/grpc_server/data.go

@@ -0,0 +1,194 @@
+package main
+
+import (
+	"github.com/importcjj/sensitive"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	"regexp"
+	"runtime"
+	"strings"
+	"time"
+)
+
+var reg_alias = regexp.MustCompile("(税务局|工商行政管理局|文化广播电视新闻出版局|外国专家局|" +
+	"中医药管理局|市场监督管理局|广播电视局|医疗保障局|机关事务管理局|粮食和物资储备局|" +
+	"监狱管理局|畜牧兽医局|食品药品监督管理局|城市管理行政执法局|城市管理局|国家保密局|密码管理局|" +
+	"地方金融监督管理局|住房保障和房屋管理局|质量技术监督局|人力资源与社会保障局|公路管理局|国土资源局|" +
+	"卫生和计划生育局|民事政务局|公众安全局|交通管理局|人力资源和社会保障局|劳动和社会保障局|" +
+	"住房和城乡建设局|就业服务局|文物管理局|环境保护局|粮食和物资储备局|教育体育局|" +
+	"体育局|教育局|招商局|农业局|农机局|水务局|林业局|财政局|审计局|统计局|商务局)$")
+var reglen *regexp.Regexp = regexp.MustCompile("^(.{1,5}|.{40,})$") // matches a string of 1-5 characters or of 40 and more characters (no newlines)
+var strReg *regexp.Regexp = regexp.MustCompile("^(.{0,3}工程队|.{0,3}总公司|_+|.{0,2}设备安装公司|.{0,2}装[饰修潢]公司|.{0,2}开发公司|.{0,4}有限公司|.{0,4}有限责任公司|.{0,4}设计院|建筑设计研?究?院|省文物考古研究所|经济开发区|省.*|镇人民政府|.{0,2}服务公司|" +
+	".{0,2}工程质量监督站|.{0,3}经[营销]部|.{0,3}事务所|.{0,4}工程公司|.{0,4}责任公司|.*勘测|.{0,4}研究院|.*能源建|.{0,2}安装工程|.*[市省]{1}|.{0,4}中心|.*区.?|" +
+	".{0,3}税务局|.{0,3}财政局|.{0,3}商行|.{0,2}公安处|.{0,2}测绘院|.{0,3}开发|.{0,2}建设局|.{0,2}经销部|.{0,3}委员会|.{0,2}分公司|.{0,2}管理站|.{0,2}事务管理局|" +
+	".*资料|.{0,2}办公用品.{1,2}|.*唯亭|.*设备|.+安装|.{0,2}技术服务|市.+[台院社局司]|城?区.+[府局室院]|县.+[院台局]|.{0,2}发展公司|经济技术开发|" +
+	"发展和改革局|贵州有色地质|铝塑门窗加工|生产力促进中心|特殊普通合伙|工业集团公司|人民调解协会|人民政府办公厅|机电设备公司|房地产开发有限公司|.{0,4}商店|中等专业学校|" +
+	"农村信用联社|.{0,4}经营部|.{0,4}销售部|驾驶员培训学校|.{2}县.{2}镇|保安服务总公司|住房和城乡建设局|地产评估事务所|生产资料门市部|×+|.{0,3}[0-9]{15}|.*[0-9]+|.*路|.*无字号名称.*|.*车|.*[,,]{1}.*|.*个体工商户|.*运输户)$")
+
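+// Checks the first character of a name. NOTE(review): the original note said "non-Chinese start", but the pattern below matches a leading CJK ideograph — confirm which was intended.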
+var unstart_strReg *regexp.Regexp = regexp.MustCompile("^([\u4e00-\u9fa5])")
+// Patterns anchored at the start of a name.
+var start_strReg *regexp.Regexp = regexp.MustCompile("^([a-zA-Z]{1,2}[\u4e00-\u9fa5]{6,}|省|市|县|区|业绩|资格|中标|项目|预算单位)")
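+// Patterns anchored at the end of a name. NOTE(review): the final alternative "/*" can match the empty string, so this expression matches every input — confirm whether a literal "/*" was intended.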
+var end_strReg *regexp.Regexp =  regexp.MustCompile("(\\.|\\.\\.|餐馆|店|腻子|肉庄|画社|美发屋|发廊|网吧|网咖|零售点|新街|包子铺|奶茶铺|(株)|先生|女士|小姐|" +
+	"资格|业绩|中标|项目|预算单位|摊位号|号|厅|室|部|点|馆|场|厂|床|所|处|站|行|中心|合作社|ATMS|" +
+	"吧|楼|摊|摊位|廊|茶社|坊|圃|汤锅|园|民宿|美容院|房|排挡|府|庄|栈|队|批发|苑|养殖户|棋牌|农家乐|货运|" +
+	"城|社|基地|会|服务|娱乐|种植|百货|汽修|农家菜|亭|小吃|快餐|粮库|卫生院|书画院|面|门窗|鸡排|屋|橱|堂|肉铺|服务|服饰|/*)$")
+// Patterns that may occur anywhere inside a name.
+var con_strReg *regexp.Regexp = regexp.MustCompile("(\\?|?|%|代码标识|删除|错误|吊销|注销|发起人|待清理|&#|护照号|身份证号|" +
+	"法人|&nbsp|国家拨入|借款|积累资金|单位自有|认股人|--|、|&|`|美元|[\u4e00-\u9fa5]{2,6}·[\u4e00-\u9fa5]{2,6})|" +
+	"[a-zA-Z]{5,}")
+
+var uncon_strReg *regexp.Regexp = regexp.MustCompile("(园|政府|集团|公司|有限|合伙|企|院|学|局|处)") // matches a string containing any of the listed keywords
+
+
+
+// initSensitiveWordsData rebuilds the package-level Filter from the
+// "unique_qyxy" collection, restricted to the _id range given by
+// YamlConfig.TaskGteId and YamlConfig.TaskLteId. The process is terminated
+// when either configured id is not a valid ObjectID hex string.
+func initSensitiveWordsData() {
+	log.Println("初始化敏感词-源数据...")
+	gteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskGteId)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	lteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskLteId)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	log.Println("id段落:", BsonTOStringId(gteid), BsonTOStringId(lteid))
+	iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": gteid,
+			"$lte": lteid,
+		},
+	}).Sort("_id").Iter()
+	Filter = sensitive.New()
+	var initnum uint
+	for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
+		initnum++
+		if initnum%10000 == 0 {
+			log.Println("current index ", initnum, tmp["qy_name"])
+		}
+		// A document whose qy_name is missing or not a string must not
+		// crash start-up; it is simply left out of the filter.
+		name, ok := tmp["qy_name"].(string)
+		if !ok {
+			continue
+		}
+		Filter.AddWord(name)
+	}
+	log.Println("init ok", initnum)
+}
+
+
+
+
+
+
+
+// addTaskSensitiveWordsData runs the incremental update loop. On every tick it
+// reads "qyxy_std" documents whose updatetime is not older than seven days
+// before the tick, drops names rejected by reglen/strReg or whose company type
+// mentions an individual, inserts the remaining names into "unique_qyxy" and
+// adds them to the package-level Filter. It never returns.
+func addTaskSensitiveWordsData() {
+	// NOTE(review): the connection string and credentials are hard-coded;
+	// they should be moved to configuration and rotated.
+	mmmgo, err := InitMgoEn("mongodb://172.17.4.187:27082,172.17.145.163:27083", 20, "fengweiqiang", "fwq@123123")
+	if err != nil {
+		log.Fatalln(err)
+	}
+	con := mmmgo.GetCon()
+	if con == nil {
+		log.Fatalln("mgo con err")
+	}
+	Filter = sensitive.New()
+	tick := time.Tick(time.Hour * 24 * 7) // one run per week
+	for {                                 // scheduled job
+		ctime := <-tick
+		// Lower bound of the query window: seven days before the tick.
+		cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local)
+		findByupdate, err := con.Database("mixdata").Collection("qyxy_std").Find(nil, bson.M{
+			"updatetime": bson.M{"$gte": cronData.Unix()},
+		}, options.Find().SetProjection(bson.M{"company_name": 1, "updatetime": 1, "company_type": 1, "company_type_old": 1}).SetSort(bson.M{"_id": 1}))
+		if err != nil {
+			log.Println("tick err", cronData, err)
+			continue
+		}
+		// The scan runs in its own function so that the cursor is closed when
+		// this tick is done; a defer placed directly in the endless loop would
+		// never execute and would leak one cursor per tick.
+		func() {
+			defer findByupdate.Close(nil)
+			for findByupdate.Next(nil) {
+				tmp := make(map[string]interface{})
+				if err := findByupdate.Decode(&tmp); err != nil {
+					continue
+				}
+				company_name, ok := tmp["company_name"].(string)
+				if !ok {
+					continue
+				}
+				if reglen.MatchString(company_name) || strReg.MatchString(company_name) {
+					continue
+				}
+				if strings.Contains(ObjToString(tmp["company_type"]), "个人") ||
+					strings.Contains(ObjToString(tmp["company_type"]), "个体") ||
+					strings.Contains(ObjToString(tmp["company_type_old"]), "个人") ||
+					strings.Contains(ObjToString(tmp["company_type_old"]), "个体") {
+					continue
+				}
+
+				// Persist to MongoDB.
+				if _, err := con.Database("mixdata").Collection("unique_qyxy").InsertOne(nil, bson.M{
+					"qy_name": company_name,
+				}); err != nil {
+					log.Println("insert unique_qyxy err", company_name, err)
+				}
+				// Add to the sensitive-word filter. The projection above only
+				// fetches company_name, so the name must come from that field;
+				// reading tmp["qy_name"] here would always panic.
+				Filter.AddWord(company_name)
+				// TODO: store in ES (check for an existing entry, then insert).
+			}
+		}()
+		log.Println("tick ok", cronData)
+	}
+}
+
+
+
+
+
+
+
+
+
+// dealWithDataMemory walks "unique_qyxy" in _id order, adds every qy_name to
+// the package-level Filter and records an _id segment each time the heap
+// exceeds 7.5 GiB, then logs the collected segments.
+func dealWithDataMemory() {
+
+	// The range bounds must be ObjectIDs: BsonTOStringId asserts its argument
+	// to primitive.ObjectID and therefore panics when handed a hex string.
+	iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": StringTOBsonId("1fffffffffffffffffffffff"),
+			"$lte": StringTOBsonId("9fffffffffffffffffffffff"),
+		},
+	}).Sort("_id").Iter()
+	Filter = sensitive.New()
+	var initnum uint
+	saveIdArr, start_id := make([]map[string]string, 0), ""
+	var m runtime.MemStats
+	for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
+		if start_id == "" {
+			start_id = BsonTOStringId(tmp["_id"])
+		}
+		// Skip the word, not the whole iteration, when qy_name is unusable.
+		if name, ok := tmp["qy_name"].(string); ok {
+			Filter.AddWord(name)
+		}
+		initnum++
+		if initnum%100000 == 0 {
+			runtime.ReadMemStats(&m)
+			men := toMegaBytes(m.HeapAlloc)
+			log.Printf("current index %d\tos %.2f M", initnum, men)
+			if men > 7.5*1024 { //7.5G
+				saveIdArr = append(saveIdArr, map[string]string{
+					"start": start_id,
+					"end":   BsonTOStringId(tmp["_id"]),
+				})
+				// Drop the current filter and start a new segment.
+				runtime.GC()
+				Filter = sensitive.New()
+				start_id = ""
+				time.Sleep(time.Second * 5)
+			}
+		}
+		// NOTE(review): this unconditional break stops after the first
+		// document, so no segment boundary is ever reached; it looks like a
+		// leftover from testing — confirm before relying on this function.
+		break
+	}
+
+	saveIdArr = append(saveIdArr, map[string]string{
+		"start": start_id,
+		"end":   "",
+	})
+
+	for k, v := range saveIdArr {
+		log.Println("第", k, "段", v["start"], v["end"])
+	}
+
+	log.Println("memory is ok", initnum)
+}

+ 217 - 62
udpdataclear/udpSensitiveWords/grpc_server/main.go

@@ -6,25 +6,28 @@ import (
 	"github.com/importcjj/sensitive"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"google.golang.org/grpc"
-	"gopkg.in/yaml.v3"
+	"gopkg.in/yaml.v2"
 	"io/ioutil"
 	"log"
+	"math/big"
 	"net"
-	"runtime"
+	"net/http"
 	"sensitiveWords.udp/proto_grpc"
 	"sensitiveWords.udp/util"
+	"strconv"
 	"strings"
-	"time"
+	"gopkg.in/olivere/elastic.v1"
 )
 
 const (
-	PORT     = ":50051"
 	YAMLFILE = "./server.yaml"
 )
 
 var YamlConfig YAMLConfig
 var MixDataMgo *util.MongodbSim
 var Filter *sensitive.Filter
+var es_type, es_index	string
+var Client_Es  *elastic.Client
 
 func init() {
 	yamlFile, err := ioutil.ReadFile(YAMLFILE)
@@ -45,82 +48,59 @@ func init() {
 	}
 	MixDataMgo.InitPool()
 
-	gteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskGteId)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	lteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskLteId)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gte": gteid,
-			"$lte": lteid,
-		},
-	}).Sort("_id").Iter()
-	Filter = sensitive.New()
-	var initnum uint
-	saveIdArr ,start_id:= make([]map[string]string,0),""
-	var m runtime.MemStats
-	for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
-		if start_id=="" {
-			start_id = BsonTOStringId(tmp["_id"])
-		}
-		Filter.AddWord(tmp["qy_name"].(string))
-		initnum++
-		if initnum%10000==0 {
-			runtime.ReadMemStats(&m)
-			men :=toMegaBytes(m.HeapAlloc)
-			log.Printf("current index %d\tos %.2f M",initnum, men)
-			if men>500 {
-				saveIdArr = append(saveIdArr, map[string]string{
-					"start":start_id,
-					"end":BsonTOStringId(tmp["_id"]),
-				})
-				runtime.GC()
-				Filter = sensitive.New()
-				start_id = ""
-				time.Sleep(time.Second*5)
-			}
-		}
-		break
+	Client_Es ,_= elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
+	es_type, es_index = "azktest","azktest"
 
-	}
 
-	saveIdArr = append(saveIdArr, map[string]string{
-		"start":start_id,
-		"end":"",
-	})
+}
 
-	for k,v:=range saveIdArr{
-		log.Println("第",k,"段",v["start"],v["end"])
-	}
 
-	log.Println("init ok", initnum)
-}
 
+func main() {
 
-func toMegaBytes(bytes uint64) float64 {
-	return float64(bytes) / 1024 / 1024
-}
+	// Startup mode: IsAddTask == 0 loads the full word list; any other value starts the incremental task.
 
+	if YamlConfig.IsAddTask==0{
+		initSensitiveWordsData() //初始化敏感词数据
+	}else {
+		go addTaskSensitiveWordsData() //增量
+	}
 
 
-func main() {
-	lis, err := net.Listen("tcp", PORT)
+	lis, err := net.Listen("tcp", YamlConfig.Port)
 	if err != nil {
 		log.Fatalf("failed to listen: %v", err)
 	}
 	s := grpc.NewServer()
 	proto_grpc.RegisterSensitiveWordsServer(s, &server{})
-	log.Println("server start:", PORT)
+	log.Println("server start:", YamlConfig.Port)
 	if err := s.Serve(lis); err != nil {
 		log.Fatalf("failed to serve: %v", err)
 	}
-
 }
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+// gRPC service implementation and related types.
 type server struct {
 	proto_grpc.SensitiveWordsServer
 }
@@ -140,7 +120,15 @@ type YAMLConfig struct {
 	MongodbPoolSize int    `yaml:"mongodbPoolSize"`
 	TaskGteId       string `yaml:"taskGteId"`
 	TaskLteId       string `yaml:"taskLteId"`
+	IsAddTask       int    `yaml:"isAddTask"`
+	Port            string `yaml:"port"`
 }
+
+
+
+
+
+// Miscellaneous helpers.
 func StringTOBsonId(id string) primitive.ObjectID {
 	objectId, _ := primitive.ObjectIDFromHex(id)
 	return objectId
@@ -148,4 +136,171 @@ func StringTOBsonId(id string) primitive.ObjectID {
 
 func BsonTOStringId(id interface{}) string {
 	return id.(primitive.ObjectID).Hex()
-}
+}
+
+
+func toMegaBytes(bytes uint64) float64 {
+	return float64(bytes) / 1024 / 1024
+}
+
+func IntAll(num interface{}) int {
+	return IntAllDef(num, 0)
+}
+
+func Int64All(num interface{}) int64 {
+	if i, ok := num.(int64); ok {
+		return int64(i)
+	} else if i0, ok0 := num.(int32); ok0 {
+		return int64(i0)
+	} else if i1, ok1 := num.(float64); ok1 {
+		return int64(i1)
+	} else if i2, ok2 := num.(int); ok2 {
+		return int64(i2)
+	} else if i3, ok3 := num.(float32); ok3 {
+		return int64(i3)
+	} else if i4, ok4 := num.(string); ok4 {
+		i64, _ := strconv.ParseInt(i4, 10, 64)
+		//in, _ := strconv.Atoi(i4)
+		return i64
+	} else if i5, ok5 := num.(int16); ok5 {
+		return int64(i5)
+	} else if i6, ok6 := num.(int8); ok6 {
+		return int64(i6)
+	} else if i7, ok7 := num.(*big.Int); ok7 {
+		in, _ := strconv.ParseInt(fmt.Sprint(i7), 10, 64)
+		return int64(in)
+	} else if i8, ok8 := num.(*big.Float); ok8 {
+		in, _ := strconv.ParseInt(fmt.Sprint(i8), 10, 64)
+		return int64(in)
+	} else {
+		return 0
+	}
+}
+
+func Float64All(num interface{}) float64 {
+	if i, ok := num.(float64); ok {
+		return float64(i)
+	} else if i0, ok0 := num.(int32); ok0 {
+		return float64(i0)
+	} else if i1, ok1 := num.(int64); ok1 {
+		return float64(i1)
+	} else if i2, ok2 := num.(int); ok2 {
+		return float64(i2)
+	} else if i3, ok3 := num.(float32); ok3 {
+		return float64(i3)
+	} else if i4, ok4 := num.(string); ok4 {
+		in, _ := strconv.ParseFloat(i4, 64)
+		return in
+	} else if i5, ok5 := num.(int16); ok5 {
+		return float64(i5)
+	} else if i6, ok6 := num.(int8); ok6 {
+		return float64(i6)
+	} else if i6, ok6 := num.(uint); ok6 {
+		return float64(i6)
+	} else if i6, ok6 := num.(uint8); ok6 {
+		return float64(i6)
+	} else if i6, ok6 := num.(uint16); ok6 {
+		return float64(i6)
+	} else if i6, ok6 := num.(uint32); ok6 {
+		return float64(i6)
+	} else if i6, ok6 := num.(uint64); ok6 {
+		return float64(i6)
+	} else if i7, ok7 := num.(*big.Float); ok7 {
+		in, _ := strconv.ParseFloat(fmt.Sprint(i7), 64)
+		return float64(in)
+	} else if i8, ok8 := num.(*big.Int); ok8 {
+		in, _ := strconv.ParseFloat(fmt.Sprint(i8), 64)
+		return float64(in)
+	} else {
+		return 0
+	}
+}
+
+func IntAllDef(num interface{}, defaultNum int) int {
+	if i, ok := num.(int); ok {
+		return int(i)
+	} else if i0, ok0 := num.(int32); ok0 {
+		return int(i0)
+	} else if i1, ok1 := num.(float64); ok1 {
+		return int(i1)
+	} else if i2, ok2 := num.(int64); ok2 {
+		return int(i2)
+	} else if i3, ok3 := num.(float32); ok3 {
+		return int(i3)
+	} else if i4, ok4 := num.(string); ok4 {
+		in, _ := strconv.Atoi(i4)
+		return int(in)
+	} else if i5, ok5 := num.(int16); ok5 {
+		return int(i5)
+	} else if i6, ok6 := num.(int8); ok6 {
+		return int(i6)
+	} else if i7, ok7 := num.(*big.Int); ok7 {
+		in, _ := strconv.Atoi(fmt.Sprint(i7))
+		return int(in)
+	} else if i8, ok8 := num.(*big.Float); ok8 {
+		in, _ := strconv.Atoi(fmt.Sprint(i8))
+		return int(in)
+	} else {
+		return defaultNum
+	}
+}
+
+// ObjToString returns old when it holds a string and "" otherwise
+// (including when old is nil).
+func ObjToString(old interface{}) string {
+	if s, ok := old.(string); ok {
+		return s
+	}
+	return ""
+}
+
+// ObjToStringDef returns old when it holds a non-empty string; in every other
+// case (nil, another type, or "") it returns defaultstr.
+func ObjToStringDef(old interface{}, defaultstr string) string {
+	if s, ok := old.(string); ok && s != "" {
+		return s
+	}
+	return defaultstr
+}
+
+// ObjArrToStringArr converts a slice of interface values to a slice of
+// strings. A nil input yields nil; an element that is not a string panics.
+func ObjArrToStringArr(old []interface{}) []string {
+	if old == nil {
+		return nil
+	}
+	out := make([]string, 0, len(old))
+	for _, item := range old {
+		out = append(out, item.(string))
+	}
+	return out
+}
+
+// ObjArrToMapArr converts a slice of interface values to a slice of maps.
+// A nil input yields nil; an element that is not a map[string]interface{}
+// panics.
+func ObjArrToMapArr(old []interface{}) []map[string]interface{} {
+	if old == nil {
+		return nil
+	}
+	out := make([]map[string]interface{}, 0, len(old))
+	for _, item := range old {
+		out = append(out, item.(map[string]interface{}))
+	}
+	return out
+}
+
+// MapArrToObjArr converts a slice of maps to a slice of interface values.
+// A nil input yields nil.
+func MapArrToObjArr(old []map[string]interface{}) []interface{} {
+	if old == nil {
+		return nil
+	}
+	out := make([]interface{}, 0, len(old))
+	for _, item := range old {
+		out = append(out, item)
+	}
+	return out
+}

+ 45 - 0
udpdataclear/udpSensitiveWords/grpc_server/mgo.go

@@ -0,0 +1,45 @@
+package main
+
+import (
+	"context"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type Mgo struct { // Mgo bundles a MongoDB URI, a pool-size setting and the connected client.
+	Uri                string // connection URI given to InitMgoEn
+	PoolSize           uint64 // value handed to SetMaxPoolSize by InitMgoEn
+	mgoEn              *mongo.Client // client returned by mongo.Connect; exposed through GetCon
+}
+
+// InitMgoEn connects to MongoDB at uri and returns a wrapper around the client.
+//
+// poolSize is the maximum connection-pool size; 0 selects the default of 100.
+// When exactly two extra strings are supplied they are used as the username
+// and password for authentication. The error from mongo.Connect is returned
+// unchanged.
+func InitMgoEn(uri string, poolSize uint64, username_password ...string) (*Mgo, error) {
+	// Previously a non-zero poolSize was never stored, so the driver was
+	// always configured with 0 unless the caller passed 0.
+	if poolSize == 0 {
+		poolSize = 100
+	}
+	m := Mgo{Uri: uri, PoolSize: poolSize}
+	options_client := options.Client().
+		ApplyURI(uri).SetMaxPoolSize(m.PoolSize)
+	if len(username_password) == 2 {
+		options_client = options_client.SetAuth(options.Credential{Username: username_password[0], Password: username_password[1]})
+	}
+	client, err := mongo.Connect(context.Background(), options_client)
+	if err != nil {
+		return nil, err
+	}
+	m.mgoEn = client
+	return &m, nil
+}
+
+// GetCon returns the underlying client, or nil when none has been set.
+func (m *Mgo) GetCon() *mongo.Client {
+	return m.mgoEn
+}
+
+//var zwjeReg *regexp.Regexp = regexp.MustCompile(`([〇零点壹贰叁肆伍陆柒捌玖拾百佰千仟万萬亿億元圆角分整正]{4,40})`)

+ 4 - 2
udpdataclear/udpSensitiveWords/grpc_server/server.yaml

@@ -3,5 +3,7 @@ mongodbPoolSize: 10
 dbName: mixdata
 userName:
 passWord:
-taskGteId: 1fffffffffffffffffffffff
-taskLteId: 9fffffffffffffffffffffff
+taskGteId: 605d4f3ea15e7ed8e49ec97c
+taskLteId: 605d4f3ea15e7ed8e49ec9ad
+isAddTask: 0
+port: :50051

+ 1 - 0
udpdataclear/udpSensitiveWords/main.go

@@ -8,6 +8,7 @@ import (
 )
 
 func init() {
+
 	util.InitC()
 }
 func main() {

+ 2 - 10
udpdataclear/udpSensitiveWords/util/config.go

@@ -29,9 +29,7 @@ func InitC() {
 	Client_Es ,_= elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")
 
 
-	winner_type, winner_index = Config["winner_es_type"].(string),Config["winner_es_index"].(string)
-	buyer_type, buyer_index = Config["buyer_es_type"].(string),Config["buyer_es_type"].(string)
-	agency_type, agency_index = Config["agency_es_type"].(string),Config["agency_es_type"].(string)
+	es_type, es_index = Config["es_type"].(string),Config["es_index"].(string)
 
 	Fields = Config["fields"].(map[string]interface{})
 	FindBuyerC, FindAgencyC, FindWinnerC = Config["buyer_c"].(string), Config["agency_c"].(string), Config["winner_c"].(string)
@@ -44,10 +42,6 @@ func InitC() {
 		c := proto_grpc.NewSensitiveWordsClient(conn)
 		QAddrs = append(QAddrs, &c)
 	}
-
-
-
-
 }
 
 var Config map[string]interface{}
@@ -55,7 +49,5 @@ var BiddingMgo *MongodbSim
 var Fields map[string]interface{}
 var FindBuyerC, FindAgencyC, FindWinnerC string
 var QAddrs []*proto_grpc.SensitiveWordsClient
-var winner_type, winner_index	string
-var buyer_type, buyer_index		string
-var agency_type, agency_index	string
+var es_type, es_index	string
 var Client_Es  *elastic.Client

+ 59 - 61
udpdataclear/udpSensitiveWords/util/udputil.go → udpdataclear/udpSensitiveWords/util/udpdata.go

@@ -11,7 +11,7 @@ import (
 	"strings"
 	"time"
 )
-
+var task = make(chan struct{}, 1) // one-slot channel used by processUdpMsg as a lock
 var Udpclient UdpClient //udp对象
 var nextNodes []map[string]interface{}
 
@@ -22,54 +22,14 @@ func ExtractUdp() {
 	log.Println("udp start ", Config["udpport"])
 	Udpclient.Listen(processUdpMsg)
 
+	// Temporary test run over a fixed id range.
 	sid := "1fffffffffffffffffffffff"
 	eid := "9fffffffffffffffffffffff"
 	QuerySensitiveWords(sid,eid )
 }
 
-var task chan struct{} = make(chan struct{}, 1)
 
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	task <- struct{}{}
-	defer func() {
-		<-task
-	}()
-	switch act {
-	case OP_TYPE_DATA:
-		var rep map[string]interface{}
-		err := json.Unmarshal(data, &rep)
-		if err != nil {
-			log.Println(err)
-		} else {
-			sid, _ := rep["gtid"].(string)
-			eid, _ := rep["lteid"].(string)
-			if sid == "" || eid == "" {
-				log.Println("err", "sid=", sid, ",eid=", eid)
-				return
-			}
-			go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
-			log.Println("udp通知抽取id段", sid, " ", eid)
-			QuerySensitiveWords(sid, eid)
-			for _, m := range nextNodes {
-				by, _ := json.Marshal(map[string]interface{}{
-					"gtid":  sid,
-					"lteid": eid,
-					"stype": ObjToString(m["stype"]),
-				})
-				err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
-					IP:   net.ParseIP(m["addr"].(string)),
-					Port: IntAll(m["port"]),
-				})
-				if err != nil {
-					log.Println(err)
-				}
-			}
-			log.Println("udp通知抽取完成,eid=", eid)
-		}
-	case OP_NOOP: //下个节点回应
-		log.Println(string(data))
-	}
-}
+
 
 func QuerySensitiveWords(sid, eid string) {
 	log.Println("QuerySensitiveWords:", sid, eid)
@@ -95,11 +55,9 @@ func QuerySensitiveWords(sid, eid string) {
 		if win, isok := tmp["winner"].(string); isok {
 			queryGrpcWinner := query_grpc(win, FindWinnerC)
 			if queryGrpcWinner == "" {
-				/**********处理未匹配的数据-进行es-分词打分比较**********/
-				new_name,b :=dealWithScoreRules(win,winner_type,winner_index)
-				if b {
-					tmp["winner"] = new_name
-				}
+				new_name,b :=dealWithScoreRules(win)
+				if b {tmp["winner"] = new_name}
+				log.Println(new_name)
 			} else {
 				tmp["winner"] = queryGrpcWinner
 			}
@@ -107,10 +65,8 @@ func QuerySensitiveWords(sid, eid string) {
 		if win, isok := tmp["s_winner"].(string); isok {
 			queryGrpcWinner := query_grpc(win, FindWinnerC)
 			if queryGrpcWinner == "" {
-				new_name,b :=dealWithScoreRules(win,winner_type,winner_index)
-				if b {
-					tmp["s_winner"] = new_name
-				}
+				new_name,b :=dealWithScoreRules(win)
+				if b {tmp["s_winner"] = new_name}
 			} else {
 				tmp["s_winner"] = queryGrpcWinner
 			}
@@ -119,10 +75,8 @@ func QuerySensitiveWords(sid, eid string) {
 		if agency, isok := tmp["agency"].(string); isok {
 			queryGrpcAgency := query_grpc(agency, FindAgencyC)
 			if queryGrpcAgency == "" {
-				new_name,b :=dealWithScoreRules(agency,agency_type,agency_index)
-				if b {
-					tmp["agency"] = new_name
-				}
+				new_name,b :=dealWithScoreRules(agency)
+				if b {tmp["agency"] = new_name}
 			} else {
 				tmp["agency"] = queryGrpcAgency
 			}
@@ -131,10 +85,8 @@ func QuerySensitiveWords(sid, eid string) {
 		if buyer, isok := tmp["buyer"].(string); isok {
 			queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
 			if queryGrpcBuyer == "" {
-				new_name,b :=dealWithScoreRules(buyer,buyer_type,buyer_index)
-				if b {
-					tmp["buyer"] = new_name
-				}
+				new_name,b :=dealWithScoreRules(buyer)
+				if b {tmp["buyer"] = new_name}
 			} else {
 				tmp["buyer"] = queryGrpcBuyer
 			}
@@ -182,9 +134,9 @@ func QuerySensitiveWords(sid, eid string) {
 		break //测试
 	}
 	log.Println("处理完成:", num)
-
 }
 
+// gRPC query handling.
 func query_grpc(enterprise, findC string) string {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
@@ -230,3 +182,49 @@ func query_grpc(enterprise, findC string) string {
 
 	return strings.Join(result,",")
 }
+
+
+
+
+// processUdpMsg handles one incoming UDP message.
+//
+// For OP_TYPE_DATA the payload is decoded as JSON, the sender is acknowledged,
+// QuerySensitiveWords is run for the id range in "gtid"/"lteid", and the same
+// range is then forwarded to every entry of nextNodes. For OP_NOOP (a reply
+// from a downstream node) the payload is only logged.
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	// Take the single task slot so that messages are processed one at a time.
+	task <- struct{}{}
+	defer func() {
+		<-task
+	}()
+	switch act {
+	case OP_TYPE_DATA:
+		var rep map[string]interface{}
+		if err := json.Unmarshal(data, &rep); err != nil {
+			log.Println(err)
+			return
+		}
+		sid, _ := rep["gtid"].(string)
+		eid, _ := rep["lteid"].(string)
+		if sid == "" || eid == "" {
+			log.Println("err", "sid=", sid, ",eid=", eid)
+			return
+		}
+		go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
+		log.Println("udp通知抽取id段", sid, " ", eid)
+		QuerySensitiveWords(sid, eid)
+		for _, m := range nextNodes {
+			// A next-node entry without a string address is skipped instead
+			// of panicking on the type assertion.
+			addr, ok := m["addr"].(string)
+			if !ok {
+				log.Println("err", "addr=", m["addr"])
+				continue
+			}
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  sid,
+				"lteid": eid,
+				"stype": ObjToString(m["stype"]),
+			})
+			err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP(addr),
+				Port: IntAll(m["port"]),
+			})
+			if err != nil {
+				log.Println(err)
+			}
+		}
+		log.Println("udp通知抽取完成,eid=", eid)
+	case OP_NOOP: // reply from the next node
+		log.Println(string(data))
+	}
+}
+

+ 3 - 3
udpdataclear/udpSensitiveWords/util/word.go

@@ -7,13 +7,13 @@ import (
 	"unicode/utf8"
 )
 
-func dealWithScoreRules(name string,estype string,esindex string) (string,bool) {
+func dealWithScoreRules(name string) (string,bool) {
 	new_name,isok :="",false
-	query:= `{"query":{"bool":{"must":[{"query_string":{"default_field":"azktest.name_2","query":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"facets":{}}`
+	query:= `{"query":{"bool":{"must":[{"query_string":{"default_field":"`+es_index+`.name_2","query":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"facets":{}}`
 	//默认取最高分-分析多个分-遍历器查询
 	tmp := make(map[string]interface{})
 	json.Unmarshal([]byte(query),&tmp)
-	searchResult, err := Client_Es.Search().Index(esindex).Type(estype).Source(tmp).Do()
+	searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do()
 	if err != nil {
 		log.Println("从ES查询出错", err.Error())
 		return new_name,isok