123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- // extractudp
- package util
- import (
- "encoding/json"
- "fmt"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "log"
- mu "mfw/util"
- "net"
- qu "qfw/util"
- es7 "qfw/util/elastic_v7"
- "regexp"
- "strings"
- "sync"
- )
- var task chan struct{} = make(chan struct{}, 1)
- var udpclient mu.UdpClient //udp对象
- var nextNodes []map[string]interface{}
- var specReg *regexp.Regexp = regexp.MustCompile("(,|,)")
- // udp通知抽取
- func SentiveUdp() {
- nextNodes = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- }
// syc tracks the handletmp goroutines started by QuerySensitiveWords, which
// calls Add before each launch and Wait after the cursor is exhausted;
// handletmp calls Done when it finishes.
var syc sync.WaitGroup
- // udp接收
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- task <- struct{}{}
- defer func() {
- <-task
- }()
- switch act {
- case mu.OP_TYPE_DATA:
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- if err != nil {
- log.Println(err)
- } else {
- sid, _ := rep["gtid"].(string)
- eid, _ := rep["lteid"].(string)
- stype := qu.ObjToString(rep["stype"])
- if stype == "monitor" {
- log.Println("收到监测......")
- key := qu.ObjToString(rep["key"])
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- return
- }
- if sid == "" || eid == "" {
- log.Println("err", "sid=", sid, ",eid=", eid)
- return
- }
- go udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), mu.OP_NOOP, ra)
- log.Println("Udp回应上节点~id段")
- QuerySensitiveWords(sid, eid)
- log.Println("...计划发送udp~统计下一节点...")
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println(string(data))
- }
- }
- // 处理方法
- func QuerySensitiveWords(sid, eid string) {
- log.Println("开始处理敏感词匹配:", sid, "~", eid)
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(sid),
- "$lte": StringTOBsonId(eid),
- },
- }
- var num, unum int
- sess := Save_Mgo.GetMgoConn()
- defer Save_Mgo.DestoryMongoConn(sess)
- iter := sess.DB(Save_Mgo.DbName).C(SaveCollName).Find(&q).Select(Fields).Iter()
- c := make(chan struct{}, 2)
- for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
- c <- struct{}{}
- syc.Add(1)
- go handletmp(tmp, &unum, c)
- num++
- }
- syc.Wait()
- log.Printf("处理完成:%d,更新数:%d\n", num, unum)
- }
- func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
- defer func() {
- <-c
- syc.Done()
- }()
- up := make(map[string]interface{})
- id := tmp["_id"].(primitive.ObjectID).Hex()
- //buyer := qu.ObjToString(tmp["buyer"])
- agency := qu.ObjToString(tmp["agency"])
- winner := qu.ObjToString(tmp["winner"])
- s_winner := qu.ObjToString(tmp["s_winner"])
- //暂时弃用
- //if buyer != "" {
- // if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "termQuery" {
- // tmp["buyer"] = fname
- // up["log"] =map[string]interface{}{
- // "buyer":fmt.Sprintf("%s_%s", flog, buyer),
- // }
- // up["buyer"] = fname
- // }
- //}
- if agency != "" {
- if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" {
- tmp["agency"] = fname
- up["log"] = map[string]interface{}{
- "agency": fmt.Sprintf("%s_%s", flog, agency),
- }
- up["agency"] = fname
- }
- }
- if winner != "" && !specReg.MatchString(winner) {
- if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" {
- tmp["winner"] = fname
- up["log"] = map[string]interface{}{
- "winner": fmt.Sprintf("%s_%s", flog, winner),
- }
- up["winner"] = fname
- }
- }
- if s_winner != "" && !specReg.MatchString(s_winner) {
- if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" {
- tmp["s_winner"] = fname
- up["log"] = map[string]interface{}{
- "s_winner": fmt.Sprintf("%s_%s", flog, s_winner),
- }
- up["s_winner"] = fname
- }
- }
- if len(up) > 0 {
- *unum++
- Save_Mgo.UpdateById(SaveCollName, id, map[string]interface{}{"$set": up})
- }
- }
- func cheakname(name string) (up bool, log, rname string) {
- //filter := sensitive.New()
- var cheaklog string
- //更新,匹配
- if termQuery(name) {
- cheaklog = "termQuery"
- return true, cheaklog, name
- }
- rname, isok, _, _ := dealWithNameScoreRules(name)
- //if len(datas) > 0 {
- // for _, v := range datas {
- // if qu.Float64All(v["score"]) < 1.0 {
- // break
- // }
- // if utf8.RuneCountInString(v["name"].(string))>4 {
- // filter.AddWord(v["name"].(string))
- // }
- // }
- // findAll := filter.FindAll(name)
- // data := handleData(findAll)
- // //更新,匹配
- // if len(data) > 0 {
- // cheaklog = "queryString"
- // return true, cheaklog, data
- // }
- //}
- //更新,匹配
- if rname != "" && isok {
- cheaklog = "queryScore"
- return true, cheaklog, rname
- }
- return false, "", name
- }
- func termQuery(name string) bool {
- query := `{"query":{"bool":{"must":[{"term":{"company_name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"aggs":{}}`
- res := es7.Get(Qyxy_Es_index, query)
- if res != nil {
- if len(*res) > 0 {
- return true
- }
- }
- //tmp := make(map[string]interface{})
- //json.Unmarshal([]byte(query), &tmp)
- //Qyxy_Client_Es := es7.GetEsConn()
- //defer es7.DestoryEsConn(Qyxy_Client_Es)
- //searchResult, err := Qyxy_Client_Es.Search().Index(Qyxy_Es_index).Type(Qyxy_Es_type).Source(tmp).Do(context.TODO())
- //if err != nil {
- // log.Println("从ES查询出错", err.Error(), name)
- // return false
- //} else {
- // data := make(map[string]interface{}, 1)
- // if searchResult.Hits != nil {
- // for _, hit := range searchResult.Hits.Hits {
- // json.Unmarshal(hit.Source, &data)
- // if data["name"].(string) == name {
- // return true
- // }
- // }
- // }
- //}
- return false
- }
// handleData collapses a list of matched strings into a comma-joined string.
//
// Exact duplicates are dropped (the first occurrence is kept), and every
// string that is a proper substring of another, longer entry is dropped too.
// The surviving entries keep their original relative order. Inputs with zero
// or one element are joined as they are.
//
// Unlike a scan that stops at the first contained entry, every pair is
// checked, so all shorter entries covered by a longer one are removed.
func handleData(datas []string) string {
	if len(datas) <= 1 {
		return strings.Join(datas, ",")
	}

	// Pass 1: drop exact duplicates, keeping first occurrences in order.
	seen := make(map[string]struct{}, len(datas))
	uniq := make([]string, 0, len(datas))
	for _, v := range datas {
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		uniq = append(uniq, v)
	}

	// Pass 2: drop entries contained in some longer entry.
	kept := make([]string, 0, len(uniq))
	for i, cand := range uniq {
		covered := false
		for j, other := range uniq {
			if i != j && len(other) > len(cand) && strings.Contains(other, cand) {
				covered = true
				break
			}
		}
		if !covered {
			kept = append(kept, cand)
		}
	}

	return strings.Join(kept, ",")
}
|