udpdata.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. // extractudp
  2. package util
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "github.com/importcjj/sensitive"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.mongodb.org/mongo-driver/bson/primitive"
  9. "go.mongodb.org/mongo-driver/mongo/options"
  10. "gopkg.in/olivere/elastic.v1"
  11. "log"
  12. "net"
  13. "net/http"
  14. "regexp"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. var task chan struct{} = make(chan struct{}, 1)
  20. var Udpclient UdpClient //udp对象
  21. var nextNodes []map[string]interface{}
  22. //udp通知抽取
  23. func ExtractUdp() {
  24. nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{}))
  25. Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024}
  26. log.Println("udp start ", Config["udpport"])
  27. Udpclient.Listen(processUdpMsg)
  28. /*//临时测试
  29. sid := "1fffffffffffffffffffffff"
  30. eid := "9fffffffffffffffffffffff"
  31. QuerySensitiveWords(sid,eid )*/
  32. }
  33. var syc sync.WaitGroup
  34. func QuerySensitiveWords(sid, eid string) {
  35. log.Println("QuerySensitiveWords:", sid, eid)
  36. objSid, err := primitive.ObjectIDFromHex(sid)
  37. if err != nil {
  38. log.Println("转换sid err", err)
  39. return
  40. }
  41. objEid, err := primitive.ObjectIDFromHex(eid)
  42. if err != nil {
  43. log.Println("转换eid err", err)
  44. return
  45. }
  46. var num, unum int
  47. mgoSess := QfwMgo85.GetMgoConn()
  48. defer QfwMgo85.DestoryMongoConn(mgoSess)
  49. iter := mgoSess.DB(QfwMgo85.DbName).C(Collection).Find(map[string]interface{}{
  50. "_id": map[string]interface{}{
  51. "$gte": objSid,
  52. "$lte": objEid,
  53. },
  54. }).Select(Fields).Iter()
  55. c := make(chan struct{}, 3)
  56. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  57. c <- struct{}{}
  58. syc.Add(1)
  59. go handletmp(tmp, &unum, c)
  60. num++
  61. }
  62. syc.Wait()
  63. log.Printf("%s--->%s 处理完成:%d,更新数:%d\n", sid, eid, num, unum)
  64. }
  65. func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) {
  66. defer func() {
  67. <-c
  68. syc.Done()
  69. }()
  70. up := make(map[string]string)
  71. if win, isok := tmp["winner"].(string); isok {
  72. if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" {
  73. tmp["winner"] = fname
  74. up["winner"] = fmt.Sprintf("%s_%s", flog, win)
  75. }
  76. }
  77. if win, isok := tmp["s_winner"].(string); isok {
  78. if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" {
  79. tmp["s_winner"] = fname
  80. up["s_winner"] = fmt.Sprintf("%s_%s", flog, win)
  81. }
  82. }
  83. if agency, isok := tmp["agency"].(string); isok {
  84. if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "tremQuery" {
  85. tmp["agency"] = fname
  86. up["agency"] = fmt.Sprintf("%s_%s", flog, agency)
  87. }
  88. }
  89. if buyer, isok := tmp["buyer"].(string); isok {
  90. if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "tremQuery" {
  91. tmp["buyer"] = fname
  92. up["buyer"] = fmt.Sprintf("%s_%s", flog, buyer)
  93. }
  94. }
  95. if len(up) > 0 {
  96. *unum++
  97. tmp["log"] = up
  98. id := tmp["_id"].(primitive.ObjectID).Hex()
  99. log.Println(tmp)
  100. QfwMgo85.UpdateById(Collection, id, map[string]interface{}{"$set": tmp})
  101. }
  102. }
  103. func cheakname(name string) (up bool, log, rname string) {
  104. filter := sensitive.New()
  105. var cheaklog string
  106. //更新,匹配
  107. if tremQuery(name) {
  108. cheaklog = "tremQuery"
  109. return true, cheaklog, name
  110. }
  111. rname, isok, _ ,datas := dealWithNameScoreRules(name)
  112. if len(datas) > 0 {
  113. for _, v := range datas {
  114. filter.AddWord(v["name"].(string))
  115. }
  116. findAll := filter.FindAll(name)
  117. data := handleData(findAll)
  118. //更新,匹配
  119. if len(data) > 0 {
  120. cheaklog = "queryString"
  121. return true, cheaklog, data
  122. }
  123. }
  124. //更新,匹配
  125. if rname != "" && isok {
  126. cheaklog = "queryScore"
  127. return true, cheaklog, rname
  128. }
  129. return false, "", name
  130. }
  131. func tremQuery(name string) bool {
  132. query := `{"query":{"bool":{"must":[{"term":{"` + es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
  133. tmp := make(map[string]interface{})
  134. json.Unmarshal([]byte(query), &tmp)
  135. searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do()
  136. if err != nil {
  137. log.Println("从ES查询出错", err.Error(), name)
  138. return false
  139. } else {
  140. data := make(map[string]interface{}, 1)
  141. if searchResult.Hits != nil {
  142. for _, hit := range searchResult.Hits.Hits {
  143. json.Unmarshal(*hit.Source, &data)
  144. if data["name"].(string) == name {
  145. return true
  146. }
  147. }
  148. }
  149. }
  150. return false
  151. }
  152. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  153. task <- struct{}{}
  154. defer func() {
  155. <-task
  156. }()
  157. switch act {
  158. case OP_TYPE_DATA:
  159. var rep map[string]interface{}
  160. err := json.Unmarshal(data, &rep)
  161. if err != nil {
  162. log.Println(err)
  163. } else {
  164. sid, _ := rep["gtid"].(string)
  165. eid, _ := rep["lteid"].(string)
  166. if sid == "" || eid == "" {
  167. log.Println("err", "sid=", sid, ",eid=", eid)
  168. return
  169. }
  170. go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra)
  171. log.Println("udp通知抽取id段", sid, " ", eid)
  172. QuerySensitiveWords(sid, eid)
  173. for _, m := range nextNodes {
  174. by, _ := json.Marshal(map[string]interface{}{
  175. "gtid": sid,
  176. "lteid": eid,
  177. "stype": ObjToString(m["stype"]),
  178. })
  179. err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{
  180. IP: net.ParseIP(m["addr"].(string)),
  181. Port: IntAll(m["port"]),
  182. })
  183. if err != nil {
  184. log.Println(err)
  185. }
  186. }
  187. log.Println("udp通知抽取完成,eid=", eid)
  188. }
  189. case OP_NOOP: //下个节点回应
  190. log.Println(string(data))
  191. }
  192. }
  193. func handleData(datas []string) string {
  194. dataslen := len(datas)
  195. del := map[int]bool{}
  196. if dataslen <= 1 {
  197. rstr := strings.Join(datas, ",")
  198. return rstr
  199. }
  200. m2 := make(map[string]bool)
  201. for i, v := range datas {
  202. if m2[v] {
  203. del[i] = true
  204. } else {
  205. m2[v] = true
  206. }
  207. }
  208. for i := 0; i < dataslen; i++ {
  209. if !del[i] {
  210. for j := i + 1; j < dataslen; j++ {
  211. jdata := datas[j]
  212. idata := datas[i]
  213. if len(jdata) > len(idata) && strings.Contains(jdata, idata) {
  214. del[i] = true
  215. break
  216. } else if len(idata) > len(jdata) && strings.Contains(idata, jdata) {
  217. del[j] = true
  218. break
  219. }
  220. }
  221. }
  222. }
  223. caplen := dataslen - len(del)
  224. rdata := make([]string, caplen, caplen)
  225. var m int
  226. for i, v := range datas {
  227. if !del[i] {
  228. rdata[m] = v
  229. m++
  230. }
  231. }
  232. rstr := strings.Join(rdata, ",")
  233. return rstr
  234. }
  235. //定时增量数据处理---冯
  236. func AddTaskSensitiveWordsData() {
  237. defer func() {
  238. if err := recover(); err != nil {
  239. log.Println("func() addTaskSensitiveWordsData", err)
  240. }
  241. }()
  242. mmmgo, err := InitMgoEn("mongodb://172.17.4.187:27082,172.17.145.163:27083", 20, "fengweiqiang", "fwq@123123")
  243. if err != nil {
  244. log.Fatalln(err)
  245. }
  246. con := mmmgo.GetCon()
  247. if con == nil {
  248. log.Fatalln("mgo con err")
  249. }
  250. tick := time.Tick(time.Hour * 24 * 7) //查询七天前
  251. for { //定时任务
  252. ctime := <-tick
  253. cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local)
  254. findByupdate, err := con.Database("mixdata").Collection("qyxy_std").Find(nil, bson.M{
  255. "updatetime": bson.M{"$gte": cronData.Unix()},
  256. }, options.Find().SetProjection(bson.M{"company_name": 1, "updatetime": 1, "company_type": 1, "company_type_old": 1}))
  257. if err != nil {
  258. log.Println("tick err", cronData)
  259. continue
  260. }
  261. defer findByupdate.Close(nil)
  262. for tmp := make(map[string]interface{}); findByupdate.Next(nil); tmp = map[string]interface{}{} {
  263. err := findByupdate.Decode(&tmp)
  264. if err == nil {
  265. if company_name, ok := tmp["company_name"].(string); ok {
  266. if reglen.MatchString(company_name) || strReg.MatchString(company_name) ||
  267. !uncon_strReg.MatchString(company_name) || !unstart_strReg.MatchString(company_name) ||
  268. start_strReg.MatchString(company_name) || end_strReg.MatchString(company_name) ||
  269. con_strReg.MatchString(company_name) {
  270. continue
  271. }
  272. if strings.Contains(ObjToString(tmp["company_type"]), "个人") ||
  273. strings.Contains(ObjToString(tmp["company_type"]), "个体") ||
  274. strings.Contains(ObjToString(tmp["company_type_old"]), "个人") ||
  275. strings.Contains(ObjToString(tmp["company_type_old"]), "个体") {
  276. continue
  277. }
  278. //存mgo
  279. new_tmp ,err:= con.Database("mixdata").Collection("unique_qyxy").InsertOne(nil, bson.M{
  280. "qy_name": company_name,
  281. })
  282. if err==nil {
  283. dealWithEsData(company_name, BsonTOStringId(new_tmp.InsertedID))
  284. }
  285. //存敏感词
  286. //存es=判断+新增
  287. }
  288. }
  289. }
  290. log.Println("tick ok", cronData)
  291. }
  292. }
  293. //处理是否新增es
  294. func dealWithEsData(name string, tmpid string) {
  295. query := `{"query":{"bool":{"must":[{"term":{"` + es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
  296. tmp := make(map[string]interface{})
  297. json.Unmarshal([]byte(query), &tmp)
  298. searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do()
  299. if err != nil {
  300. log.Println("从ES查询出错", err.Error())
  301. } else {
  302. data := make(map[string]interface{}, 0)
  303. if searchResult.Hits != nil {
  304. for _, hit := range searchResult.Hits.Hits {
  305. json.Unmarshal(*hit.Source, &data)
  306. }
  307. }
  308. if len(data) == 0 {
  309. //log.Println("无数据-新增")
  310. _, err := Client_Es.Index().Index(es_index).Type(es_type).Id(tmpid).BodyJson(map[string]interface{}{
  311. "name": name,
  312. "name_word": name,
  313. }).Do()
  314. if err != nil {
  315. log.Println("新增失败:", name, tmpid)
  316. }
  317. }
  318. }
  319. }
  320. func TemporaryTest() {
  321. log.Println("测试......导出数据")
  322. QfwMgo85 = &MongodbSim{
  323. MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
  324. Size: 5,
  325. DbName: "mixdata",
  326. UserName: "fengweiqiang",
  327. PassWord: "fwq@123123",
  328. }
  329. QfwMgo85.InitPool()
  330. Client_Es, _ = elastic.NewClient(http.DefaultClient, "http://ela.spdata.jianyu360.com")
  331. es_type, es_index = "unique_qy", "unique_qy"
  332. q := map[string]interface{}{
  333. "check_history": map[string]interface{}{
  334. "$exists": 0,
  335. },
  336. }
  337. sess := QfwMgo85.GetMgoConn()
  338. defer QfwMgo85.DestoryMongoConn(sess)
  339. //多线程升索引
  340. pool_es := make(chan bool, 3)
  341. wg_es := &sync.WaitGroup{}
  342. it := sess.DB(QfwMgo85.DbName).C("winner_err_new").Find(&q).Iter()
  343. total, isOK := 0, 0
  344. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  345. if total%100 == 0 {
  346. log.Println("current index", total, isOK)
  347. }
  348. name := ObjToString(tmp["name"])
  349. tmpid := BsonTOStringId(tmp["_id"])
  350. pool_es <- true
  351. wg_es.Add(1)
  352. go func(name string, tmpid string) {
  353. defer func() {
  354. <-pool_es
  355. wg_es.Done()
  356. }()
  357. //start := int(time.Now().Unix())
  358. new_name,b,score,_ :=dealWithNameScoreRules(name)
  359. //log.Println("耗时:",int(time.Now().Unix())-start,"秒",b,name,new_name,tmpid)
  360. if new_name != "" && b {
  361. isOK++
  362. QfwMgo85.UpdateById("winner_err_new", tmpid, map[string]interface{}{
  363. "$set": map[string]interface{}{
  364. "is_word": 1,
  365. "name_word" : new_name,
  366. "score":score,
  367. },
  368. })
  369. } else {
  370. QfwMgo85.UpdateById("winner_err_new", tmpid, map[string]interface{}{
  371. "$set": map[string]interface{}{
  372. "is_word": -1,
  373. "name_word" : new_name,
  374. "score":score,
  375. },
  376. })
  377. }
  378. }(name, tmpid)
  379. tmp = make(map[string]interface{})
  380. }
  381. wg_es.Wait()
  382. log.Println("is over", total, isOK)
  383. }
  384. func TemporaryTestNewData() {
  385. log.Println("测试......导出新数据")
  386. QfwMgo85 = &MongodbSim{
  387. MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
  388. Size: 5,
  389. DbName: "mixdata",
  390. UserName: "fengweiqiang",
  391. PassWord: "fwq@123123",
  392. }
  393. QfwMgo85.InitPool()
  394. q := map[string]interface{}{
  395. "is_word":map[string]interface{}{
  396. "$exists":1,
  397. },
  398. }
  399. sess := QfwMgo85.GetMgoConn()
  400. defer QfwMgo85.DestoryMongoConn(sess)
  401. it := sess.DB(QfwMgo85.DbName).C("winner_err_new").Find(&q).Select(map[string]interface{}{
  402. "name":1,
  403. "name_word":1,
  404. "is_word":1,
  405. }).Iter()
  406. total:=0
  407. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  408. if total % 100 == 0 {
  409. log.Println("current index",total)
  410. }
  411. savedict := tmp
  412. QfwMgo85.Save("winner_err_new_word",savedict)
  413. tmp = make(map[string]interface{})
  414. }
  415. log.Println("is over",total)
  416. }
