// extractudp package util import ( "encoding/json" "fmt" "github.com/importcjj/sensitive" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" "log" "net" "regexp" "strings" "sync" "time" ) var task chan struct{} = make(chan struct{}, 1) var Udpclient UdpClient //udp对象 var nextNodes []map[string]interface{} //udp通知抽取 func ExtractUdp() { nextNodes = ObjArrToMapArr(Config["nextNode"].([]interface{})) Udpclient = UdpClient{Local: ":" + ObjToString(Config["udpport"]), BufSize: 1024} log.Println("udp start ", Config["udpport"]) Udpclient.Listen(processUdpMsg) /*//临时测试 sid := "1fffffffffffffffffffffff" eid := "9fffffffffffffffffffffff" QuerySensitiveWords(sid,eid )*/ } var syc sync.WaitGroup func QuerySensitiveWords(sid, eid string) { log.Println("QuerySensitiveWords:", sid, eid) objSid, err := primitive.ObjectIDFromHex(sid) if err != nil { log.Println("转换sid err", err) return } objEid, err := primitive.ObjectIDFromHex(eid) if err != nil { log.Println("转换eid err", err) return } var num, unum int mgoSess := QfwMgo85.GetMgoConn() defer QfwMgo85.DestoryMongoConn(mgoSess) iter := mgoSess.DB(QfwMgo85.DbName).C(Collection).Find(map[string]interface{}{ "_id": map[string]interface{}{ "$gte": objSid, "$lte": objEid, }, }).Select(Fields).Iter() c := make(chan struct{}, 3) for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} { c <- struct{}{} syc.Add(1) go handletmp(tmp, &unum, c) num++ } syc.Wait() log.Printf("%s--->%s 处理完成:%d,更新数:%d\n", sid, eid, num, unum) } func handletmp(tmp map[string]interface{}, unum *int, c <-chan struct{}) { defer func() { <-c syc.Done() }() up := make(map[string]string) if win, isok := tmp["winner"].(string); isok { if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" { tmp["winner"] = fname up["winner"] = fmt.Sprintf("%s_%s", flog, win) } } if win, isok := tmp["s_winner"].(string); isok { if fok, flog, fname := cheakname(win); fok && flog != "" && flog != "tremQuery" { tmp["s_winner"] = fname up["s_winner"] = fmt.Sprintf("%s_%s", flog, win) } } if agency, isok := tmp["agency"].(string); isok { if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "tremQuery" { tmp["agency"] = fname up["agency"] = fmt.Sprintf("%s_%s", flog, agency) } } if buyer, isok := tmp["buyer"].(string); isok { if fok, flog, fname := cheakname(buyer); fok && flog != "" && flog != "tremQuery"&& flog != "queryScore"&& flog != "queryString" { tmp["buyer"] = fname up["buyer"] = fmt.Sprintf("%s_%s", flog, buyer) } } if len(up) > 0 { *unum++ tmp["log"] = up id := tmp["_id"].(primitive.ObjectID).Hex() log.Println(tmp) QfwMgo85.UpdateById(Collection, id, map[string]interface{}{"$set": tmp}) } } func cheakname(name string) (up bool, log, rname string) { filter := sensitive.New() var cheaklog string //更新,匹配 if tremQuery(name) { cheaklog = "tremQuery" return true, cheaklog, name } rname, isok, _ ,datas := dealWithNameScoreRules(name) if len(datas) > 0 { for _, v := range datas { filter.AddWord(v["name"].(string)) } findAll := filter.FindAll(name) data := handleData(findAll) //更新,匹配 if len(data) > 0 { cheaklog = "queryString" return true, cheaklog, data } } //更新,匹配 if rname != "" && isok { cheaklog = "queryScore" return true, cheaklog, rname } return false, "", name } func tremQuery(name string) bool { query := `{"query":{"bool":{"must":[{"term":{"` + es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}` tmp := make(map[string]interface{}) json.Unmarshal([]byte(query), &tmp) searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do() if err != nil { log.Println("从ES查询出错", err.Error(), name) return false } else { data := make(map[string]interface{}, 1) if searchResult.Hits != nil { for _, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &data) if data["name"].(string) == name { return true } } } } return false } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { task <- struct{}{} defer func() { <-task }() switch act { case OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Println(err) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) if sid == "" || eid == "" { log.Println("err", "sid=", sid, ",eid=", eid) return } go Udpclient.WriteUdp([]byte("get:"+sid+"_"+eid), OP_NOOP, ra) log.Println("udp通知抽取id段", sid, " ", eid) QuerySensitiveWords(sid, eid) for _, m := range nextNodes { by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": ObjToString(m["stype"]), }) err := Udpclient.WriteUdp(by, OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: IntAll(m["port"]), }) if err != nil { log.Println(err) } } log.Println("udp通知抽取完成,eid=", eid) } case OP_NOOP: //下个节点回应 log.Println(string(data)) } } func handleData(datas []string) string { dataslen := len(datas) del := map[int]bool{} if dataslen <= 1 { rstr := strings.Join(datas, ",") return rstr } m2 := make(map[string]bool) for i, v := range datas { if m2[v] { del[i] = true } else { m2[v] = true } } for i := 0; i < dataslen; i++ { if !del[i] { for j := i + 1; j < dataslen; j++ { jdata := datas[j] idata := datas[i] if len(jdata) > len(idata) && strings.Contains(jdata, idata) { del[i] = true break } else if len(idata) > len(jdata) && strings.Contains(idata, jdata) { del[j] = true break } } } } caplen := dataslen - len(del) rdata := make([]string, caplen, caplen) var m int for i, v := range datas { if !del[i] { rdata[m] = v m++ } } rstr := strings.Join(rdata, ",") return rstr } //定时增量数据处理---冯 func AddTaskSensitiveWordsData() { defer func() { if err := recover(); err != nil { log.Println("func() addTaskSensitiveWordsData", err) } }() mmmgo, err := InitMgoEn("mongodb://"+Config["163_mgo_addr"].(string), 10, Config["163_userName"].(string), Config["163_passWord"].(string)) if err != nil { log.Fatalln(err) } con := mmmgo.GetCon() if con == nil { log.Fatalln("mgo con err") } tick := time.Tick(time.Hour * 24 * 7) //查询七天前 for { //定时任务 ctime := <-tick cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local) findByupdate, err := con.Database("mixdata").Collection("qyxy_std").Find(nil, bson.M{ "updatetime": bson.M{"$gte": cronData.Unix()}, }, options.Find().SetProjection(bson.M{"company_name": 1, "updatetime": 1, "company_type": 1, "company_type_old": 1})) if err != nil { log.Println("tick err", cronData) continue } defer findByupdate.Close(nil) for tmp := make(map[string]interface{}); findByupdate.Next(nil); tmp = map[string]interface{}{} { err := findByupdate.Decode(&tmp) if err == nil { if company_name, ok := tmp["company_name"].(string); ok { if reglen.MatchString(company_name) || !unstart_strReg.MatchString(company_name) || con_strReg.MatchString(company_name) { continue } company_type:= ObjToString(tmp["company_type"]) if strings.Contains(company_type,"个人")||strings.Contains(company_type,"个体"){ continue } //存mgo new_tmp ,err:= con.Database("mixdata").Collection("unique_qyxy").InsertOne(nil, bson.M{ "qy_name": company_name, }) if err==nil { dealWithEsData(company_name, BsonTOStringId(new_tmp.InsertedID)) } } } } log.Println("tick ok", cronData) } } //处理是否新增es func dealWithEsData(name string, tmpid string) { query := `{"query":{"bool":{"must":[{"term":{"` + es_index + `.name":"` + name + `"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}` tmp := make(map[string]interface{}) json.Unmarshal([]byte(query), &tmp) searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do() if err != nil { log.Println("从ES查询出错", err.Error()) } else { data := make(map[string]interface{}, 0) if searchResult.Hits != nil { for _, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &data) } } if len(data) == 0 { _, err := Client_Es.Index().Index(es_index).Type(es_type).Id(tmpid).BodyJson(map[string]interface{}{ "name": name, "name_word": name, }).Do() if err != nil { log.Println("新增失败:", name, tmpid) } } } } var reg_alias = regexp.MustCompile("(税务局|工商行政管理局|文化广播电视新闻出版局|外国专家局|" + "中医药管理局|市场监督管理局|广播电视局|医疗保障局|机关事务管理局|粮食和物资储备局|" + "监狱管理局|畜牧兽医局|食品药品监督管理局|城市管理行政执法局|城市管理局|国家保密局|密码管理局|" + "地方金融监督管理局|住房保障和房屋管理局|质量技术监督局|人力资源与社会保障局|公路管理局|国土资源局|" + "卫生和计划生育局|民事政务局|公众安全局|交通管理局|人力资源和社会保障局|劳动和社会保障局|" + "住房和城乡建设局|就业服务局|文物管理局|环境保护局|粮食和物资储备局|教育体育局|" + "体育局|教育局|招商局|农业局|农机局|水务局|林业局|财政局|审计局|统计局|商务局)$") var reglen *regexp.Regexp = regexp.MustCompile("^(.{1,3}|.{40,})$") var strReg *regexp.Regexp = regexp.MustCompile("^(.{0,3}工程队|.{0,3}总公司|_+|.{0,2}设备安装公司|.{0,2}装[饰修潢]公司|.{0,2}开发公司|.{0,4}有限公司|.{0,4}有限责任公司|.{0,4}设计院|建筑设计研?究?院|省文物考古研究所|经济开发区|省.*|镇人民政府|.{0,2}服务公司|" + ".{0,2}工程质量监督站|.{0,3}经[营销]部|.{0,3}事务所|.{0,4}工程公司|.{0,4}责任公司|.*勘测|.{0,4}研究院|.*能源建|.{0,2}安装工程|.*[市省]{1}|.{0,4}中心|.*区.?|" + ".{0,3}税务局|.{0,3}财政局|.{0,3}商行|.{0,2}公安处|.{0,2}测绘院|.{0,3}开发|.{0,2}建设局|.{0,2}经销部|.{0,3}委员会|.{0,2}分公司|.{0,2}管理站|.{0,2}事务管理局|" + ".*资料|.{0,2}办公用品.{1,2}|.*唯亭|.*设备|.+安装|.{0,2}技术服务|市.+[台院社局司]|城?区.+[府局室院]|县.+[院台局]|.{0,2}发展公司|经济技术开发|" + "发展和改革局|贵州有色地质|铝塑门窗加工|生产力促进中心|特殊普通合伙|工业集团公司|人民调解协会|人民政府办公厅|机电设备公司|房地产开发有限公司|.{0,4}商店|中等专业学校|" + "农村信用联社|.{0,4}经营部|.{0,4}销售部|驾驶员培训学校|.{2}县.{2}镇|保安服务总公司|住房和城乡建设局|地产评估事务所|生产资料门市部|×+|.{0,3}[0-9]{15}|.*[0-9]+|.*路|.*无字号名称.*|.*车|.*[,,]{1}.*|.*个体工商户|.*运输户)$") //非中文开头... var unstart_strReg *regexp.Regexp = regexp.MustCompile("^([\u4e00-\u9fa5])") //开头 var start_strReg *regexp.Regexp = regexp.MustCompile("^([a-zA-Z]{1,2}[\u4e00-\u9fa5]{6,}|省|市|县|区|业绩|资格|中标|项目|预算单位)") //结尾 var end_strReg *regexp.Regexp = regexp.MustCompile("(\\.|\\.\\.|餐馆|店|腻子|肉庄|画社|美发屋|发廊|网吧|网咖|零售点|新街|包子铺|奶茶铺|(株)|先生|女士|小姐|" + "资格|业绩|中标|项目|预算单位|摊位号|号|厅|室|部|点|馆|场|厂|床|所|处|站|行|中心|合作社|ATMS|" + "吧|楼|摊|摊位|廊|茶社|坊|圃|汤锅|园|民宿|美容院|房|排挡|府|庄|栈|队|批发|苑|养殖户|棋牌|农家乐|货运|" + "城|社|基地|会|服务|娱乐|种植|百货|汽修|农家菜|亭|小吃|快餐|粮库|卫生院|书画院|面|门窗|鸡排|屋|橱|堂|肉铺|服务|服饰|/*)$") //包含 var con_strReg *regexp.Regexp = regexp.MustCompile("(\\?|?|%|代码标识|删除|错误|吊销|注销|发起人|待清理|&#|护照号|身份证号|" + "法人| |国家拨入|借款|积累资金|单位自有|认股人|--|、|&|`|美元|[\u4e00-\u9fa5]{2,6}·[\u4e00-\u9fa5]{2,6})|" + "[a-zA-Z]{5,}") var uncon_strReg *regexp.Regexp = regexp.MustCompile("(园|政府|集团|公司|有限|合伙|企|院|学|局|处)") var startWordReg_1 *regexp.Regexp = regexp.MustCompile("^(.{1,5})(省|市|县|州|自治区|特别行政区)") var startWordReg_2 *regexp.Regexp = regexp.MustCompile("^(北京|天津|重庆|上海|河北|山西|" + "浙江|江西|湖北|吉林|海南|甘肃|广东|陕西|辽宁|山东|河南|云南|黑龙江|福建|贵州|江苏|安徽|" + "湖南|四川|青海|台湾|新疆|内蒙古|宁夏|西藏|广西|澳门|香港)") var startWordReg_3 *regexp.Regexp = regexp.MustCompile("^(北京市|天津市|重庆市|上海市|河北省|山西省|" + "浙江省|江西省|湖北省|吉林省|海南省|甘肃省|广东省|陕西省|辽宁省|山东省|河南省|云南省|黑龙江省|福建省|贵州省|江苏省|安徽省|" + "湖南省|四川省|青海省|台湾省|新疆维吾尔自治区|内蒙古自治区|宁夏回族自治区|西藏自治区|广西壮族自治区|澳门特别行政区|香港特别行政区)") var endWordReg *regexp.Regexp = regexp.MustCompile("(有限公司|有限责任公司)$")