udpdata.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // extractudp
  2. package util
  3. import (
  4. "context"
  5. "encoding/json"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "log"
  8. "net"
  9. "sensitiveWords.udp/proto_grpc"
  10. "strings"
  11. "time"
  12. )
  13. var task chan struct{} = make(chan struct{}, 1)
  14. var Udpclient UdpClient //udp对象
  15. var nextNodes []map[string]interface{}
  16. //udp通知抽取
  17. func ExtractUdp() {
  18. nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{}))
  19. Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024}
  20. log.Println("udp start ", Config["udpport"])
  21. Udpclient.Listen(processUdpMsg)
  22. //临时测试
  23. sid := "1fffffffffffffffffffffff"
  24. eid := "9fffffffffffffffffffffff"
  25. QuerySensitiveWords(sid,eid )
  26. }
  27. func QuerySensitiveWords(sid, eid string) {
  28. log.Println("QuerySensitiveWords:", sid, eid)
  29. objSid, err := primitive.ObjectIDFromHex(sid)
  30. if err != nil {
  31. log.Println("转换sid err", err)
  32. return
  33. }
  34. objEid, err := primitive.ObjectIDFromHex(eid)
  35. if err != nil {
  36. log.Println("转换eid err", err)
  37. return
  38. }
  39. var num int64
  40. iter := BiddingMgo.GetMgoConn().C("bidding").Find(map[string]interface{}{
  41. "_id": map[string]interface{}{
  42. "$gte": objSid,
  43. "$lte": objEid,
  44. },
  45. }).Select(Fields).Sort("_id").Iter()
  46. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  47. if win, isok := tmp["winner"].(string); isok {
  48. queryGrpcWinner := query_grpc(win, FindWinnerC)
  49. if queryGrpcWinner == "" {
  50. new_name,b :=dealWithScoreRules(win)
  51. if b {tmp["winner"] = new_name}
  52. log.Println(new_name)
  53. } else {
  54. tmp["winner"] = queryGrpcWinner
  55. }
  56. }
  57. if win, isok := tmp["s_winner"].(string); isok {
  58. queryGrpcWinner := query_grpc(win, FindWinnerC)
  59. if queryGrpcWinner == "" {
  60. new_name,b :=dealWithScoreRules(win)
  61. if b {tmp["s_winner"] = new_name}
  62. } else {
  63. tmp["s_winner"] = queryGrpcWinner
  64. }
  65. }
  66. if agency, isok := tmp["agency"].(string); isok {
  67. queryGrpcAgency := query_grpc(agency, FindAgencyC)
  68. if queryGrpcAgency == "" {
  69. new_name,b :=dealWithScoreRules(agency)
  70. if b {tmp["agency"] = new_name}
  71. } else {
  72. tmp["agency"] = queryGrpcAgency
  73. }
  74. }
  75. if buyer, isok := tmp["buyer"].(string); isok {
  76. queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
  77. if queryGrpcBuyer == "" {
  78. new_name,b :=dealWithScoreRules(buyer)
  79. if b {tmp["buyer"] = new_name}
  80. } else {
  81. tmp["buyer"] = queryGrpcBuyer
  82. }
  83. }
  84. num++
  85. }
  86. iter2 := BiddingMgo.GetMgoConn().C("bidding_back").Find(map[string]interface{}{
  87. "_id": map[string]interface{}{
  88. "$gte": objSid,
  89. "$lte": objEid,
  90. },
  91. }).Select(Fields).Sort("_id").Iter()
  92. for tmp := map[string]interface{}{}; iter2.Next(&tmp); tmp = map[string]interface{}{} {
  93. if win, isok := tmp["winner"].(string); isok {
  94. queryGrpcWinner := query_grpc(win, FindWinnerC)
  95. if queryGrpcWinner == "" {
  96. } else {
  97. tmp["winner"] = queryGrpcWinner
  98. }
  99. }
  100. if win, isok := tmp["s_winner"].(string); isok {
  101. queryGrpcWinner := query_grpc(win, FindWinnerC)
  102. if queryGrpcWinner == "" {
  103. } else {
  104. tmp["s_winner"] = queryGrpcWinner
  105. }
  106. }
  107. if agency, isok := tmp["agency"].(string); isok {
  108. queryGrpcAgency := query_grpc(agency, FindAgencyC)
  109. if queryGrpcAgency == "" {
  110. } else {
  111. tmp["agency"] = queryGrpcAgency
  112. }
  113. }
  114. if buyer, isok := tmp["buyer"].(string); isok {
  115. queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
  116. if queryGrpcBuyer == "" {
  117. } else {
  118. tmp["buyer"] = queryGrpcBuyer
  119. }
  120. }
  121. num++
  122. break //测试
  123. }
  124. log.Println("处理完成:", num)
  125. }
  126. //grpc - 处理
  127. func query_grpc(enterprise, findC string) string {
  128. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  129. defer cancel()
  130. var CorpusType proto_grpc.CorpusType = proto_grpc.CorpusType_ALL
  131. switch findC {
  132. case "buyer":
  133. CorpusType = proto_grpc.CorpusType_BUYER
  134. case "agency":
  135. CorpusType = proto_grpc.CorpusType_BUYER
  136. case "winner":
  137. CorpusType = proto_grpc.CorpusType_BUYER
  138. default:
  139. CorpusType = proto_grpc.CorpusType_ALL
  140. }
  141. lenc := len(QAddrs)
  142. c := make(chan map[int]string, lenc)
  143. defer close(c)
  144. for i, v := range QAddrs {
  145. go func(index int, vc *proto_grpc.SensitiveWordsClient) {
  146. sensitiveWords, err := (*vc).Search(ctx, &proto_grpc.Request{Text: enterprise, Corpus: CorpusType})
  147. if err != nil {
  148. log.Println(index, err)
  149. return
  150. }
  151. c <- map[int]string{index: sensitiveWords.GetSensitiveWords()}
  152. return
  153. }(i, v)
  154. }
  155. result := []string{}
  156. var q int
  157. for v := range c {
  158. for _, vv := range v {
  159. if vv == "" {
  160. continue
  161. }
  162. result = append(result,vv)
  163. }
  164. q++
  165. if q >= lenc {
  166. break
  167. }
  168. }
  169. return strings.Join(result,",")
  170. }
  171. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  172. task <- struct{}{}
  173. defer func() {
  174. <-task
  175. }()
  176. switch act {
  177. case OP_TYPE_DATA:
  178. var rep map[string]interface{}
  179. err := json.Unmarshal(data, &rep)
  180. if err != nil {
  181. log.Println(err)
  182. } else {
  183. sid, _ := rep["gtid"].(string)
  184. eid, _ := rep["lteid"].(string)
  185. if sid == "" || eid == "" {
  186. log.Println("err", "sid=", sid, ",eid=", eid)
  187. return
  188. }
  189. go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
  190. log.Println("udp通知抽取id段", sid, " ", eid)
  191. QuerySensitiveWords(sid, eid)
  192. for _, m := range nextNodes {
  193. by, _ := json.Marshal(map[string]interface{}{
  194. "gtid": sid,
  195. "lteid": eid,
  196. "stype": ObjToString(m["stype"]),
  197. })
  198. err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
  199. IP: net.ParseIP(m["addr"].(string)),
  200. Port: IntAll(m["port"]),
  201. })
  202. if err != nil {
  203. log.Println(err)
  204. }
  205. }
  206. log.Println("udp通知抽取完成,eid=", eid)
  207. }
  208. case OP_NOOP: //下个节点回应
  209. log.Println(string(data))
  210. }
  211. }