// reg_alias matches a name that ends with one of the listed government-bureau
// suffixes (tax bureau, education bureau, ...). 粮食和物资储备局 appears twice in
// the alternation, which is harmless.
  417. var reg_alias = regexp.MustCompile("(税务局|工商行政管理局|文化广播电视新闻出版局|外国专家局|" +
  418. "中医药管理局|市场监督管理局|广播电视局|医疗保障局|机关事务管理局|粮食和物资储备局|" +
  419. "监狱管理局|畜牧兽医局|食品药品监督管理局|城市管理行政执法局|城市管理局|国家保密局|密码管理局|" +
  420. "地方金融监督管理局|住房保障和房屋管理局|质量技术监督局|人力资源与社会保障局|公路管理局|国土资源局|" +
  421. "卫生和计划生育局|民事政务局|公众安全局|交通管理局|人力资源和社会保障局|劳动和社会保障局|" +
  422. "住房和城乡建设局|就业服务局|文物管理局|环境保护局|粮食和物资储备局|教育体育局|" +
  423. "体育局|教育局|招商局|农业局|农机局|水务局|林业局|财政局|审计局|统计局|商务局)$")
// reglen matches names that are 1-5 or at least 40 characters long. RE2 "."
// matches one UTF-8 character, so this counts characters, not bytes.
// AddTaskSensitiveWordsData rejects names it matches.
  424. var reglen *regexp.Regexp = regexp.MustCompile("^(.{1,5}|.{40,})$")
// strReg is a whole-string (^...$) blacklist of generic, truncated or
// malformed names. Examples: a bare organisation-type word with a short
// prefix, names ending in 市/省, strings containing digits, road or vehicle
// names, and individual business households. AddTaskSensitiveWordsData rejects
// names it matches.
// NOTE(review): the class shown here as "[,,]" lists the same comma twice. It is
// presumably "[,，]" (ASCII plus full-width comma) in the real file; confirm.
  425. var strReg *regexp.Regexp = regexp.MustCompile("^(.{0,3}工程队|.{0,3}总公司|_+|.{0,2}设备安装公司|.{0,2}装[饰修潢]公司|.{0,2}开发公司|.{0,4}有限公司|.{0,4}有限责任公司|.{0,4}设计院|建筑设计研?究?院|省文物考古研究所|经济开发区|省.*|镇人民政府|.{0,2}服务公司|" +
  426. ".{0,2}工程质量监督站|.{0,3}经[营销]部|.{0,3}事务所|.{0,4}工程公司|.{0,4}责任公司|.*勘测|.{0,4}研究院|.*能源建|.{0,2}安装工程|.*[市省]{1}|.{0,4}中心|.*区.?|" +
  427. ".{0,3}税务局|.{0,3}财政局|.{0,3}商行|.{0,2}公安处|.{0,2}测绘院|.{0,3}开发|.{0,2}建设局|.{0,2}经销部|.{0,3}委员会|.{0,2}分公司|.{0,2}管理站|.{0,2}事务管理局|" +
  428. ".*资料|.{0,2}办公用品.{1,2}|.*唯亭|.*设备|.+安装|.{0,2}技术服务|市.+[台院社局司]|城?区.+[府局室院]|县.+[院台局]|.{0,2}发展公司|经济技术开发|" +
  429. "发展和改革局|贵州有色地质|铝塑门窗加工|生产力促进中心|特殊普通合伙|工业集团公司|人民调解协会|人民政府办公厅|机电设备公司|房地产开发有限公司|.{0,4}商店|中等专业学校|" +
  430. "农村信用联社|.{0,4}经营部|.{0,4}销售部|驾驶员培训学校|.{2}县.{2}镇|保安服务总公司|住房和城乡建设局|地产评估事务所|生产资料门市部|×+|.{0,3}[0-9]{15}|.*[0-9]+|.*路|.*无字号名称.*|.*车|.*[,,]{1}.*|.*个体工商户|.*运输户)$")
  431. // unstart_strReg matches a name whose first character is in U+4E00-U+9FA5 (CJK). AddTaskSensitiveWordsData negates it, so names that do NOT start with a Chinese character are rejected.
  432. var unstart_strReg *regexp.Regexp = regexp.MustCompile("^([\u4e00-\u9fa5])")
  433. // start_strReg is a prefix blacklist: 1-2 Latin letters followed by 6+ Chinese characters, or a name starting with 省/市/县/区/业绩/资格/中标/项目/预算单位.
  434. var start_strReg *regexp.Regexp = regexp.MustCompile("^([a-zA-Z]{1,2}[\u4e00-\u9fa5]{6,}|省|市|县|区|业绩|资格|中标|项目|预算单位)")
  435. // end_strReg is a suffix blacklist: shops, restaurants, stalls, personal titles, project/bid words, and similar endings.
// NOTE(review): as shown here, the last alternative is "/*". That matches the
// empty string, so this regexp would match EVERY input, and
// AddTaskSensitiveWordsData would reject every name. Likewise, "(株)" is a
// capture group matching 株, not literal parentheses. Both are presumably
// full-width characters (／＊ and （株）) in the real file; confirm.
  436. var end_strReg *regexp.Regexp = regexp.MustCompile("(\\.|\\.\\.|餐馆|店|腻子|肉庄|画社|美发屋|发廊|网吧|网咖|零售点|新街|包子铺|奶茶铺|(株)|先生|女士|小姐|" +
  437. "资格|业绩|中标|项目|预算单位|摊位号|号|厅|室|部|点|馆|场|厂|床|所|处|站|行|中心|合作社|ATMS|" +
  438. "吧|楼|摊|摊位|廊|茶社|坊|圃|汤锅|园|民宿|美容院|房|排挡|府|庄|栈|队|批发|苑|养殖户|棋牌|农家乐|货运|" +
  439. "城|社|基地|会|服务|娱乐|种植|百货|汽修|农家菜|亭|小吃|快餐|粮库|卫生院|书画院|面|门窗|鸡排|屋|橱|堂|肉铺|服务|服饰|/*)$")
  440. // con_strReg is a substring blacklist: a question mark or "%", status words (deleted, error, revoked, cancelled, ...), passport/ID-number labels, HTML entity fragments, fund/currency terms, "·"-joined names, or 5+ consecutive Latin letters anywhere.
