|
- package main
- import (
- "encoding/json"
- "github.com/importcjj/sensitive"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.mongodb.org/mongo-driver/mongo/options"
- "gopkg.in/mgo.v2/bson"
- "log"
- "runtime"
- "strings"
- "sync"
- "time"
- )
- //部署-历史-敏感词库
- func initSensitiveWordsData() {
- log.Println("初始化敏感词-源数据...")
- gteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskGteId)
- if err != nil {
- log.Fatalln(err)
- }
- lteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskLteId)
- if err != nil {
- log.Fatalln(err)
- }
- log.Println("id段落:",BsonTOStringId(gteid),BsonTOStringId(lteid))
- iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": gteid,
- "$lte": lteid,
- },
- }).Sort("_id").Iter()
- Filter = sensitive.New()
- var initnum uint
- for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
- initnum++
- if initnum%10000==0 {
- log.Println("current index ", initnum,tmp["qy_name"])
- }
- Filter.AddWord(tmp["qy_name"].(string))
- }
- log.Println("init ok", initnum)
- }
- //定时增量数据处理---冯
- func addTaskSensitiveWordsData() {
- mmmgo, err := InitMgoEn("mongodb://172.17.4.187:27082,172.17.145.163:27083", 20, "fengweiqiang", "fwq@123123")
- if err != nil {
- log.Fatalln(err)
- }
- con := mmmgo.GetCon()
- if con == nil {
- log.Fatalln("mgo con err")
- }
- Filter = sensitive.New()
- tick := time.Tick(time.Hour * 24 * 7)//查询七天前
- for {//定时任务
- ctime := <-tick
- cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local)
- findByupdate, err := con.Database("mixdata").Collection("qyxy_std").Find(nil, bson.M{
- "updatetime": bson.M{"$gte": cronData.Unix()},
- }, options.Find().SetProjection(bson.M{"company_name": 1, "updatetime": 1,"company_type": 1,"company_type_old": 1}).SetSort(bson.M{"_id": 1}))
- if err != nil {
- log.Println("tick err", cronData)
- continue
- }
- defer findByupdate.Close(nil)
- for tmp := make(map[string]interface{}); findByupdate.Next(nil); tmp = map[string]interface{}{} {
- err := findByupdate.Decode(&tmp)
- if err == nil {
- if company_name, ok := tmp["company_name"].(string); ok {
- if reglen.MatchString(company_name) || strReg.MatchString(company_name) ||
- !uncon_strReg.MatchString(company_name)|| !unstart_strReg.MatchString(company_name)||
- start_strReg.MatchString(company_name)|| end_strReg.MatchString(company_name)||
- con_strReg.MatchString(company_name) {
- continue
- }
- if strings.Contains(ObjToString(tmp["company_type"]),"个人")||
- strings.Contains(ObjToString(tmp["company_type"]),"个体")||
- strings.Contains(ObjToString(tmp["company_type_old"]),"个人")||
- strings.Contains(ObjToString(tmp["company_type_old"]),"个体") {
- continue
- }
- //存mgo
- con.Database("mixdata").Collection("unique_qyxy").InsertOne(nil, bson.M{
- "qy_name": company_name,
- })
- //存敏感词
- Filter.AddWord(company_name)
- //存es=判断+新增
- dealWithEsData(company_name,BsonTOStringId(tmp["_id"]))
- }
- }
- }
- log.Println("tick ok", cronData)
- }
- }
- //处理是否新增es
- func dealWithEsData(name string,tmpid string) {
- query:= `{"query":{"bool":{"must":[{"term":{"`+es_index+`.name":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
- tmp := make(map[string]interface{})
- json.Unmarshal([]byte(query),&tmp)
- searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do()
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- }else {
- data := make(map[string]interface{},0)
- if searchResult.Hits != nil {
- for _, hit := range searchResult.Hits.Hits {
- json.Unmarshal(*hit.Source, &data)
- }
- }
- if len(data)==0{
- //log.Println("无数据-新增")
- _, err := Client_Es.Index().Index(es_index).Type(es_type).Id(tmpid).BodyJson(map[string]interface{}{
- "name":name,
- "name_word":name,
- }).Do()
- if err != nil {
- //log.Println("新增失败:",name,tmpid)
- }
- }
- }
- }
- //处理内存分段
- func dealWithDataMemory() {
- iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": BsonTOStringId("1fffffffffffffffffffffff"),
- "$lte": BsonTOStringId("9fffffffffffffffffffffff"),
- },
- }).Sort("_id").Iter()
- Filter = sensitive.New()
- var initnum uint
- saveIdArr ,start_id:= make([]map[string]string,0),""
- var m runtime.MemStats
- for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
- if start_id=="" {
- start_id = BsonTOStringId(tmp["_id"])
- }
- Filter.AddWord(tmp["qy_name"].(string))
- initnum++
- if initnum%100000==0 {
- runtime.ReadMemStats(&m)
- men :=toMegaBytes(m.HeapAlloc)
- log.Printf("current index %d\tos %.2f M",initnum, men)
- if men>7.5*1024 { //7.5G
- saveIdArr = append(saveIdArr, map[string]string{
- "start":start_id,
- "end":BsonTOStringId(tmp["_id"]),
- })
- runtime.GC()
- Filter = sensitive.New()
- start_id = ""
- time.Sleep(time.Second*5)
- }
- }
- break
- }
- saveIdArr = append(saveIdArr, map[string]string{
- "start":start_id,
- "end":"",
- })
- for k,v:=range saveIdArr{
- log.Println("第",k,"段",v["start"],v["end"])
- }
- log.Println("memory is ok", initnum)
- }
- func temporaryTest() {
- log.Println("测试......导出数据")
- q := map[string]interface{}{}
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- //多线程升索引
- pool_es := make(chan bool, 20)
- wg_es := &sync.WaitGroup{}
- it := sess.DB(data_mgo.DbName).C("zk_company_test").Find(&q).Iter()
- total,isOK:=0,0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total % 1000 == 0 {
- log.Println("current index",total,isOK)
- }
- name:=ObjToString(tmp["name"])
- pool_es <- true
- wg_es.Add(1)
- go func(name string) {
- defer func() {
- <-pool_es
- wg_es.Done()
- }()
- new_name,b :=dealWithNameScoreRules(name)
- if new_name!="" && b {
- isOK++
- data_mgo.Save("zk_words_test_test", map[string]interface{}{
- "name":name,
- "new_name":new_name,
- })
- }
- }(name)
- tmp = make(map[string]interface{})
- }
- wg_es.Wait()
- log.Println("is over",total)
- }
|