udpdata.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. // extractudp
  2. package util
  3. import (
  4. "context"
  5. "encoding/json"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "log"
  8. "net"
  9. "sensitiveWords.udp/proto_grpc"
  10. "strings"
  11. "time"
  12. )
  13. var task chan struct{} = make(chan struct{}, 1)
  14. var Udpclient UdpClient //udp对象
  15. var nextNodes []map[string]interface{}
  16. //udp通知抽取
  17. func ExtractUdp() {
  18. nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{}))
  19. Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024}
  20. log.Println("udp start ", Config["udpport"])
  21. Udpclient.Listen(processUdpMsg)
  22. /*//临时测试
  23. sid := "1fffffffffffffffffffffff"
  24. eid := "9fffffffffffffffffffffff"
  25. QuerySensitiveWords(sid,eid )*/
  26. }
  27. func QuerySensitiveWords(sid, eid string) {
  28. log.Println("QuerySensitiveWords:", sid, eid)
  29. objSid, err := primitive.ObjectIDFromHex(sid)
  30. if err != nil {
  31. log.Println("转换sid err", err)
  32. return
  33. }
  34. objEid, err := primitive.ObjectIDFromHex(eid)
  35. if err != nil {
  36. log.Println("转换eid err", err)
  37. return
  38. }
  39. var num int64
  40. mgoSess := QfwMgo85.GetMgoConn()
  41. defer QfwMgo85.DestoryMongoConn(mgoSess)
  42. iter := mgoSess.DB(QfwMgo85.DbName).C(Collection).Find(map[string]interface{}{
  43. "_id": map[string]interface{}{
  44. "$gte": objSid,
  45. "$lte": objEid,
  46. },
  47. }).Select(Fields).Iter()
  48. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  49. if win, isok := tmp["winner"].(string); isok {
  50. queryGrpcWinner := query_grpc(win, FindWinnerC)
  51. if queryGrpcWinner == "" {
  52. } else {
  53. tmp["winner"] = queryGrpcWinner
  54. }
  55. }
  56. if win, isok := tmp["s_winner"].(string); isok {
  57. queryGrpcWinner := query_grpc(win, FindWinnerC)
  58. if queryGrpcWinner == "" {
  59. } else {
  60. tmp["s_winner"] = queryGrpcWinner
  61. }
  62. }
  63. if agency, isok := tmp["agency"].(string); isok {
  64. queryGrpcAgency := query_grpc(agency, FindAgencyC)
  65. if queryGrpcAgency == "" {
  66. } else {
  67. tmp["agency"] = queryGrpcAgency
  68. }
  69. }
  70. if buyer, isok := tmp["buyer"].(string); isok {
  71. queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
  72. if queryGrpcBuyer == "" {
  73. } else {
  74. tmp["buyer"] = queryGrpcBuyer
  75. }
  76. }
  77. num++
  78. }
  79. log.Println(sid, eid,"处理完成:", num)
  80. }
  81. //grpc - 处理
  82. func query_grpc(enterprise, findC string) string {
  83. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  84. defer cancel()
  85. var CorpusType proto_grpc.CorpusType = proto_grpc.CorpusType_ALL
  86. switch findC {
  87. case "buyer":
  88. CorpusType = proto_grpc.CorpusType_BUYER
  89. case "agency":
  90. CorpusType = proto_grpc.CorpusType_BUYER
  91. case "winner":
  92. CorpusType = proto_grpc.CorpusType_BUYER
  93. default:
  94. CorpusType = proto_grpc.CorpusType_ALL
  95. }
  96. lenc := len(QAddrs)
  97. c := make(chan map[int]string, lenc)
  98. defer close(c)
  99. for i, v := range QAddrs {
  100. go func(index int, vc *proto_grpc.SensitiveWordsClient) {
  101. sensitiveWords, err := (*vc).Search(ctx, &proto_grpc.Request{Text: enterprise, Corpus: CorpusType})
  102. if err != nil {
  103. log.Println(index, err)
  104. c <- map[int]string{index: ""}
  105. return
  106. }
  107. c <- map[int]string{index: sensitiveWords.GetSensitiveWords()}
  108. return
  109. }(i, v)
  110. }
  111. result := []string{}
  112. var q int
  113. for v := range c {
  114. for _, vv := range v {
  115. if vv == "" {
  116. continue
  117. }
  118. result = append(result, vv)
  119. }
  120. q++
  121. if q >= lenc {
  122. break
  123. }
  124. }
  125. rarr := handleData(result)
  126. rstr := strings.Join(rarr, ",")
  127. return rstr
  128. }
  129. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  130. task <- struct{}{}
  131. defer func() {
  132. <-task
  133. }()
  134. switch act {
  135. case OP_TYPE_DATA:
  136. var rep map[string]interface{}
  137. err := json.Unmarshal(data, &rep)
  138. if err != nil {
  139. log.Println(err)
  140. } else {
  141. sid, _ := rep["gtid"].(string)
  142. eid, _ := rep["lteid"].(string)
  143. if sid == "" || eid == "" {
  144. log.Println("err", "sid=", sid, ",eid=", eid)
  145. return
  146. }
  147. go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
  148. log.Println("udp通知抽取id段", sid, " ", eid)
  149. QuerySensitiveWords(sid, eid)
  150. for _, m := range nextNodes {
  151. by, _ := json.Marshal(map[string]interface{}{
  152. "gtid": sid,
  153. "lteid": eid,
  154. "stype": ObjToString(m["stype"]),
  155. })
  156. err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
  157. IP: net.ParseIP(m["addr"].(string)),
  158. Port: IntAll(m["port"]),
  159. })
  160. if err != nil {
  161. log.Println(err)
  162. }
  163. }
  164. log.Println("udp通知抽取完成,eid=", eid)
  165. }
  166. case OP_NOOP: //下个节点回应
  167. log.Println(string(data))
  168. }
  169. }
  170. func handleData(datas []string) []string {
  171. del := map[int]bool{}
  172. dataslen := len(datas)
  173. if dataslen == 0{
  174. return []string{}
  175. }
  176. for i := 0; i < dataslen; i++ {
  177. if !del[i] {
  178. for j := i + 1; j < dataslen; j++ {
  179. jdata := datas[j]
  180. idata := datas[i]
  181. if len(jdata) > len(idata) && strings.Contains(jdata, idata) {
  182. del[i] = true
  183. break
  184. } else if len(idata) > len(jdata) && strings.Contains(idata, jdata) {
  185. del[j] = true
  186. break
  187. }
  188. }
  189. }
  190. }
  191. caplen := dataslen - len(del)
  192. rdata := make([]string, caplen, caplen)
  193. var m int
  194. for i, v := range datas {
  195. if !del[i] {
  196. rdata[m] = v
  197. m++
  198. }
  199. }
  200. return rdata
  201. }