package main import ( "fmt" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "proposed_project/config" "strings" "sync" "time" ) var TagField = map[string]string{ "owner": "ownerclass", //"projecttype": "", "top_category": "category", "sub_category": "category", "nature": "nature", "project_stage": "project_stage", } var ( //MatchArr []TagMatching //不同标签 规则 ruleMatch = make(map[string][]TagMatching, 0) SelectF = make(map[string]int) ) func InitRule() { info, _ := MgoBid.Find(config.Conf.Serve.TagRule, nil, `{"_id": 1}`, nil, false, -1, -1) for _, m := range *info { tag := TagMatching{} tag.tagName = util.ObjToString(m["label_name"]) tag.tagCode = util.ObjToString(m["code"]) // 关键词 if f := util.ObjToString(m["match_keyword"]); f != "" { tag.matchField = strings.Split(f, ",") for _, s := range tag.matchField { SelectF[s] = 1 } if v := util.ObjToString(m["keyword"]); v != "" { tag.matchKey = util.ObjToString(m["keyword"]) tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"])) } } // 附件词 if f := util.ObjToString(m["match_fjword"]); f != "" { tag.addField = strings.Split(f, ",") for _, s := range tag.addField { SelectF[s] = 1 } if v := util.ObjToString(m["fjword"]); v != "" { tag.addKey = util.ObjToString(m["fjword"]) tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"])) } } // 排除词 if f := util.ObjToString(m["match_pcword"]); f != "" { tag.excludeField = strings.Split(f, ",") for _, s := range tag.excludeField { SelectF[s] = 1 } if v := util.ObjToString(m["pcword"]); v != "" { tag.excludeKey = util.ObjToString(m["pcword"]) tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"])) } } // 清理词 if v := util.ObjToString(m["qlword"]); v != "" { tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",") } rules, _ := ruleMatch[tag.tagName] rules = append(rules, tag) ruleMatch[tag.tagName] = rules } for K, v := range ruleMatch { log.Info("InitRule", zap.Int(K, len(v))) } } func taskRun() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(SelectF).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() tag := taskFuc(tmp) update := make(map[string]interface{}) //项目性质 if tag["nature"] != "" { update["nature_code"] = tag["nature"] tmp["nature_code"] = tag["nature"] } else { update["nature_code"] = "00" tmp["nature_code"] = "00" } //项目阶段 if tag["project_stage"] != "" { update["project_stage_code"] = tag["project_stage"] tmp["project_stage_code"] = tag["project_stage"] } else { update["project_stage_code"] = "00" tmp["project_stage_code"] = "00" } //业主类型 if tag["owner"] != "" { update["ownerclass_code"] = tag["owner"] tmp["ownerclass_code"] = tag["owner"] } else { update["ownerclass_code"] = "00" tmp["ownerclass_code"] = "00" } //项目类别 if tag["sub_category"] != "" { update["category_code"] = tag["sub_category"] tmp["category_code"] = tag["sub_category"] } else { if tag["top_category"] != "" { update["category_code"] = tag["top_category"] tmp["category_code"] = tag["top_category"] } } if util.ObjToString(update["category_code"]) == "" { update["category_code"] = "04" tmp["category_code"] = "04" } //updatePool <- []map[string]interface{}{ // {"_id": tmp["_id"]}, // {"$set": update}, //} savePool <- tmp }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskFuc(tmp map[string]interface{}) map[string]string { tag := make(map[string]string) // 打上的标签 for _, MatchArr := range ruleMatch { for _, v := range MatchArr { // 同个类型的标签如果存在,就不需要再打 if tag[v.tagName] != "" { continue } // 排除词 if len(v.excludeField) > 0 && len(v.excludeKeyReg) > 0 { for _, f := range v.excludeField { if val := util.ObjToString(tmp[f]); val != "" { for _, e1 := range v.excludeKeyReg { flag := false if e1.regs != nil && e1.regs.MatchString(val) { flag = true } else { // && 特殊处理 if strings.Contains(e1.keyStr, "&&") { for _, s := range strings.Split(e1.keyStr, "&&") { if strings.Contains(val, s) { flag = true break } } } } if flag { goto L } } } } } // 清理词 if len(v.clearKey) > 0 && len(v.matchField) > 0 { for _, s := range v.clearKey { for _, f := range v.matchField { if val := util.ObjToString(tmp[f]); val != "" { tmp[f] = strings.ReplaceAll(val, s, "") } } } } // 关键词 if len(v.matchField) > 0 && len(v.matchKeyReg) > 0 { for _, f := range v.matchField { if val := util.ObjToString(tmp[f]); val != "" { for _, r1 := range v.matchKeyReg { if r1.regs.MatchString(val) { if len(v.addField) > 0 && len(v.addKeyReg) > 0 { // 匹配附加词 isCt := false for _, f1 := range v.addField { if v1 := util.ObjToString(tmp[f1]); v1 != "" { for _, r2 := range v.addKeyReg { if r2.regs != nil && r2.regs.MatchString(v1) { isCt = true } else { // && 特殊处理 if strings.Contains(r2.keyStr, "&&") { flag := true for _, s := range strings.Split(r2.keyStr, "&&") { if !strings.Contains(v1, s) { flag = false break } } if flag { isCt = true } } } } } } if isCt { tag[v.tagName] = v.tagCode } } else { tag[v.tagName] = v.tagCode } } } } } } L: } } return tag } func taskFuc1(tmp map[string]interface{}) map[string]string { tag := make(map[string]string) // 打上的标签 for _, v := range StageCode { // 同个类型的标签如果存在,就不需要再打 if tag[v.tagName] != "" { continue } // 排除词 if len(v.excludeField) > 0 && len(v.excludeKeyReg) > 0 { for _, f := range v.excludeField { if val := util.ObjToString(tmp[f]); val != "" { for _, e1 := range v.excludeKeyReg { flag := false if e1.regs != nil && e1.regs.MatchString(val) { flag = true } else { // && 特殊处理 if strings.Contains(e1.keyStr, "&&") { for _, s := range strings.Split(e1.keyStr, "&&") { if strings.Contains(val, s) { flag = true break } } } } if flag { goto L } } } } } // 清理词 if len(v.clearKey) > 0 && len(v.matchField) > 0 { for _, s := range v.clearKey { for _, f := range v.matchField { if val := util.ObjToString(tmp[f]); val != "" { tmp[f] = strings.ReplaceAll(val, s, "") } } } } // 关键词 if len(v.matchField) > 0 && len(v.matchKeyReg) > 0 { for _, f := range v.matchField { if val := util.ObjToString(tmp[f]); val != "" { for _, r1 := range v.matchKeyReg { if r1.regs.MatchString(val) { if len(v.addField) > 0 && len(v.addKeyReg) > 0 { // 匹配附加词 isCt := false for _, f1 := range v.addField { if v1 := util.ObjToString(tmp[f1]); v1 != "" { for _, r2 := range v.addKeyReg { if r2.regs != nil && r2.regs.MatchString(v1) { isCt = true } else { // && 特殊处理 if strings.Contains(r2.keyStr, "&&") { flag := true for _, s := range strings.Split(r2.keyStr, "&&") { if !strings.Contains(v1, s) { flag = false break } } if flag { isCt = true } } } } } } if isCt { tag[v.tagName] = v.tagCode } } else { tag[v.tagName] = v.tagCode } } } } } } L: } return tag } func UpdateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoPro.UpdateBulk(config.Conf.DB.MongoP.ProposedColl, arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoPro.UpdateBulk(config.Conf.DB.MongoP.ProposedColl, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } }