package main

import (
	"errors"
	"fmt"
	"log"
	"net/url"
	"regexp"
	"sort"

	"gorm.io/driver/clickhouse"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// Package-level handles shared by the tasks in this file. They are not
// assigned here; presumably the Init* functions populate them (verify against
// their definitions).
var (
	Mgo *mongodb.MongodbSim
	GF  GlobalConf
	Esa *es.Elastic
	Esb *es.Elastic
	// Labels []LabelData // all global label rules
)

// DataRes is one matched label together with its accumulated weight, as
// stored under a label's target field by dealLabels.
type DataRes struct {
	Name   string
	Weight float64
}

func main() {
	esLabels()
}

// esLabels copies organisation labels from MongoDB into the Elasticsearch
// index.
//
// For every Mongo document that has a "company_labels" key it looks up the
// company by name in ClickHouse to obtain its ID, then updates the document
// with that ID in index GF.Env.Esindex, setting "tag_labels" to the Mongo
// "company_labels" value. Documents without the key, or whose company cannot
// be resolved to a non-empty ID, are skipped.
func esLabels() {
	InitConfig()
	InitMgo()
	InitEs()

	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(nil).Select(nil).Iter()
	fmt.Println("taskRun 开始")

	// The password is query-escaped so that reserved characters cannot break
	// the DSN.
	dsn := fmt.Sprintf(
		"clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s",
		GF.Clickhouse.Username,
		url.QueryEscape(GF.Clickhouse.Password),
		GF.Clickhouse.Host,
	)
	db, err := gorm.Open(clickhouse.Open(dsn), &gorm.Config{
		Logger: logger.Default.LogMode(logger.Silent),
	})
	if err != nil {
		log.Fatal("打开数据库失败:", err)
	}
	log.Println("连接数据库成功", db.Name())

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%1000 == 0 {
			log.Println("current:", count, tmp["company_name"])
		}

		// Only documents that already carry organisation labels are synced.
		labels, ok := tmp["company_labels"]
		if !ok {
			continue
		}

		companyName := util.ObjToString(tmp["company_name"])
		var ent EntInfo
		res := db.Model(&EntInfo{}).
			Where("company_name = ? ", companyName).
			Select("company_name", "id").
			First(&ent)
		if res.Error != nil {
			// A missing company is an expected miss. Anything else (network,
			// SQL) must be surfaced because the gorm logger is silenced.
			if !errors.Is(res.Error, gorm.ErrRecordNotFound) {
				log.Println("查询企业信息失败:", companyName, res.Error)
			}
			continue
		}
		if ent.ID == "" {
			continue
		}

		update := map[string]interface{}{
			"tag_labels": labels,
		}
		if err := Esa.UpdateDocument(GF.Env.Esindex, ent.ID, update); err != nil {
			log.Println(ent.ID, "更新", update, "错误", err)
		}
	}

	// Next returning false also covers cursor failures; report them so a
	// truncated run is not mistaken for a complete one.
	if err := it.Close(); err != nil {
		log.Println("遍历 Mongo 数据出错:", err)
	}
	log.Println("over")
}

// dealLabels computes organisation labels for every document in the Mongo
// collection and writes them back.
//
// Each rule in GF.Labels is matched against the document fields listed in the
// rule's Field; every matching field adds the corresponding Weight entry to
// the rule's running TotalWeight. The hits are merged by MergeLabelData and
// written back in a single upsert keyed by "company_name": each merged key
// receives its list of DataRes, and "company_labels" receives the names of
// all matched labels.
func dealLabels() {
	// The pattern is a compile-time constant, so a failure to compile is a
	// programming error and should not be silently discarded.
	REG = regexp.MustCompile(`\(.*?\)\d*`)
	InitConfig()
	InitMgo()

	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(nil).Select(nil).Iter()
	fmt.Println("taskRun 开始")

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%1000 == 0 {
			log.Println("current:", count, tmp["company_name"])
		}

		var hits []LabelData
		// label is a per-iteration copy of the configured rule, so the weight
		// accumulated below never leaks into GF.Labels or the next document.
		for _, label := range GF.Labels {
			// NOTE(review): DealRules is re-run for every document; if it is a
			// pure function of label.Rule it could be computed once up front.
			label.RegRule = DealRules(label.Rule)

			// Try every configured source field of the rule.
			for idx, field := range label.Field {
				text := util.ObjToString(tmp[field])
				if text == "" {
					continue
				}
				if matched, _ := DFAAnalyRules(text, label.RegRule); matched {
					// Several fields may match, each with its own weight; one
					// snapshot is recorded per matching field.
					label.TotalWeight = round(label.TotalWeight+label.Weight[idx], 2)
					hits = append(hits, label)
				}
			}
		}
		if len(hits) == 0 {
			continue
		}

		// Collapse duplicate labels.
		merged := MergeLabelData(hits)
		if len(merged) == 0 {
			continue
		}

		// Walk the keys in sorted order so the written label list is
		// deterministic across runs.
		keys := make([]string, 0, len(merged))
		for key := range merged {
			keys = append(keys, key)
		}
		sort.Strings(keys)

		set := make(map[string]interface{}, len(keys)+1)
		names := make([]string, 0)
		for _, key := range keys {
			values := merged[key]
			datas := make([]DataRes, 0, len(values))
			for _, value := range values {
				datas = append(datas, DataRes{
					Name:   value.Name,
					Weight: value.TotalWeight,
				})
				names = append(names, value.Name)
			}
			set[key] = datas
		}
		// Assigned last and only once: previously each key issued its own
		// update that replaced "company_labels" with just that key's names, so
		// with several keys only the last one written survived.
		set["company_labels"] = names

		where := map[string]interface{}{
			"company_name": tmp["company_name"],
		}
		Mgo.Update(GF.Mongo.Coll, where, map[string]interface{}{"$set": set}, true, false)
	}

	// Report cursor failures instead of treating them as a normal end of data.
	if err := it.Close(); err != nil {
		log.Println("遍历 Mongo 数据出错:", err)
	}
	log.Println("over")
}