// NOTE(review): as shown, the bare "?" alternative after "|" is an RE2 parse
// error ("missing argument to repetition operator"), which would make
// MustCompile panic at package init. It is presumably a full-width "？" in the
// real file; confirm.
  441. var con_strReg *regexp.Regexp = regexp.MustCompile("(\\?|?|%|代码标识|删除|错误|吊销|注销|发起人|待清理|&#|护照号|身份证号|" +
  442. "法人|&nbsp|国家拨入|借款|积累资金|单位自有|认股人|--|、|&|`|美元|[\u4e00-\u9fa5]{2,6}·[\u4e00-\u9fa5]{2,6})|" +
  443. "[a-zA-Z]{5,}")
// uncon_strReg matches names containing an organisation keyword (园, 政府, 集团,
// 公司, 有限, ...). AddTaskSensitiveWordsData negates it, so names without any
// of these keywords are rejected.
  444. var uncon_strReg *regexp.Regexp = regexp.MustCompile("(园|政府|集团|公司|有限|合伙|企|院|学|局|处)")
// startWordReg_1 matches a name that begins with one to five characters
// followed by an administrative-division word (省, 市, 县, 州, 自治区, 特别行政区).
  445. var startWordReg_1 *regexp.Regexp = regexp.MustCompile("^(.{1,5})(省|市|县|州|自治区|特别行政区)")
// startWordReg_2 matches a name that begins with a province-level region name.
  446. var startWordReg_2 *regexp.Regexp = regexp.MustCompile("^(北京|天津|重庆|上海|河北|山西|" +
  447. "浙江|江西|湖北|吉林|海南|甘肃|广东|陕西|辽宁|山东|河南|云南|黑龙江|福建|贵州|江苏|安徽|" +
  448. "湖南|四川|青海|台湾|新疆|内蒙古|宁夏|西藏|广西|澳门|香港)")
// endWordReg matches a name ending in 有限公司 or 有限责任公司.
  449. var endWordReg *regexp.Regexp = regexp.MustCompile("(有限公司|有限责任公司)$")