123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- // extractudp
- package util
- import (
- "context"
- "encoding/json"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "log"
- "net"
- "sensitiveWords.udp/proto_grpc"
- "strings"
- "time"
- )
- var task chan struct{} = make(chan struct{}, 1)
- var Udpclient UdpClient //udp对象
- var nextNodes []map[string]interface{}
- //udp通知抽取
- func ExtractUdp() {
- nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{}))
- Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024}
- log.Println("udp start ", Config["udpport"])
- Udpclient.Listen(processUdpMsg)
- /*//临时测试
- sid := "1fffffffffffffffffffffff"
- eid := "9fffffffffffffffffffffff"
- QuerySensitiveWords(sid,eid )*/
- }
- func QuerySensitiveWords(sid, eid string) {
- log.Println("QuerySensitiveWords:", sid, eid)
- objSid, err := primitive.ObjectIDFromHex(sid)
- if err != nil {
- log.Println("转换sid err", err)
- return
- }
- objEid, err := primitive.ObjectIDFromHex(eid)
- if err != nil {
- log.Println("转换eid err", err)
- return
- }
- var num int64
- iter := QfwMgo85.GetMgoConn().C(Collection).Find(map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": objSid,
- "$lte": objEid,
- },
- }).Select(Fields).Iter()
- for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
- if win, isok := tmp["winner"].(string); isok {
- queryGrpcWinner := query_grpc(win, FindWinnerC)
- if queryGrpcWinner == "" {
- } else {
- tmp["winner"] = queryGrpcWinner
- }
- }
- if win, isok := tmp["s_winner"].(string); isok {
- queryGrpcWinner := query_grpc(win, FindWinnerC)
- if queryGrpcWinner == "" {
- } else {
- tmp["s_winner"] = queryGrpcWinner
- }
- }
- if agency, isok := tmp["agency"].(string); isok {
- queryGrpcAgency := query_grpc(agency, FindAgencyC)
- if queryGrpcAgency == "" {
- } else {
- tmp["agency"] = queryGrpcAgency
- }
- }
- if buyer, isok := tmp["buyer"].(string); isok {
- queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
- if queryGrpcBuyer == "" {
- } else {
- tmp["buyer"] = queryGrpcBuyer
- }
- }
- num++
- }
- log.Println("处理完成:", num)
- }
- //grpc - 处理
- func query_grpc(enterprise, findC string) string {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
- defer cancel()
- var CorpusType proto_grpc.CorpusType = proto_grpc.CorpusType_ALL
- switch findC {
- case "buyer":
- CorpusType = proto_grpc.CorpusType_BUYER
- case "agency":
- CorpusType = proto_grpc.CorpusType_BUYER
- case "winner":
- CorpusType = proto_grpc.CorpusType_BUYER
- default:
- CorpusType = proto_grpc.CorpusType_ALL
- }
- lenc := len(QAddrs)
- c := make(chan map[int]string, lenc)
- defer close(c)
- for i, v := range QAddrs {
- go func(index int, vc *proto_grpc.SensitiveWordsClient) {
- sensitiveWords, err := (*vc).Search(ctx, &proto_grpc.Request{Text: enterprise, Corpus: CorpusType})
- if err != nil {
- log.Println(index, err)
- return
- }
- c <- map[int]string{index: sensitiveWords.GetSensitiveWords()}
- return
- }(i, v)
- }
- result := []string{}
- var q int
- for v := range c {
- for _, vv := range v {
- if vv == "" {
- continue
- }
- result = append(result,vv)
- }
- q++
- if q >= lenc {
- break
- }
- }
- return strings.Join(result,",")
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- task <- struct{}{}
- defer func() {
- <-task
- }()
- switch act {
- case OP_TYPE_DATA:
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- if err != nil {
- log.Println(err)
- } else {
- sid, _ := rep["gtid"].(string)
- eid, _ := rep["lteid"].(string)
- if sid == "" || eid == "" {
- log.Println("err", "sid=", sid, ",eid=", eid)
- return
- }
- go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
- log.Println("udp通知抽取id段", sid, " ", eid)
- QuerySensitiveWords(sid, eid)
- for _, m := range nextNodes {
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": ObjToString(m["stype"]),
- })
- err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(m["addr"].(string)),
- Port: IntAll(m["port"]),
- })
- if err != nil {
- log.Println(err)
- }
- }
- log.Println("udp通知抽取完成,eid=", eid)
- }
- case OP_NOOP: //下个节点回应
- log.Println(string(data))
- }
- }
|