// extractudp package util import ( "context" "encoding/json" "go.mongodb.org/mongo-driver/bson/primitive" "log" "net" "sensitiveWords.udp/proto_grpc" "strings" "time" ) var task chan struct{} = make(chan struct{}, 1) var Udpclient UdpClient //udp对象 var nextNodes []map[string]interface{} //udp通知抽取 func ExtractUdp() { nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{})) Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024} log.Println("udp start ", Config["udpport"]) Udpclient.Listen(processUdpMsg) /*//临时测试 sid := "1fffffffffffffffffffffff" eid := "9fffffffffffffffffffffff" QuerySensitiveWords(sid,eid )*/ } func QuerySensitiveWords(sid, eid string) { log.Println("QuerySensitiveWords:", sid, eid) objSid, err := primitive.ObjectIDFromHex(sid) if err != nil { log.Println("转换sid err", err) return } objEid, err := primitive.ObjectIDFromHex(eid) if err != nil { log.Println("转换eid err", err) return } var num int64 iter := QfwMgo85.GetMgoConn().C(Collection).Find(map[string]interface{}{ "_id": map[string]interface{}{ "$gte": objSid, "$lte": objEid, }, }).Select(Fields).Iter() for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} { if win, isok := tmp["winner"].(string); isok { queryGrpcWinner := query_grpc(win, FindWinnerC) if queryGrpcWinner == "" { } else { tmp["winner"] = queryGrpcWinner } } if win, isok := tmp["s_winner"].(string); isok { queryGrpcWinner := query_grpc(win, FindWinnerC) if queryGrpcWinner == "" { } else { tmp["s_winner"] = queryGrpcWinner } } if agency, isok := tmp["agency"].(string); isok { queryGrpcAgency := query_grpc(agency, FindAgencyC) if queryGrpcAgency == "" { } else { tmp["agency"] = queryGrpcAgency } } if buyer, isok := tmp["buyer"].(string); isok { queryGrpcBuyer := query_grpc(buyer, FindBuyerC) if queryGrpcBuyer == "" { } else { tmp["buyer"] = queryGrpcBuyer } } num++ } log.Println("处理完成:", num) } //grpc - 处理 func query_grpc(enterprise, findC string) string { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() var CorpusType proto_grpc.CorpusType = proto_grpc.CorpusType_ALL switch findC { case "buyer": CorpusType = proto_grpc.CorpusType_BUYER case "agency": CorpusType = proto_grpc.CorpusType_BUYER case "winner": CorpusType = proto_grpc.CorpusType_BUYER default: CorpusType = proto_grpc.CorpusType_ALL } lenc := len(QAddrs) c := make(chan map[int]string, lenc) defer close(c) for i, v := range QAddrs { go func(index int, vc *proto_grpc.SensitiveWordsClient) { sensitiveWords, err := (*vc).Search(ctx, &proto_grpc.Request{Text: enterprise, Corpus: CorpusType}) if err != nil { log.Println(index, err) return } c <- map[int]string{index: sensitiveWords.GetSensitiveWords()} return }(i, v) } result := []string{} var q int for v := range c { for _, vv := range v { if vv == "" { continue } result = append(result,vv) } q++ if q >= lenc { break } } return strings.Join(result,",") } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { task <- struct{}{} defer func() { <-task }() switch act { case OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Println(err) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) if sid == "" || eid == "" { log.Println("err", "sid=", sid, ",eid=", eid) return } go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra) log.Println("udp通知抽取id段", sid, " ", eid) QuerySensitiveWords(sid, eid) for _, m := range nextNodes { by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": ObjToString(m["stype"]), }) err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: IntAll(m["port"]), }) if err != nil { log.Println(err) } } log.Println("udp通知抽取完成,eid=", eid) } case OP_NOOP: //下个节点回应 log.Println(string(data)) } }