package main import ( "context" "encoding/json" "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "math" "strings" "sync" "time" ) var ( MatchArr []TagMatching BrandArr []Brand KeyRegs []*RegexpInfo //关键词正则 KeyWords = "课桌椅,办公桌,会议桌,讲桌,办公桌椅,木质课桌,电脑桌,折叠桌,课桌凳,学生桌,教师桌,乒乓球桌,幼儿桌子,午休椅,讲桌,木制桌,休闲桌椅,多功能桌,多功能椅,教室桌,培训桌,桌椅板凳,午休卓,办公椅,礼堂椅,会客椅" ) type Brand struct { Name string `json:"name" bson:"name"` Tags []string `json:"tags" bson:"tags"` } var ( MgoBid *mongodb.MongodbSim Thread int //配置项线程数 saveSize int savePool chan map[string]interface{} saveSp chan bool saveTmpPool chan map[string]interface{} //存储itemname过滤的数据,存在临时表 saveTmpSp chan bool ) func InitDB() *mongo.Client { clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27017") MongoDBClient, err := mongo.Connect(context.TODO(), clientOptions) // 连接数据库 if err != nil { fmt.Println("err =>", err) } err = MongoDBClient.Ping(context.TODO(), nil) // 检查数据库是否连接成功 if err != nil { fmt.Println("err =>", err) } fmt.Println("链接success") return MongoDBClient } func InitMgo() { MgoBid = &mongodb.MongodbSim{ MongodbAddr: "127.0.0.1:27017", DbName: "ldx", Size: 10, UserName: "", Password: "", } MgoBid.InitPool() Thread = 1 saveSize = 200 savePool = make(chan map[string]interface{}, 1000) saveSp = make(chan bool, 4) saveTmpPool = make(chan map[string]interface{}, 1000) saveTmpSp = make(chan bool, 2) } func main() { //client := InitDB() InitMgo() InitRule() InitBrand() go saveMethod() go saveTmpMethod() taskRun() c := make(chan bool, 1) <-c // //f, err := excelize.OpenFile("ldx-tmp.xlsx") //if err != nil { // fmt.Println(err) // return //} //defer func() { // if err := f.Close(); err != nil { // fmt.Println(err) // } //}() // //datas := []interface{}{} //// 获取 Sheet1 上所有单元格 //rows, err := f.GetRows("ldx-tmp") //if err != nil { // fmt.Println(err) // return //} // //for k, row := range rows[1:] { // if len(row) < 3 { // continue // } // pinpai := PinPai{ // Name: strings.TrimSpace(row[1]), // Tags: strings.Split(row[2], "/"), // } // fmt.Println("k ==>", k) // // datas = append(datas, pinpai) // //client.Database("ldx").Collection("pinpai").InsertOne(context.TODO(), pinpai) // //} //client.Database("ldx").Collection("pinpai").InsertMany(context.TODO(), datas) // //fmt.Println("success~~~~~~") } // InitRule 初始化规则 func InitRule() { info, _ := MgoBid.Find("productclass_rule", nil, `{"_id": 1}`, nil, false, -1, -1) for _, m := range *info { tag := TagMatching{} tag.tagName = util.ObjToString(m["label_name"]) tag.tagCode = util.ObjToString(m["label_code"]) tag.buyerclass = util.ObjToString(m["buyerclass"]) // 关键词 if f := util.ObjToString(m["match_keyword"]); f != "" { tag.matchField = strings.Split(f, ",") if v := util.ObjToString(m["keyword"]); v != "" { tag.matchKey = util.ObjToString(m["keyword"]) tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"])) } } // 附件词 if f := util.ObjToString(m["match_fjword"]); f != "" { tag.addField = strings.Split(f, ",") if v := util.ObjToString(m["fjword"]); v != "" { tag.addKey = util.ObjToString(m["fjword"]) tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"])) } } // 排除词 if f := util.ObjToString(m["match_pcword"]); f != "" { tag.excludeField = strings.Split(f, ",") if v := util.ObjToString(m["pcword"]); v != "" { tag.excludeKey = util.ObjToString(m["pcword"]) tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"])) } } // 清理词 if v := util.ObjToString(m["qlword"]); v != "" { tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",") } MatchArr = append(MatchArr, tag) } fmt.Println("len InitRule ==>", len(MatchArr)) //log.Info("InitRule", zap.Any("MatchArr", len(MatchArr))) } func InitBrand() { info, _ := MgoBid.Find("brand_name", nil, `{"_id": 1}`, nil, false, -1, -1) brands, err := json.Marshal(*info) if err != nil { fmt.Println("InitBrand Marshal err =>", err) } err = json.Unmarshal(brands, &BrandArr) if err != nil { fmt.Println("InitBrand Unmarshal err =>", err) } fmt.Println("len brand ==>", len(BrandArr)) //log.Info("InitBrand", zap.Int("BrandArr", len(BrandArr))) KeyRegs = GetRegex(KeyWords) fmt.Println("len(KeyRegs)->", len(KeyRegs)) } func taskRun() { sess := MgoBid.GetMgoConn() defer MgoBid.DestoryMongoConn(sess) ch := make(chan bool, Thread) wg := &sync.WaitGroup{} //query := sess.DB("ldx").C("20221122Ldx_kzy").FindId("637c7cd709f6fc229744825b").Select(nil).Iter() f := bson.M{"purchasinglist": 1, "area": 1, "city": 1, "district": 1, "buyerclass": 1, "winner": 1, "buyer": 1, "agency": 1, "matchkey": 1, "publishtime": 1, "projectname": 1, "budget": 1, "bidamount": 1, "id": 1} query := sess.DB("ldx").C("20221122Ldx_kzy").Find(nil).Select(f).Iter() count := 0 //tmp, _ := MgoBid.FindById("20221122Ldx_kzy", "637c7cd709f6fc2297448265", nil) //go func(tmp map[string]interface{}) { // defer func() { // <-ch // wg.Done() // }() // tag := taskFuc(tmp) //打标签, 返回 对应TagMatching, // //处理数据 // start := time.Now() // dealTmp(tmp, tag) // elapsed := time.Since(start) // fmt.Println("dealTmp -- 该函数执行完成耗时:", elapsed) // //}(*tmp) for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%saveSize == 0 { //log.Info(fmt.Sprintf("current --- %d", count)) log.Println("current ----->", count) //fmt.Println(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() tag := TaskFuc(tmp) //打标签, 返回 对应TagMatching, //处理数据 //start := time.Now() dealTmp(tmp, tag) //elapsed := time.Since(start) //fmt.Println("dealTmp -- 该函数执行完成耗时:", elapsed) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() fmt.Println("over ---> ", count) } func TaskFuc(tmp map[string]interface{}) (tag TagMatching) { L: for _, v := range MatchArr { // 同个类型的标签如果存在,就不需要再打 if v.buyerclass == "" || (v.buyerclass != "" && strings.Contains(v.buyerclass, util.ObjToString(tmp["buyerclass"]))) { // 排除词 if len(v.excludeField) > 0 && len(v.excludeKeyReg) > 0 { for _, f := range v.excludeField { if val := util.ObjToString(tmp[f]); val != "" { for _, e1 := range v.excludeKeyReg { if e1.regs != nil && e1.regs.MatchString(val) { break L } else { // && 特殊处理 if strings.Contains(e1.keyStr, "&&") { flag := true for _, s := range strings.Split(e1.keyStr, "&&") { if !strings.Contains(val, s) { flag = false break } } if flag { break L } } } } } } } // 清理词 if len(v.clearKey) > 0 && len(v.matchField) > 0 { for _, s := range v.clearKey { for _, f := range v.matchField { if val := util.ObjToString(tmp[f]); val != "" { tmp[f] = strings.ReplaceAll(val, s, "") } } } } // 关键词 if len(v.matchField) > 0 && len(v.matchKeyReg) > 0 { for _, f := range v.matchField { if val := util.ObjToString(tmp[f]); val != "" { for _, r1 := range v.matchKeyReg { if r1.regs.MatchString(val) { if len(v.addField) > 0 && len(v.addKeyReg) > 0 { // 匹配附加词 isCt := false for _, f1 := range v.addField { if v1 := util.ObjToString(tmp[f1]); v1 != "" { for _, r2 := range v.addKeyReg { if r2.regs != nil && r2.regs.MatchString(v1) { isCt = true } else { // && 特殊处理 if strings.Contains(r2.keyStr, "&&") { flag := true for _, s := range strings.Split(r2.keyStr, "&&") { if !strings.Contains(v1, s) { flag = false break } } if flag { isCt = true } } } } } } if isCt { return v } } else { return v } } } } } } } } return tag } // dealTmp 处理tmp 新增字段,拆分purchasinglist,补充里面单价数量 func dealTmp(tmp map[string]interface{}, tag TagMatching) { tmp["infoid"] = util.ObjToString(tmp["id"]) //满足打标签,添加新字段 if tag.tagName != "" { tmp["productclass_val"] = tag.tagName tmp[tag.tagName] = tag.tagCode } //包含二个字段时,进行计算补充第三个字段 fields := []string{"number", "totalprice", "unitprice"} // 1.循环purchasinglist,匹配里面的brandname 值,然后替换 //2.计算补充里 数量总价 if purchasinglist, ok := tmp["purchasinglist"]; ok { if list, ok1 := purchasinglist.([]interface{}); ok1 { if len(list) > 0 { for _, item := range list { newTmp := util.DeepCopy(tmp).(map[string]interface{}) skipTmp := util.DeepCopy(tmp).(map[string]interface{}) var values []string item1 := item.(map[string]interface{}) if brandname := util.ObjToString(item1["brandname"]); brandname != "" { values = append(values, util.ObjToString(brandname)) } if itemname := util.ObjToString(item1["itemname"]); itemname != "" { if !MatchField(strings.Split(KeyWords, ","), []string{itemname}) { for kk, vv := range item1 { skipTmp[kk] = vv } delete(skipTmp, "purchasinglist") delete(skipTmp, "_id") saveTmpPool <- skipTmp continue } values = append(values, util.ObjToString(itemname)) } if model := util.ObjToString(item1["model"]); model != "" { values = append(values, util.ObjToString(model)) } var matched bool //匹配 brandname,匹配到就替换原来的值 for _, brand := range BrandArr { matched = MatchField(brand.Tags, values) if matched { item1["brandname"] = brand.Name break } } //匹配不上,直接用其他 if !matched && util.ObjToString(item1["brandname"]) != "" { item1["brandname"] = "其他" } // 判断补充 purchasinglist 节点里的数量关系 if SumFields(fields, item1) > 1 { number, okn := item1["number"] unitprice, oku := item1["unitprice"] totalprice, okt := item1["totalprice"] if okn && oku && !okt { //只有数量和单价 item1["totalprice"] = number.(float64) * unitprice.(float64) } else if okn && okt && !oku { // 只有数量和总价 item1["unitprice"] = totalprice.(float64) / number.(float64) } else if okt && oku && !okn { //只有总价和单价 number = totalprice.(float64) / unitprice.(float64) number = math.Ceil(number.(float64)) item1["number"] = number } } for kk, vv := range item1 { newTmp[kk] = vv } delete(newTmp, "purchasinglist") delete(newTmp, "_id") savePool <- newTmp } } } } } func saveMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MgoBid.SaveBulk("20221122Ldx_kzy_new", arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MgoBid.SaveBulk("20221122Ldx_kzy_new", arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } // saveTmpMethod 过滤的数据进一个临时表 func saveTmpMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveTmpPool: arru[indexu] = v indexu++ if indexu == saveSize { saveTmpSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveTmpSp }() MgoBid.SaveBulk("20221122Ldx_kzy_skip", arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveTmpSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveTmpSp }() MgoBid.SaveBulk("20221122Ldx_kzy_skip", arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } }