udputil.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // extractudp
  2. package util
  3. import (
  4. "context"
  5. "encoding/json"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "log"
  8. "net"
  9. "sensitiveWords.udp/proto_grpc"
  10. "strings"
  11. "time"
  12. )
  13. var Udpclient UdpClient //udp对象
  14. var nextNodes []map[string]interface{}
  15. //udp通知抽取
  16. func ExtractUdp() {
  17. nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{}))
  18. Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024}
  19. log.Println("udp start ", Config["udpport"])
  20. Udpclient.Listen(processUdpMsg)
  21. sid := "1fffffffffffffffffffffff"
  22. eid := "9fffffffffffffffffffffff"
  23. log.Println(sid,eid)
  24. //QuerySensitiveWords(sid,eid )
  25. }
  26. var task chan struct{} = make(chan struct{}, 1)
  27. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  28. task <- struct{}{}
  29. defer func() {
  30. <-task
  31. }()
  32. switch act {
  33. case OP_TYPE_DATA:
  34. var rep map[string]interface{}
  35. err := json.Unmarshal(data, &rep)
  36. if err != nil {
  37. log.Println(err)
  38. } else {
  39. sid, _ := rep["gtid"].(string)
  40. eid, _ := rep["lteid"].(string)
  41. if sid == "" || eid == "" {
  42. log.Println("err", "sid=", sid, ",eid=", eid)
  43. return
  44. }
  45. go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
  46. log.Println("udp通知抽取id段", sid, " ", eid)
  47. QuerySensitiveWords(sid, eid)
  48. for _, m := range nextNodes {
  49. by, _ := json.Marshal(map[string]interface{}{
  50. "gtid": sid,
  51. "lteid": eid,
  52. "stype": ObjToString(m["stype"]),
  53. })
  54. err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
  55. IP: net.ParseIP(m["addr"].(string)),
  56. Port: IntAll(m["port"]),
  57. })
  58. if err != nil {
  59. log.Println(err)
  60. }
  61. }
  62. log.Println("udp通知抽取完成,eid=", eid)
  63. }
  64. case OP_NOOP: //下个节点回应
  65. log.Println(string(data))
  66. }
  67. }
  68. func QuerySensitiveWords(sid, eid string) {
  69. log.Println("QuerySensitiveWords:", sid, eid)
  70. objSid, err := primitive.ObjectIDFromHex(sid)
  71. if err != nil {
  72. log.Println("转换sid err", err)
  73. return
  74. }
  75. objEid, err := primitive.ObjectIDFromHex(eid)
  76. if err != nil {
  77. log.Println("转换eid err", err)
  78. return
  79. }
  80. var num int64
  81. iter := BiddingMgo.GetMgoConn().C("bidding").Find(map[string]interface{}{
  82. "_id": map[string]interface{}{
  83. "$gte": objSid,
  84. "$lte": objEid,
  85. },
  86. }).Select(Fields).Sort("_id").Iter()
  87. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  88. log.Println(tmp["_id"])
  89. if win, isok := tmp["winner"].(string); isok {
  90. queryGrpcWinner := query_grpc(win, FindWinnerC)
  91. if queryGrpcWinner == "" {
  92. } else {
  93. tmp["winner"] = queryGrpcWinner
  94. }
  95. }
  96. if win, isok := tmp["s_winner"].(string); isok {
  97. queryGrpcWinner := query_grpc(win, FindWinnerC)
  98. if queryGrpcWinner == "" {
  99. } else {
  100. tmp["s_winner"] = queryGrpcWinner
  101. }
  102. }
  103. if agency, isok := tmp["agency"].(string); isok {
  104. queryGrpcAgency := query_grpc(agency, FindAgencyC)
  105. if queryGrpcAgency == "" {
  106. } else {
  107. tmp["agency"] = queryGrpcAgency
  108. }
  109. }
  110. if buyer, isok := tmp["buyer"].(string); isok {
  111. queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
  112. if queryGrpcBuyer == "" {
  113. } else {
  114. tmp["buyer"] = queryGrpcBuyer
  115. }
  116. }
  117. num++
  118. }
  119. iter2 := BiddingMgo.GetMgoConn().C("bidding_back").Find(map[string]interface{}{
  120. "_id": map[string]interface{}{
  121. "$gte": objSid,
  122. "$lte": objEid,
  123. },
  124. }).Select(Fields).Sort("_id").Iter()
  125. for tmp := map[string]interface{}{}; iter2.Next(&tmp); tmp = map[string]interface{}{} {
  126. if win, isok := tmp["winner"].(string); isok {
  127. queryGrpcWinner := query_grpc(win, FindWinnerC)
  128. if queryGrpcWinner == "" {
  129. } else {
  130. tmp["winner"] = queryGrpcWinner
  131. }
  132. }
  133. if win, isok := tmp["s_winner"].(string); isok {
  134. queryGrpcWinner := query_grpc(win, FindWinnerC)
  135. if queryGrpcWinner == "" {
  136. } else {
  137. tmp["s_winner"] = queryGrpcWinner
  138. }
  139. }
  140. if agency, isok := tmp["agency"].(string); isok {
  141. queryGrpcAgency := query_grpc(agency, FindAgencyC)
  142. if queryGrpcAgency == "" {
  143. } else {
  144. tmp["agency"] = queryGrpcAgency
  145. }
  146. }
  147. if buyer, isok := tmp["buyer"].(string); isok {
  148. queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
  149. if queryGrpcBuyer == "" {
  150. } else {
  151. tmp["buyer"] = queryGrpcBuyer
  152. }
  153. }
  154. num++
  155. }
  156. log.Println("处理完成:", num)
  157. }
  158. func query_grpc(enterprise, findC string) string {
  159. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  160. defer cancel()
  161. var CorpusType proto_grpc.CorpusType = proto_grpc.CorpusType_ALL
  162. switch findC {
  163. case "buyer":
  164. CorpusType = proto_grpc.CorpusType_BUYER
  165. case "agency":
  166. CorpusType = proto_grpc.CorpusType_BUYER
  167. case "winner":
  168. CorpusType = proto_grpc.CorpusType_BUYER
  169. default:
  170. CorpusType = proto_grpc.CorpusType_ALL
  171. }
  172. lenc := len(QAddrs)
  173. c := make(chan map[int]string, lenc)
  174. defer close(c)
  175. for i, v := range QAddrs {
  176. go func(index int, vc *proto_grpc.SensitiveWordsClient) {
  177. sensitiveWords, err := (*vc).Search(ctx, &proto_grpc.Request{Text: enterprise, Corpus: CorpusType})
  178. if err != nil {
  179. log.Println(index, err)
  180. return
  181. }
  182. c <- map[int]string{index: sensitiveWords.GetSensitiveWords()}
  183. return
  184. }(i, v)
  185. }
  186. result := []string{}
  187. var q int
  188. for v := range c {
  189. for _, vv := range v {
  190. if vv == "" {
  191. continue
  192. }
  193. result = append(result,vv)
  194. }
  195. q++
  196. if q >= lenc {
  197. break
  198. }
  199. }
  200. return strings.Join(result,",")
  201. }