udpdata.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. // extractudp
  2. package util
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "log"
  8. mu "mfw/util"
  9. "net"
  10. qu "qfw/util"
  11. es7 "qfw/util/elastic_v7"
  12. "regexp"
  13. "strings"
  14. "sync"
  15. )
  16. var task chan struct{} = make(chan struct{}, 1)
  17. var udpclient mu.UdpClient //udp对象
  18. var nextNodes []map[string]interface{}
  19. var specReg *regexp.Regexp = regexp.MustCompile("(,|,)")
  20. // udp通知抽取
  21. func SentiveUdp() {
  22. nextNodes = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  23. updport := Sysconfig["udpport"].(string)
  24. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  25. udpclient.Listen(processUdpMsg)
  26. log.Println("Udp服务监听", updport)
  27. }
  28. var syc sync.WaitGroup
  29. // udp接收
  30. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  31. task <- struct{}{}
  32. defer func() {
  33. <-task
  34. }()
  35. switch act {
  36. case mu.OP_TYPE_DATA:
  37. var rep map[string]interface{}
  38. err := json.Unmarshal(data, &rep)
  39. if err != nil {
  40. log.Println(err)
  41. } else {
  42. sid, _ := rep["gtid"].(string)
  43. eid, _ := rep["lteid"].(string)
  44. stype := qu.ObjToString(rep["stype"])
  45. if stype == "monitor" {
  46. log.Println("收到监测......")
  47. key := qu.ObjToString(rep["key"])
  48. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  49. return
  50. }
  51. if sid == "" || eid == "" {
  52. log.Println("err", "sid=", sid, ",eid=", eid)
  53. return
  54. }
  55. go udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), mu.OP_NOOP, ra)
  56. log.Println("Udp回应上节点~id段")
  57. QuerySensitiveWords(sid, eid)
  58. log.Println("...计划发送udp~统计下一节点...")
  59. }
  60. case mu.OP_NOOP: //下个节点回应
  61. log.Println(string(data))
  62. }
  63. }
  64. // 处理方法
  65. func QuerySensitiveWords(sid, eid string) {
  66. log.Println("开始处理敏感词匹配:", sid, "~", eid)
  67. q := map[string]interface{}{
  68. "_id": map[string]interface{}{
  69. "$gt": StringTOBsonId(sid),
  70. "$lte": StringTOBsonId(eid),
  71. },
  72. }
  73. var num, unum int
  74. sess := Save_Mgo.GetMgoConn()
  75. defer Save_Mgo.DestoryMongoConn(sess)
  76. iter := sess.DB(Save_Mgo.DbName).C(SaveCollName).Find(&q).Select(Fields).Iter()
  77. c := make(chan struct{}, 2)
  78. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  79. c <- struct{}{}
  80. syc.Add(1)
  81. go handletmp(tmp, &unum, c)
  82. num++
  83. }
  84. syc.Wait()
  85. log.Printf("处理完成:%d,更新数:%d\n", num, unum)
  86. }
  87. func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
  88. defer func() {
  89. <-c
  90. syc.Done()
  91. }()
  92. up := make(map[string]interface{})
  93. id := tmp["_id"].(primitive.ObjectID).Hex()
  94. //buyer := qu.ObjToString(tmp["buyer"])
  95. agency := qu.ObjToString(tmp["agency"])
  96. winner := qu.ObjToString(tmp["winner"])
  97. s_winner := qu.ObjToString(tmp["s_winner"])
  98. //暂时弃用
  99. //if buyer != "" {
  100. // if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "termQuery" {
  101. // tmp["buyer"] = fname
  102. // up["log"] =map[string]interface{}{
  103. // "buyer":fmt.Sprintf("%s_%s", flog, buyer),
  104. // }
  105. // up["buyer"] = fname
  106. // }
  107. //}
  108. if agency != "" {
  109. if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" {
  110. tmp["agency"] = fname
  111. up["log"] = map[string]interface{}{
  112. "agency": fmt.Sprintf("%s_%s", flog, agency),
  113. }
  114. up["agency"] = fname
  115. }
  116. }
  117. if winner != "" && !specReg.MatchString(winner) {
  118. if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" {
  119. tmp["winner"] = fname
  120. up["log"] = map[string]interface{}{
  121. "winner": fmt.Sprintf("%s_%s", flog, winner),
  122. }
  123. up["winner"] = fname
  124. }
  125. }
  126. if s_winner != "" && !specReg.MatchString(s_winner) {
  127. if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" {
  128. tmp["s_winner"] = fname
  129. up["log"] = map[string]interface{}{
  130. "s_winner": fmt.Sprintf("%s_%s", flog, s_winner),
  131. }
  132. up["s_winner"] = fname
  133. }
  134. }
  135. if len(up) > 0 {
  136. *unum++
  137. Save_Mgo.UpdateById(SaveCollName, id, map[string]interface{}{"$set": up})
  138. }
  139. }
  140. func cheakname(name string) (up bool, log, rname string) {
  141. //filter := sensitive.New()
  142. var cheaklog string
  143. //更新,匹配
  144. if termQuery(name) {
  145. cheaklog = "termQuery"
  146. return true, cheaklog, name
  147. }
  148. rname, isok, _, _ := dealWithNameScoreRules(name)
  149. //if len(datas) > 0 {
  150. // for _, v := range datas {
  151. // if qu.Float64All(v["score"]) < 1.0 {
  152. // break
  153. // }
  154. // if utf8.RuneCountInString(v["name"].(string))>4 {
  155. // filter.AddWord(v["name"].(string))
  156. // }
  157. // }
  158. // findAll := filter.FindAll(name)
  159. // data := handleData(findAll)
  160. // //更新,匹配
  161. // if len(data) > 0 {
  162. // cheaklog = "queryString"
  163. // return true, cheaklog, data
  164. // }
  165. //}
  166. //更新,匹配
  167. if rname != "" && isok {
  168. cheaklog = "queryScore"
  169. return true, cheaklog, rname
  170. }
  171. return false, "", name
  172. }
  173. func termQuery(name string) bool {
  174. query := `{"query":{"bool":{"must":[{"term":{"company_name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":1,"sort":[],"aggs":{}}`
  175. res := es7.Get(Qyxy_Es_index, query)
  176. if res != nil {
  177. if len(*res) > 0 {
  178. return true
  179. }
  180. }
  181. //tmp := make(map[string]interface{})
  182. //json.Unmarshal([]byte(query), &tmp)
  183. //Qyxy_Client_Es := es7.GetEsConn()
  184. //defer es7.DestoryEsConn(Qyxy_Client_Es)
  185. //searchResult, err := Qyxy_Client_Es.Search().Index(Qyxy_Es_index).Type(Qyxy_Es_type).Source(tmp).Do(context.TODO())
  186. //if err != nil {
  187. // log.Println("从ES查询出错", err.Error(), name)
  188. // return false
  189. //} else {
  190. // data := make(map[string]interface{}, 1)
  191. // if searchResult.Hits != nil {
  192. // for _, hit := range searchResult.Hits.Hits {
  193. // json.Unmarshal(hit.Source, &data)
  194. // if data["name"].(string) == name {
  195. // return true
  196. // }
  197. // }
  198. // }
  199. //}
  200. return false
  201. }
  202. func handleData(datas []string) string {
  203. dataslen := len(datas)
  204. del := map[int]bool{}
  205. if dataslen <= 1 {
  206. rstr := strings.Join(datas, ",")
  207. return rstr
  208. }
  209. m2 := make(map[string]bool)
  210. for i, v := range datas {
  211. if m2[v] {
  212. del[i] = true
  213. } else {
  214. m2[v] = true
  215. }
  216. }
  217. for i := 0; i < dataslen; i++ {
  218. if !del[i] {
  219. for j := i + 1; j < dataslen; j++ {
  220. jdata := datas[j]
  221. idata := datas[i]
  222. if len(jdata) > len(idata) && strings.Contains(jdata, idata) {
  223. del[i] = true
  224. break
  225. } else if len(idata) > len(jdata) && strings.Contains(idata, jdata) {
  226. del[j] = true
  227. break
  228. }
  229. }
  230. }
  231. }
  232. caplen := dataslen - len(del)
  233. rdata := make([]string, caplen, caplen)
  234. var m int
  235. for i, v := range datas {
  236. if !del[i] {
  237. rdata[m] = v
  238. m++
  239. }
  240. }
  241. rstr := strings.Join(rdata, ",")
  242. return rstr
  243. }