udpdata.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // extractudp
  2. package util
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "importcjj/sensitive"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. qu "qfw/util"
  12. "strings"
  13. "sync"
  14. )
  15. var task chan struct{} = make(chan struct{}, 1)
  16. var udpclient mu.UdpClient //udp对象
  17. var nextNodes []map[string]interface{}
  18. //udp通知抽取
  19. func SentiveUdp() {
  20. nextNodes = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  21. updport := Sysconfig["udpport"].(string)
  22. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  23. udpclient.Listen(processUdpMsg)
  24. log.Println("Udp服务监听", updport)
  25. //临时测试
  26. //sid := "1fffffffffffffffffffffff"
  27. //eid := "9fffffffffffffffffffffff"
  28. //QuerySensitiveWords(sid,eid )
  29. }
  30. var syc sync.WaitGroup
  31. //udp接收
  32. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  33. task <- struct{}{}
  34. defer func() {
  35. <-task
  36. }()
  37. switch act {
  38. case mu.OP_TYPE_DATA:
  39. var rep map[string]interface{}
  40. err := json.Unmarshal(data, &rep)
  41. if err != nil {
  42. log.Println(err)
  43. } else {
  44. sid, _ := rep["gtid"].(string)
  45. eid, _ := rep["lteid"].(string)
  46. if sid == "" || eid == "" {
  47. log.Println("err", "sid=", sid, ",eid=", eid)
  48. return
  49. }
  50. go udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), mu.OP_NOOP, ra)
  51. log.Println("udp通知-敏感词id段", sid, " ", eid)
  52. QuerySensitiveWords(sid, eid)
  53. }
  54. case mu.OP_NOOP: //下个节点回应
  55. log.Println(string(data))
  56. }
  57. }
  58. //处理方法
  59. func QuerySensitiveWords(sid, eid string) {
  60. log.Println("SensitiveWords:", sid, eid)
  61. q := map[string]interface{}{
  62. "_id": map[string]interface{}{
  63. "$gt": StringTOBsonId(sid),
  64. "$lte": StringTOBsonId(eid),
  65. },
  66. }
  67. var num, unum int
  68. sess := Save_Mgo.GetMgoConn()
  69. defer Save_Mgo.DestoryMongoConn(sess)
  70. iter := sess.DB(Save_Mgo.DbName).C(SaveCollName).Find(&q).Select(Fields).Iter()
  71. c := make(chan struct{}, 1)
  72. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  73. c <- struct{}{}
  74. syc.Add(1)
  75. go handletmp(tmp, &unum, c)
  76. num++
  77. }
  78. syc.Wait()
  79. log.Printf("处理完成:%d,更新数:%d\n", num, unum)
  80. }
  81. func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
  82. defer func() {
  83. <-c
  84. syc.Done()
  85. }()
  86. up := make(map[string]interface{})
  87. id := tmp["_id"].(primitive.ObjectID).Hex()
  88. buyer := qu.ObjToString(tmp["buyer"])
  89. agency := qu.ObjToString(tmp["agency"])
  90. winner := qu.ObjToString(tmp["winner"])
  91. s_winner := qu.ObjToString(tmp["s_winner"])
  92. if buyer != "" {
  93. if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "termQuery"&& flog != "queryScore"&& flog != "queryString" {
  94. tmp["buyer"] = fname
  95. up["log"] =map[string]interface{}{
  96. "buyer":fmt.Sprintf("%s_%s", flog, buyer),
  97. }
  98. up["buyer"] = fname
  99. }
  100. }
  101. if agency !="" {
  102. if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" {
  103. tmp["agency"] = fname
  104. up["log"] =map[string]interface{}{
  105. "agency":fmt.Sprintf("%s_%s", flog, agency),
  106. }
  107. up["agency"] = fname
  108. }
  109. }
  110. if winner != "" {
  111. if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" {
  112. tmp["winner"] = fname
  113. up["log"] =map[string]interface{}{
  114. "winner":fmt.Sprintf("%s_%s", flog, winner),
  115. }
  116. up["winner"] = fname
  117. }
  118. }
  119. if s_winner != "" {
  120. if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" {
  121. tmp["s_winner"] = fname
  122. up["log"] =map[string]interface{}{
  123. "s_winner":fmt.Sprintf("%s_%s", flog, s_winner),
  124. }
  125. up["s_winner"] = fname
  126. }
  127. }
  128. if len(up) > 0 {
  129. *unum++
  130. Save_Mgo.UpdateById(SaveCollName, id, map[string]interface{}{"$set": up})
  131. }
  132. }
  133. func cheakname(name string) (up bool, log, rname string) {
  134. filter := sensitive.New()
  135. var cheaklog string
  136. //更新,匹配
  137. if termQuery(name) {
  138. cheaklog = "termQuery"
  139. return true, cheaklog, name
  140. }
  141. rname, isok, _ ,datas := dealWithNameScoreRules(name)
  142. if len(datas) > 0 {
  143. for _, v := range datas {
  144. filter.AddWord(v["name"].(string))
  145. }
  146. findAll := filter.FindAll(name)
  147. data := handleData(findAll)
  148. //更新,匹配
  149. if len(data) > 0 {
  150. cheaklog = "queryString"
  151. return true, cheaklog, data
  152. }
  153. }
  154. //更新,匹配
  155. if rname != "" && isok {
  156. cheaklog = "queryScore"
  157. return true, cheaklog, rname
  158. }
  159. return false, "", name
  160. }
  161. func termQuery(name string) bool {
  162. query := `{"query":{"bool":{"must":[{"term":{"` + Es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
  163. tmp := make(map[string]interface{})
  164. json.Unmarshal([]byte(query), &tmp)
  165. searchResult, err := Client_Es.Search().Index(Es_index).Type(Es_type).Source(tmp).Do()
  166. if err != nil {
  167. log.Println("从ES查询出错", err.Error(), name)
  168. return false
  169. } else {
  170. data := make(map[string]interface{}, 1)
  171. if searchResult.Hits != nil {
  172. for _, hit := range searchResult.Hits.Hits {
  173. json.Unmarshal(*hit.Source, &data)
  174. if data["name"].(string) == name {
  175. return true
  176. }
  177. }
  178. }
  179. }
  180. return false
  181. }
  // handleData reduces a list of matched words to its most specific entries and
  // joins them with ",".
  //
  // Duplicates are dropped (the first occurrence is kept), and every word that
  // is a proper substring of another, longer word in the list is removed. The
  // remaining words keep their original relative order. A nil or empty slice
  // yields "", a single-element slice yields that element.
  //
  // Each word is compared against all other words, so a long word removes every
  // shorter word it contains, not just the first one encountered.
  func handleData(datas []string) string {
  	if len(datas) <= 1 {
  		return strings.Join(datas, ",")
  	}

  	// Drop exact duplicates, keeping first occurrences in order.
  	seen := make(map[string]bool, len(datas))
  	uniq := make([]string, 0, len(datas))
  	for _, word := range datas {
  		if seen[word] {
  			continue
  		}
  		seen[word] = true
  		uniq = append(uniq, word)
  	}

  	// Keep only words that are not contained in a longer word.
  	kept := make([]string, 0, len(uniq))
  	for i, word := range uniq {
  		covered := false
  		for j, other := range uniq {
  			if i != j && len(other) > len(word) && strings.Contains(other, word) {
  				covered = true
  				break
  			}
  		}
  		if !covered {
  			kept = append(kept, word)
  		}
  	}
  	return strings.Join(kept, ",")
  }