udputil.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // extractudp
  2. package util
  3. import (
  4. "context"
  5. "encoding/json"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "log"
  8. "net"
  9. "sensitiveWords.udp/proto_grpc"
  10. "strings"
  11. "time"
  12. )
  13. var Udpclient UdpClient //udp对象
  14. var nextNodes []map[string]interface{}
  15. //udp通知抽取
  16. func ExtractUdp() {
  17. nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{}))
  18. Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024}
  19. log.Println("udp start ", Config["udpport"])
  20. Udpclient.Listen(processUdpMsg)
  21. sid := "1fffffffffffffffffffffff"
  22. eid := "9fffffffffffffffffffffff"
  23. QuerySensitiveWords(sid,eid )
  24. }
  25. var task chan struct{} = make(chan struct{}, 1)
  26. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  27. task <- struct{}{}
  28. defer func() {
  29. <-task
  30. }()
  31. switch act {
  32. case OP_TYPE_DATA:
  33. var rep map[string]interface{}
  34. err := json.Unmarshal(data, &rep)
  35. if err != nil {
  36. log.Println(err)
  37. } else {
  38. sid, _ := rep["gtid"].(string)
  39. eid, _ := rep["lteid"].(string)
  40. if sid == "" || eid == "" {
  41. log.Println("err", "sid=", sid, ",eid=", eid)
  42. return
  43. }
  44. go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
  45. log.Println("udp通知抽取id段", sid, " ", eid)
  46. QuerySensitiveWords(sid, eid)
  47. for _, m := range nextNodes {
  48. by, _ := json.Marshal(map[string]interface{}{
  49. "gtid": sid,
  50. "lteid": eid,
  51. "stype": ObjToString(m["stype"]),
  52. })
  53. err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
  54. IP: net.ParseIP(m["addr"].(string)),
  55. Port: IntAll(m["port"]),
  56. })
  57. if err != nil {
  58. log.Println(err)
  59. }
  60. }
  61. log.Println("udp通知抽取完成,eid=", eid)
  62. }
  63. case OP_NOOP: //下个节点回应
  64. log.Println(string(data))
  65. }
  66. }
  67. func QuerySensitiveWords(sid, eid string) {
  68. log.Println("QuerySensitiveWords:", sid, eid)
  69. objSid, err := primitive.ObjectIDFromHex(sid)
  70. if err != nil {
  71. log.Println("转换sid err", err)
  72. return
  73. }
  74. objEid, err := primitive.ObjectIDFromHex(eid)
  75. if err != nil {
  76. log.Println("转换eid err", err)
  77. return
  78. }
  79. var num int64
  80. iter := BiddingMgo.GetMgoConn().C("bidding").Find(map[string]interface{}{
  81. "_id": map[string]interface{}{
  82. "$gte": objSid,
  83. "$lte": objEid,
  84. },
  85. }).Select(Fields).Sort("_id").Iter()
  86. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  87. if win, isok := tmp["winner"].(string); isok {
  88. queryGrpcWinner := query_grpc(win, FindWinnerC)
  89. if queryGrpcWinner == "" {
  90. /**********处理未匹配的数据-进行es-分词打分比较**********/
  91. new_name,b :=dealWithScoreRules(win,winner_type,winner_index)
  92. if b {
  93. tmp["winner"] = new_name
  94. }
  95. } else {
  96. tmp["winner"] = queryGrpcWinner
  97. }
  98. }
  99. if win, isok := tmp["s_winner"].(string); isok {
  100. queryGrpcWinner := query_grpc(win, FindWinnerC)
  101. if queryGrpcWinner == "" {
  102. new_name,b :=dealWithScoreRules(win,winner_type,winner_index)
  103. if b {
  104. tmp["s_winner"] = new_name
  105. }
  106. } else {
  107. tmp["s_winner"] = queryGrpcWinner
  108. }
  109. }
  110. if agency, isok := tmp["agency"].(string); isok {
  111. queryGrpcAgency := query_grpc(agency, FindAgencyC)
  112. if queryGrpcAgency == "" {
  113. new_name,b :=dealWithScoreRules(agency,agency_type,agency_index)
  114. if b {
  115. tmp["agency"] = new_name
  116. }
  117. } else {
  118. tmp["agency"] = queryGrpcAgency
  119. }
  120. }
  121. if buyer, isok := tmp["buyer"].(string); isok {
  122. queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
  123. if queryGrpcBuyer == "" {
  124. new_name,b :=dealWithScoreRules(buyer,buyer_type,buyer_index)
  125. if b {
  126. tmp["buyer"] = new_name
  127. }
  128. } else {
  129. tmp["buyer"] = queryGrpcBuyer
  130. }
  131. }
  132. num++
  133. }
  134. iter2 := BiddingMgo.GetMgoConn().C("bidding_back").Find(map[string]interface{}{
  135. "_id": map[string]interface{}{
  136. "$gte": objSid,
  137. "$lte": objEid,
  138. },
  139. }).Select(Fields).Sort("_id").Iter()
  140. for tmp := map[string]interface{}{}; iter2.Next(&tmp); tmp = map[string]interface{}{} {
  141. if win, isok := tmp["winner"].(string); isok {
  142. queryGrpcWinner := query_grpc(win, FindWinnerC)
  143. if queryGrpcWinner == "" {
  144. } else {
  145. tmp["winner"] = queryGrpcWinner
  146. }
  147. }
  148. if win, isok := tmp["s_winner"].(string); isok {
  149. queryGrpcWinner := query_grpc(win, FindWinnerC)
  150. if queryGrpcWinner == "" {
  151. } else {
  152. tmp["s_winner"] = queryGrpcWinner
  153. }
  154. }
  155. if agency, isok := tmp["agency"].(string); isok {
  156. queryGrpcAgency := query_grpc(agency, FindAgencyC)
  157. if queryGrpcAgency == "" {
  158. } else {
  159. tmp["agency"] = queryGrpcAgency
  160. }
  161. }
  162. if buyer, isok := tmp["buyer"].(string); isok {
  163. queryGrpcBuyer := query_grpc(buyer, FindBuyerC)
  164. if queryGrpcBuyer == "" {
  165. } else {
  166. tmp["buyer"] = queryGrpcBuyer
  167. }
  168. }
  169. num++
  170. break //测试
  171. }
  172. log.Println("处理完成:", num)
  173. }
  174. func query_grpc(enterprise, findC string) string {
  175. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  176. defer cancel()
  177. var CorpusType proto_grpc.CorpusType = proto_grpc.CorpusType_ALL
  178. switch findC {
  179. case "buyer":
  180. CorpusType = proto_grpc.CorpusType_BUYER
  181. case "agency":
  182. CorpusType = proto_grpc.CorpusType_BUYER
  183. case "winner":
  184. CorpusType = proto_grpc.CorpusType_BUYER
  185. default:
  186. CorpusType = proto_grpc.CorpusType_ALL
  187. }
  188. lenc := len(QAddrs)
  189. c := make(chan map[int]string, lenc)
  190. defer close(c)
  191. for i, v := range QAddrs {
  192. go func(index int, vc *proto_grpc.SensitiveWordsClient) {
  193. sensitiveWords, err := (*vc).Search(ctx, &proto_grpc.Request{Text: enterprise, Corpus: CorpusType})
  194. if err != nil {
  195. log.Println(index, err)
  196. return
  197. }
  198. c <- map[int]string{index: sensitiveWords.GetSensitiveWords()}
  199. return
  200. }(i, v)
  201. }
  202. result := []string{}
  203. var q int
  204. for v := range c {
  205. for _, vv := range v {
  206. if vv == "" {
  207. continue
  208. }
  209. result = append(result,vv)
  210. }
  211. q++
  212. if q >= lenc {
  213. break
  214. }
  215. }
  216. return strings.Join(result,",")
  217. }