package main import ( "encoding/json" "github.com/importcjj/sensitive" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" "gopkg.in/mgo.v2/bson" "log" "runtime" "strings" "sync" "time" ) //部署-历史-敏感词库 func initSensitiveWordsData() { log.Println("初始化敏感词-源数据...") gteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskGteId) if err != nil { log.Fatalln(err) } lteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskLteId) if err != nil { log.Fatalln(err) } log.Println("id段落:",BsonTOStringId(gteid),BsonTOStringId(lteid)) iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{ "_id": map[string]interface{}{ "$gte": gteid, "$lte": lteid, }, }).Sort("_id").Iter() Filter = sensitive.New() var initnum uint for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} { initnum++ if initnum%10000==0 { log.Println("current index ", initnum,tmp["qy_name"]) } Filter.AddWord(tmp["qy_name"].(string)) } log.Println("init ok", initnum) } //定时增量数据处理---冯 func addTaskSensitiveWordsData() { mmmgo, err := InitMgoEn("mongodb://172.17.4.187:27082,172.17.145.163:27083", 20, "fengweiqiang", "fwq@123123") if err != nil { log.Fatalln(err) } con := mmmgo.GetCon() if con == nil { log.Fatalln("mgo con err") } Filter = sensitive.New() tick := time.Tick(time.Hour * 24 * 7)//查询七天前 for {//定时任务 ctime := <-tick cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local) findByupdate, err := con.Database("mixdata").Collection("qyxy_std").Find(nil, bson.M{ "updatetime": bson.M{"$gte": cronData.Unix()}, }, options.Find().SetProjection(bson.M{"company_name": 1, "updatetime": 1,"company_type": 1,"company_type_old": 1}).SetSort(bson.M{"_id": 1})) if err != nil { log.Println("tick err", cronData) continue } defer findByupdate.Close(nil) for tmp := make(map[string]interface{}); findByupdate.Next(nil); tmp = map[string]interface{}{} { err := findByupdate.Decode(&tmp) if err == nil { if company_name, ok := tmp["company_name"].(string); ok { if reglen.MatchString(company_name) || strReg.MatchString(company_name) || !uncon_strReg.MatchString(company_name)|| !unstart_strReg.MatchString(company_name)|| start_strReg.MatchString(company_name)|| end_strReg.MatchString(company_name)|| con_strReg.MatchString(company_name) { continue } if strings.Contains(ObjToString(tmp["company_type"]),"个人")|| strings.Contains(ObjToString(tmp["company_type"]),"个体")|| strings.Contains(ObjToString(tmp["company_type_old"]),"个人")|| strings.Contains(ObjToString(tmp["company_type_old"]),"个体") { continue } //存mgo con.Database("mixdata").Collection("unique_qyxy").InsertOne(nil, bson.M{ "qy_name": company_name, }) //存敏感词 Filter.AddWord(company_name) //存es=判断+新增 dealWithEsData(company_name,BsonTOStringId(tmp["_id"])) } } } log.Println("tick ok", cronData) } } //处理是否新增es func dealWithEsData(name string,tmpid string) { query:= `{"query":{"bool":{"must":[{"term":{"`+es_index+`.name":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}` tmp := make(map[string]interface{}) json.Unmarshal([]byte(query),&tmp) searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do() if err != nil { log.Println("从ES查询出错", err.Error()) }else { data := make(map[string]interface{},0) if searchResult.Hits != nil { for _, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &data) } } if len(data)==0{ //log.Println("无数据-新增") _, err := Client_Es.Index().Index(es_index).Type(es_type).Id(tmpid).BodyJson(map[string]interface{}{ "name":name, "name_word":name, }).Do() if err != nil { //log.Println("新增失败:",name,tmpid) } } } } //处理内存分段 func dealWithDataMemory() { iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{ "_id": map[string]interface{}{ "$gte": BsonTOStringId("1fffffffffffffffffffffff"), "$lte": BsonTOStringId("9fffffffffffffffffffffff"), }, }).Sort("_id").Iter() Filter = sensitive.New() var initnum uint saveIdArr ,start_id:= make([]map[string]string,0),"" var m runtime.MemStats for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} { if start_id=="" { start_id = BsonTOStringId(tmp["_id"]) } Filter.AddWord(tmp["qy_name"].(string)) initnum++ if initnum%100000==0 { runtime.ReadMemStats(&m) men :=toMegaBytes(m.HeapAlloc) log.Printf("current index %d\tos %.2f M",initnum, men) if men>7.5*1024 { //7.5G saveIdArr = append(saveIdArr, map[string]string{ "start":start_id, "end":BsonTOStringId(tmp["_id"]), }) runtime.GC() Filter = sensitive.New() start_id = "" time.Sleep(time.Second*5) } } break } saveIdArr = append(saveIdArr, map[string]string{ "start":start_id, "end":"", }) for k,v:=range saveIdArr{ log.Println("第",k,"段",v["start"],v["end"]) } log.Println("memory is ok", initnum) } func temporaryTest() { log.Println("测试......导出数据") q := map[string]interface{}{} sess := data_mgo.GetMgoConn() defer data_mgo.DestoryMongoConn(sess) //多线程升索引 pool_es := make(chan bool, 20) wg_es := &sync.WaitGroup{} it := sess.DB(data_mgo.DbName).C("zk_company_test").Find(&q).Iter() total,isOK:=0,0 for tmp := make(map[string]interface{}); it.Next(&tmp); total++ { if total % 1000 == 0 { log.Println("current index",total,isOK) } name:=ObjToString(tmp["name"]) pool_es <- true wg_es.Add(1) go func(name string) { defer func() { <-pool_es wg_es.Done() }() new_name,b :=dealWithNameScoreRules(name) if new_name!="" && b { isOK++ data_mgo.Save("zk_words_test_test", map[string]interface{}{ "name":name, "new_name":new_name, }) } }(name) tmp = make(map[string]interface{}) } wg_es.Wait() log.Println("is over",total) }