Procházet zdrojové kódy

备份:敏感词程序,抽取程序

zhengkun před 3 roky
rodič
revize
1409d5b5c1

Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 119 - 1613
data_monitoring/listen_data/src/zkmethod.go


+ 6 - 12
udpdataclear/udpSensitiveWords/config.json

@@ -1,10 +1,10 @@
 {
-  "85_mgo_addr": "192.168.3.207:27092",
+  "85_mgo_addr": "127.0.0.1:27017",
   "mongodbPoolSize": 10,
-  "collection": "result_20210109",
-  "163_mgo_addr": "172.17.4.187:27082,172.17.145.163:27083",
-  "163_userName": "fengweiqiang",
-  "163_passWord": "fwq@123123",
+  "collection": "zk_sensitive_test_data",
+  "163_mgo_addr": "127.0.0.1:27017",
+  "163_userName": "",
+  "163_passWord": "",
   "fields": {
     "buyer": 1,
     "agency": 1,
@@ -15,13 +15,7 @@
   "agency_c": "",
   "winner_c": "",
   "udpport": "1782",
-  "nextNode": [
-    {
-      "addr": "127.0.0.1",
-      "port": 88888,
-      "memo": "抽取城市"
-    }
-  ],
+  "nextNode": [],
   "userName": "",
   "passWord": "",
   "client_es": "http://ela.spdata.jianyu360.com",

+ 1 - 8
udpdataclear/udpSensitiveWords/main.go

@@ -1,21 +1,14 @@
 package main
 
 import (
-	"log"
 	"sensitiveWords.udp/util"
 )
 
 // init runs before main and calls util.InitC; the visible part of InitC sets up the Mongo pools, the Elasticsearch client and the buyer word filter.
 func init() {
-	log.Println("what init")
-	return
 	util.InitC()
 }
 func main() {
-	log.Println("what main")
-	return
-	//util.TestData()
-	//return
-	go util.AddTaskSensitiveWordsData() //增量
+	//go util.AddTaskSensitiveWordsData() //增量
 	// 主函数中添加
 	util.ExtractUdp() //udp通知抽取
 	lock := make(chan bool)

+ 58 - 0
udpdataclear/udpSensitiveWords/mark

@@ -0,0 +1,58 @@
+{
+  "85_mgo_addr": "192.168.3.207:27092",
+  "mongodbPoolSize": 10,
+  "collection": "result_20210109",
+  "163_mgo_addr": "172.17.4.187:27082,172.17.145.163:27083",
+  "163_userName": "zhengkun",
+  "163_passWord": "zk@123123",
+  "fields": {
+    "buyer": 1,
+    "agency": 1,
+    "winner": 1,
+    "s_winner": 1
+  },
+  "buyer_c": "",
+  "agency_c": "",
+  "winner_c": "",
+  "udpport": "1782",
+  "nextNode": [
+    {
+      "addr": "127.0.0.1",
+      "port": 88888,
+      "memo": "抽取城市"
+    }
+  ],
+  "userName": "",
+  "passWord": "",
+  "client_es": "http://ela.spdata.jianyu360.com",
+  "es_type": "unique_qy",
+  "es_index": "unique_qy"
+}
+
+
+if win, isok := tmp["winner"].(string); isok {
+		if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" {
+			tmp["winner"] = fname
+			up["winner"] = fmt.Sprintf("%s_%s", flog, win)
+		}
+	}
+	if win, isok := tmp["s_winner"].(string); isok {
+		if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" {
+			tmp["s_winner"] = fname
+			up["s_winner"] = fmt.Sprintf("%s_%s", flog, win)
+		}
+	}
+
+	if agency, isok := tmp["agency"].(string); isok {
+		if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "tremQuery" {
+			tmp["agency"] = fname
+			up["agency"] = fmt.Sprintf("%s_%s", flog, agency)
+		}
+	}
+
+	if buyer, isok := tmp["buyer"].(string); isok {
+		if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "tremQuery"&& flog != "queryScore"&& flog != "queryString" {
+			tmp["buyer"] = fname
+			up["buyer"] = fmt.Sprintf("%s_%s", flog, buyer)
+		}
+	}

+ 29 - 1
udpdataclear/udpSensitiveWords/util/config.go

@@ -1,6 +1,7 @@
 package util
 
 import (
+	"github.com/importcjj/sensitive"
 	"gopkg.in/olivere/elastic.v1"
 	"log"
 	"net/http"
@@ -24,6 +25,15 @@ func InitC() {
 	}
 	QfwMgo85.InitPool()
 
+	QfwMgo163 = &MongodbSim{
+		MongodbAddr: Config["163_mgo_addr"].(string),
+		Size:        IntAll(Config["mongodbPoolSize"]),
+		DbName:      "qfw",
+		UserName:    Config["163_userName"].(string),
+		PassWord:    Config["163_passWord"].(string),
+	}
+	QfwMgo163.InitPool()
+
 	//Client_Es, _ = elastic.NewClient(http.DefaultClient, "http://192.168.3.11:9800")"http://ela.spdata.jianyu360.com"
 	Client_Es, _ = elastic.NewClient(http.DefaultClient, Config["client_es"].(string))
 
@@ -33,12 +43,30 @@ func InitC() {
 	FindBuyerC, FindAgencyC, FindWinnerC = Config["buyer_c"].(string), Config["agency_c"].(string), Config["winner_c"].(string)
 	Collection = Config["collection"].(string)
 
+	//临时加载敏感词
+	sess := QfwMgo163.GetMgoConn()
+	defer QfwMgo163.DestoryMongoConn(sess)
+	q,total:=map[string]interface{}{
+		"enabled":1,
+	},0
+	it := sess.DB(QfwMgo163.DbName).C("zk_sensitive_buyer").Find(&q).Iter()
+	for tmp := make(map[string]interface{}); it.Next(&tmp);total++{
+		if total%100000==0 {
+			log.Println("current index ",total)
+		}
+		name:=ObjToString(tmp["name"])
+		BuyerFilter.AddWord(name)
+		tmp = make(map[string]interface{})
+	}
+	log.Println("buyer_sensitive load over",total)
 }
 
 var Config map[string]interface{} // configuration values; InitC reads them with unchecked type assertions such as Config["collection"].(string)
-var QfwMgo85 *MongodbSim
+var QfwMgo85  *MongodbSim // Mongo pool initialised in InitC; QuerySensitiveWords and handletmp read and update through it
+var QfwMgo163 *MongodbSim // second Mongo pool (DbName "qfw") built in InitC from the 163_* config keys; source of the sensitive-word list
 var Collection string // set in InitC from Config["collection"]
 var Fields map[string]interface{} // passed to Select(...) in QuerySensitiveWords; where it is populated is outside the visible hunks
 var FindBuyerC, FindAgencyC, FindWinnerC string // set in InitC from Config["buyer_c"], Config["agency_c"], Config["winner_c"]
 var es_type, es_index string // es_index is spliced into the query JSON built in termQuery and dealWithEsData
 var Client_Es *elastic.Client // created in InitC from Config["client_es"]; NOTE(review): the error from elastic.NewClient is discarded there
+var BuyerFilter = sensitive.New() // word filter filled in InitC with the "name" of every enabled document in zk_sensitive_buyer

+ 88 - 46
udpdataclear/udpSensitiveWords/util/udpdata.go

@@ -26,33 +26,30 @@ func ExtractUdp() {
 	Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024}
 	log.Println("udp start ", Config["udpport"])
 	Udpclient.Listen(processUdpMsg)
-	/*//临时测试
+	//临时测试
 	sid := "1fffffffffffffffffffffff"
 	eid := "9fffffffffffffffffffffff"
-	QuerySensitiveWords(sid,eid )*/
+	QuerySensitiveWords(sid,eid )
+
+
+
 }
 var syc sync.WaitGroup
+
+//处理方法
 func QuerySensitiveWords(sid, eid string) {
-	log.Println("QuerySensitiveWords:", sid, eid)
-	objSid, err := primitive.ObjectIDFromHex(sid)
-	if err != nil {
-		log.Println("转换sid err", err)
-		return
-	}
-	objEid, err := primitive.ObjectIDFromHex(eid)
-	if err != nil {
-		log.Println("转换eid err", err)
-		return
+	log.Println("SensitiveWords:", sid, eid)
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(sid),
+			"$lte": StringTOBsonId(eid),
+		},
 	}
 	var num, unum int
 	mgoSess := QfwMgo85.GetMgoConn()
 	defer QfwMgo85.DestoryMongoConn(mgoSess)
-	iter := mgoSess.DB(QfwMgo85.DbName).C(Collection).Find(map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gte": objSid,
-			"$lte": objEid,
-		},
-	}).Select(Fields).Iter()
+	iter := mgoSess.DB(QfwMgo85.DbName).C(Collection).Find(&q).Select(Fields).Iter()
+
 	c := make(chan struct{}, 1)
 	for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
 		c <- struct{}{}
