// extractudp package util import ( "encoding/json" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "log" mu "mfw/util" "net" qu "qfw/util" es7 "qfw/util/elastic_v7" "regexp" "strings" "sync" ) var task chan struct{} = make(chan struct{}, 1) var udpclient mu.UdpClient //udp对象 var nextNodes []map[string]interface{} var specReg *regexp.Regexp = regexp.MustCompile("(,|,)") // udp通知抽取 func SentiveUdp() { nextNodes = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) } var syc sync.WaitGroup // udp接收 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { task <- struct{}{} defer func() { <-task }() switch act { case mu.OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Println(err) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) stype := qu.ObjToString(rep["stype"]) if stype == "monitor" { log.Println("收到监测......") key := qu.ObjToString(rep["key"]) udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) return } if sid == "" || eid == "" { log.Println("err", "sid=", sid, ",eid=", eid) return } go udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), mu.OP_NOOP, ra) log.Println("Udp回应上节点~id段") QuerySensitiveWords(sid, eid) log.Println("...计划发送udp~统计下一节点...") } case mu.OP_NOOP: //下个节点回应 log.Println(string(data)) } } // 处理方法 func QuerySensitiveWords(sid, eid string) { log.Println("开始处理敏感词匹配:", sid, "~", eid) q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(sid), "$lte": StringTOBsonId(eid), }, } var num, unum int sess := Save_Mgo.GetMgoConn() defer Save_Mgo.DestoryMongoConn(sess) iter := sess.DB(Save_Mgo.DbName).C(SaveCollName).Find(&q).Select(Fields).Iter() c := make(chan struct{}, 2) for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} { c <- struct{}{} syc.Add(1) go handletmp(tmp, &unum, c) num++ } syc.Wait() log.Printf("处理完成:%d,更新数:%d\n", num, unum) } func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) { defer func() { <-c syc.Done() }() up := make(map[string]interface{}) id := tmp["_id"].(primitive.ObjectID).Hex() //buyer := qu.ObjToString(tmp["buyer"]) agency := qu.ObjToString(tmp["agency"]) winner := qu.ObjToString(tmp["winner"]) s_winner := qu.ObjToString(tmp["s_winner"]) //暂时弃用 //if buyer != "" { // if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "termQuery" { // tmp["buyer"] = fname // up["log"] =map[string]interface{}{ // "buyer":fmt.Sprintf("%s_%s", flog, buyer), // } // up["buyer"] = fname // } //} if agency != "" { if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" { tmp["agency"] = fname up["log"] = map[string]interface{}{ "agency": fmt.Sprintf("%s_%s", flog, agency), } up["agency"] = fname } } if winner != "" && !specReg.MatchString(winner) { if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" { tmp["winner"] = fname up["log"] = map[string]interface{}{ "winner": fmt.Sprintf("%s_%s", flog, winner), } up["winner"] = fname } } if s_winner != "" && !specReg.MatchString(s_winner) { if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" { tmp["s_winner"] = fname up["log"] = map[string]interface{}{ "s_winner": fmt.Sprintf("%s_%s", flog, s_winner), } up["s_winner"] = fname } } if len(up) > 0 { *unum++ Save_Mgo.UpdateById(SaveCollName, id, map[string]interface{}{"$set": up}) } } func cheakname(name string) (up bool, log, rname string) { //filter := sensitive.New() var cheaklog string //更新,匹配 if termQuery(name) { cheaklog = "termQuery" return true, cheaklog, name } rname, isok, _, _ := dealWithNameScoreRules(name) //if len(datas) > 0 { // for _, v := range datas { // if qu.Float64All(v["score"]) < 1.0 { // break // } // if utf8.RuneCountInString(v["name"].(string))>4 { // filter.AddWord(v["name"].(string)) // } // } // findAll := filter.FindAll(name) // data := handleData(findAll) // //更新,匹配 // if len(data) > 0 { // cheaklog = "queryString" // return true, cheaklog, data // } //} //更新,匹配 if rname != "" && isok { cheaklog = "queryScore" return true, cheaklog, rname } return false, "", name } func termQuery(name string) bool { query := `{"query":{"bool":{"must":[{"term":{"company_name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"aggs":{}}` res := es7.Get(Qyxy_Es_index, query) if res != nil { if len(*res) > 0 { return true } } //tmp := make(map[string]interface{}) //json.Unmarshal([]byte(query), &tmp) //Qyxy_Client_Es := es7.GetEsConn() //defer es7.DestoryEsConn(Qyxy_Client_Es) //searchResult, err := Qyxy_Client_Es.Search().Index(Qyxy_Es_index).Type(Qyxy_Es_type).Source(tmp).Do(context.TODO()) //if err != nil { // log.Println("从ES查询出错", err.Error(), name) // return false //} else { // data := make(map[string]interface{}, 1) // if searchResult.Hits != nil { // for _, hit := range searchResult.Hits.Hits { // json.Unmarshal(hit.Source, &data) // if data["name"].(string) == name { // return true // } // } // } //} return false } func handleData(datas []string) string { dataslen := len(datas) del := map[int]bool{} if dataslen <= 1 { rstr := strings.Join(datas, ",") return rstr } m2 := make(map[string]bool) for i, v := range datas { if m2[v] { del[i] = true } else { m2[v] = true } } for i := 0; i < dataslen; i++ { if !del[i] { for j := i + 1; j < dataslen; j++ { jdata := datas[j] idata := datas[i] if len(jdata) > len(idata) && strings.Contains(jdata, idata) { del[i] = true break } else if len(idata) > len(jdata) && strings.Contains(idata, jdata) { del[j] = true break } } } } caplen := dataslen - len(del) rdata := make([]string, caplen, caplen) var m int for i, v := range datas { if !del[i] { rdata[m] = v m++ } } rstr := strings.Join(rdata, ",") return rstr } // 流式修补... func FlowHandleInfo(msgInfo *MsgInfo) { if data := msgInfo.Data; data != nil { if tmp := *qu.ObjToMap(data["ext"]); tmp != nil { up := make(map[string]interface{}) id := BsonTOStringId(tmp["_id"]) agency := qu.ObjToString(tmp["agency"]) winner := qu.ObjToString(tmp["winner"]) s_winner := qu.ObjToString(tmp["s_winner"]) if agency != "" { if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" { tmp["agency"] = fname up["log"] = map[string]interface{}{ "agency": fmt.Sprintf("%s_%s", flog, agency), } up["agency"] = fname } } if winner != "" && !specReg.MatchString(winner) { if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" { tmp["winner"] = fname up["log"] = map[string]interface{}{ "winner": fmt.Sprintf("%s_%s", flog, winner), } up["winner"] = fname } } if s_winner != "" && !specReg.MatchString(s_winner) { if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" { tmp["s_winner"] = fname up["log"] = map[string]interface{}{ "s_winner": fmt.Sprintf("%s_%s", flog, s_winner), } up["s_winner"] = fname } } if len(up) > 0 { for k, v := range up { tmp[k] = v } data["ext"] = tmp msgInfo.Data = data Save_Mgo.UpdateById(SaveCollName, id, map[string]interface{}{"$set": up}) } } } }