// extractudp package util import ( "encoding/json" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "importcjj/sensitive" "log" mu "mfw/util" "net" qu "qfw/util" "strings" "sync" ) var task chan struct{} = make(chan struct{}, 1) var udpclient mu.UdpClient //udp对象 var nextNodes []map[string]interface{} //udp通知抽取 func SentiveUdp() { nextNodes = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) //临时测试 //sid := "1fffffffffffffffffffffff" //eid := "9fffffffffffffffffffffff" //QuerySensitiveWords(sid,eid ) } var syc sync.WaitGroup //udp接收 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { task <- struct{}{} defer func() { <-task }() switch act { case mu.OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Println(err) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) if sid == "" || eid == "" { log.Println("err", "sid=", sid, ",eid=", eid) return } go udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), mu.OP_NOOP, ra) log.Println("udp通知-敏感词id段", sid, " ", eid) QuerySensitiveWords(sid, eid) } case mu.OP_NOOP: //下个节点回应 log.Println(string(data)) } } //处理方法 func QuerySensitiveWords(sid, eid string) { log.Println("SensitiveWords:", sid, eid) q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(sid), "$lte": StringTOBsonId(eid), }, } var num, unum int sess := Save_Mgo.GetMgoConn() defer Save_Mgo.DestoryMongoConn(sess) iter := sess.DB(Save_Mgo.DbName).C(SaveCollName).Find(&q).Select(Fields).Iter() c := make(chan struct{}, 1) for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} { c <- struct{}{} syc.Add(1) go handletmp(tmp, &unum, c) num++ } syc.Wait() log.Printf("处理完成:%d,更新数:%d\n", num, unum) } func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) { defer func() { <-c syc.Done() }() up := make(map[string]interface{}) id := tmp["_id"].(primitive.ObjectID).Hex() buyer := qu.ObjToString(tmp["buyer"]) agency := qu.ObjToString(tmp["agency"]) winner := qu.ObjToString(tmp["winner"]) s_winner := qu.ObjToString(tmp["s_winner"]) if buyer != "" { if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "termQuery"&& flog != "queryScore"&& flog != "queryString" { tmp["buyer"] = fname up["log"] =map[string]interface{}{ "buyer":fmt.Sprintf("%s_%s", flog, buyer), } up["buyer"] = fname } } if agency !="" { if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" { tmp["agency"] = fname up["log"] =map[string]interface{}{ "agency":fmt.Sprintf("%s_%s", flog, agency), } up["agency"] = fname } } if winner != "" { if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" { tmp["winner"] = fname up["log"] =map[string]interface{}{ "winner":fmt.Sprintf("%s_%s", flog, winner), } up["winner"] = fname } } if s_winner != "" { if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" { tmp["s_winner"] = fname up["log"] =map[string]interface{}{ "s_winner":fmt.Sprintf("%s_%s", flog, s_winner), } up["s_winner"] = fname } } if len(up) > 0 { *unum++ Save_Mgo.UpdateById(SaveCollName, id, map[string]interface{}{"$set": up}) } } func cheakname(name string) (up bool, log, rname string) { filter := sensitive.New() var cheaklog string //更新,匹配 if termQuery(name) { cheaklog = "termQuery" return true, cheaklog, name } rname, isok, _ ,datas := dealWithNameScoreRules(name) if len(datas) > 0 { for _, v := range datas { filter.AddWord(v["name"].(string)) } findAll := filter.FindAll(name) data := handleData(findAll) //更新,匹配 if len(data) > 0 { cheaklog = "queryString" return true, cheaklog, data } } //更新,匹配 if rname != "" && isok { cheaklog = "queryScore" return true, cheaklog, rname } return false, "", name } func termQuery(name string) bool { query := `{"query":{"bool":{"must":[{"term":{"` + Es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}` tmp := make(map[string]interface{}) json.Unmarshal([]byte(query), &tmp) searchResult, err := Client_Es.Search().Index(Es_index).Type(Es_type).Source(tmp).Do() if err != nil { log.Println("从ES查询出错", err.Error(), name) return false } else { data := make(map[string]interface{}, 1) if searchResult.Hits != nil { for _, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &data) if data["name"].(string) == name { return true } } } } return false } func handleData(datas []string) string { dataslen := len(datas) del := map[int]bool{} if dataslen <= 1 { rstr := strings.Join(datas, ",") return rstr } m2 := make(map[string]bool) for i, v := range datas { if m2[v] { del[i] = true } else { m2[v] = true } } for i := 0; i < dataslen; i++ { if !del[i] { for j := i + 1; j < dataslen; j++ { jdata := datas[j] idata := datas[i] if len(jdata) > len(idata) && strings.Contains(jdata, idata) { del[i] = true break } else if len(idata) > len(jdata) && strings.Contains(idata, jdata) { del[j] = true break } } } } caplen := dataslen - len(del) rdata := make([]string, caplen, caplen) var m int for i, v := range datas { if !del[i] { rdata[m] = v m++ } } rstr := strings.Join(rdata, ",") return rstr }