@@ -61,54 +58,80 @@ func QuerySensitiveWords(sid, eid string) {
 		num++
 	}
 	syc.Wait()
-	log.Printf("%s--->%s 处理完成:%d,更新数:%d\n", sid, eid, num, unum)
+	log.Printf("处理完成:%d,更新数:%d\n", num, unum)
 }
 // handletmp checks the buyer, agency, winner and s_winner names of one document and writes any corrected values back by _id with a $set update.
 // tmp is the document read by the caller's iterator, unum counts documents that were updated, and c is received from once on exit (the caller sends on it before each call).
 func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
 	defer func() {
 		<-c
 		syc.Done()
 	}()
-	up := make(map[string]string)
-	if win, isok := tmp["winner"].(string); isok {
-		if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" {
-			tmp["winner"] = fname
-			up["winner"] = fmt.Sprintf("%s_%s", flog, win)
+	up := make(map[string]interface{}) // fields to $set; stays empty when nothing needs changing
+	id := tmp["_id"].(primitive.ObjectID).Hex() // NOTE(review): unchecked assertion, panics if _id is missing or not an ObjectID
+	buyer := ObjToString(tmp["buyer"]) // presumably "" when the field is absent or not a string; verify against ObjToString
+	agency := ObjToString(tmp["agency"])
+	winner := ObjToString(tmp["winner"])
+	s_winner := ObjToString(tmp["s_winner"])
+
+
+	if buyer != "" {
+		if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "termQuery"&& flog != "queryScore"&& flog != "queryString" {
+			tmp["buyer"] = fname // tmp is not written back by this function any more; only up is persisted
+			up["log"] =map[string]interface{}{
+				"buyer":fmt.Sprintf("%s_%s", flog, buyer),
+			}
+			up["buyer"] = fname
 		}
-	}
-	if win, isok := tmp["s_winner"].(string); isok {
-		if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" {
-			tmp["s_winner"] = fname
-			up["s_winner"] = fmt.Sprintf("%s_%s", flog, win)
+	}else { // buyer is empty here, so cheakbuyername receives "" and derives a candidate from the stored title
+		if fok, flog, fname := cheakbuyername(buyer,id); fok&&fname!=""{
+			tmp["buyer_sensitive"] = fname
+			up["log"] =map[string]interface{}{
+				"buyer":fmt.Sprintf("%s", flog),
+			}
+			up["buyer_test"] = fname // stored under buyer_test, not buyer
 		}
 	}
 
-	if agency, isok := tmp["agency"].(string); isok {
-		if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "tremQuery" {
+	if agency !="" {
+		if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" {
 			tmp["agency"] = fname
-			up["agency"] = fmt.Sprintf("%s_%s", flog, agency)
+			up["log"] =map[string]interface{}{ // NOTE(review): replaces any "log" map set by the buyer branch, so only the last matching field's log entry survives
+				"agency":fmt.Sprintf("%s_%s", flog, agency),
+			}
+			up["agency"] = fname
 		}
 	}
 
-	if buyer, isok := tmp["buyer"].(string); isok {
-		if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "tremQuery"&& flog != "queryScore"&& flog != "queryString" {
-			tmp["buyer"] = fname
-			up["buyer"] = fmt.Sprintf("%s_%s", flog, buyer)
+	if winner != "" {
+		if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" {
+			tmp["winner"] = fname
+			up["log"] =map[string]interface{}{ // same overwrite of up["log"] as above
+				"winner":fmt.Sprintf("%s_%s", flog, winner),
+			}
+			up["winner"] = fname
+		}
+	}
+
+	if s_winner != "" {
+		if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" {
+			tmp["s_winner"] = fname
+			up["log"] =map[string]interface{}{ // same overwrite of up["log"] as above
+				"s_winner":fmt.Sprintf("%s_%s", flog, s_winner),
+			}
+			up["s_winner"] = fname
 		}
 	}
+
 	if len(up) > 0 {
 		*unum++ // no lock is taken here; NOTE(review): confirm the caller never runs two handletmp calls at once
-		tmp["log"] = up
-		id := tmp["_id"].(primitive.ObjectID).Hex()
-		//log.Println(tmp)
-		QfwMgo85.UpdateById(Collection, id, map[string]interface{}{"$set": tmp})
+		QfwMgo85.UpdateById(Collection, id, map[string]interface{}{"$set": up}) // only the keys collected in up are written; the result of UpdateById is not checked
 	}
 }
 func cheakname(name string) (up bool, log, rname string) {
 	filter := sensitive.New()
 	var cheaklog string
 	//更新,匹配
-	if tremQuery(name) {
-		cheaklog = "tremQuery"
+	if termQuery(name) {
+		cheaklog = "termQuery"
 		return true, cheaklog, name
 	}
 
@@ -133,8 +156,7 @@ func cheakname(name string) (up bool, log, rname string) {
 
 	return false, "", name
 }
-
-func tremQuery(name string) bool {
+func termQuery(name string) bool {
 	query := `{"query":{"bool":{"must":[{"term":{"` + es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
 	tmp := make(map[string]interface{})
 	json.Unmarshal([]byte(query), &tmp)
@@ -196,7 +218,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		log.Println(string(data))
 	}
 }
-
 func handleData(datas []string) string {
 	dataslen := len(datas)
 	del := map[int]bool{}
@@ -295,7 +316,6 @@ func AddTaskSensitiveWordsData() {
 		log.Println("tick ok", cronData)
 	}
 }
-
 //处理是否新增es
 func dealWithEsData(name string, tmpid string) {
 	query := `{"query":{"bool":{"must":[{"term":{"` + es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
@@ -328,6 +348,28 @@ func dealWithEsData(name string, tmpid string) {
 
 
 
+// cheakbuyername handles the sensitive-buyer case: it loads document tmpid through QfwMgo163, scans its title with BuyerFilter and returns (true, "buyer sensitive", first match), or (false, "", name) when nothing matches.
+func cheakbuyername(name string,tmpid string) (up bool, log, rname string) { // NOTE(review): the result named log shadows the log package inside this function
+
+	tmp := QfwMgo163.FindById("zk_sensitive_test_data",tmpid) // NOTE(review): collection name is hard-coded rather than taken from Collection; the callers visible here update via QfwMgo85 instead, so confirm this is the intended source
+	title:=ObjToString(tmp["title"])
+	//detail:=ObjToString(tmp["detail"])
+
+	dataArr := BuyerFilter.FindAll(title) // presumably every loaded word that occurs in title; verify against the sensitive package
+	if dataArr!=nil && len(dataArr)>0 {
+		new_name := ObjToString(dataArr[0]) // only the first hit is used
+		return true,"buyer sensitive",new_name
+	}
+
+	//dataArr = BuyerFilter.FindAll(detail)
+	//if dataArr!=nil && len(dataArr)>0 {
+	//	new_name := ObjToString(dataArr[0])
+	//	return true,"buyer sensitive",new_name
+	//}
+
+	return false, "", name // no hit: the input name is handed back unchanged
+}
+
 
 
 

Některé soubory nejsou zobrazeny, neboť je v těchto rozdílových datech změněno mnoho souborů