123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "math"
- "strings"
- "sync"
- "time"
- )
- var (
- MatchArr []TagMatching
- BrandArr []Brand
- KeyRegs []*RegexpInfo //关键词正则
- KeyWords = "课桌椅,办公桌,会议桌,讲桌,办公桌椅,木质课桌,电脑桌,折叠桌,课桌凳,学生桌,教师桌,乒乓球桌,幼儿桌子,午休椅,讲桌,木制桌,休闲桌椅,多功能桌,多功能椅,教室桌,培训桌,桌椅板凳,午休卓,办公椅,礼堂椅,会客椅"
- )
// Brand is one entry of the brand dictionary: a canonical name plus the tags
// that are matched against purchasing-list items to recognise that brand.
type Brand struct {
	// Name is the canonical brand name written onto a matched item.
	Name string `json:"name" bson:"name"`

	// Tags are the strings compared with an item's brand, name and model.
	Tags []string `json:"tags" bson:"tags"`
}
- var (
- MgoBid *mongodb.MongodbSim
- Thread int //配置项线程数
- saveSize int
- savePool chan map[string]interface{}
- saveSp chan bool
- saveTmpPool chan map[string]interface{} //存储itemname过滤的数据,存在临时表
- saveTmpSp chan bool
- )
- func InitDB() *mongo.Client {
- clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27017")
- MongoDBClient, err := mongo.Connect(context.TODO(), clientOptions) // 连接数据库
- if err != nil {
- fmt.Println("err =>", err)
- }
- err = MongoDBClient.Ping(context.TODO(), nil) // 检查数据库是否连接成功
- if err != nil {
- fmt.Println("err =>", err)
- }
- fmt.Println("链接success")
- return MongoDBClient
- }
- func InitMgo() {
- MgoBid = &mongodb.MongodbSim{
- MongodbAddr: "127.0.0.1:27017",
- DbName: "ldx",
- Size: 10,
- UserName: "",
- Password: "",
- }
- MgoBid.InitPool()
- Thread = 1
- saveSize = 200
- savePool = make(chan map[string]interface{}, 1000)
- saveSp = make(chan bool, 4)
- saveTmpPool = make(chan map[string]interface{}, 1000)
- saveTmpSp = make(chan bool, 2)
- }
- func main() {
- //client := InitDB()
- InitMgo()
- InitRule()
- InitBrand()
- go saveMethod()
- go saveTmpMethod()
- taskRun()
- c := make(chan bool, 1)
- <-c
- //
- //f, err := excelize.OpenFile("ldx-tmp.xlsx")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //defer func() {
- // if err := f.Close(); err != nil {
- // fmt.Println(err)
- // }
- //}()
- //
- //datas := []interface{}{}
- //// 获取 Sheet1 上所有单元格
- //rows, err := f.GetRows("ldx-tmp")
- //if err != nil {
- // fmt.Println(err)
- // return
- //}
- //
- //for k, row := range rows[1:] {
- // if len(row) < 3 {
- // continue
- // }
- // pinpai := PinPai{
- // Name: strings.TrimSpace(row[1]),
- // Tags: strings.Split(row[2], "/"),
- // }
- // fmt.Println("k ==>", k)
- //
- // datas = append(datas, pinpai)
- // //client.Database("ldx").Collection("pinpai").InsertOne(context.TODO(), pinpai)
- //
- //}
- //client.Database("ldx").Collection("pinpai").InsertMany(context.TODO(), datas)
- //
- //fmt.Println("success~~~~~~")
- }
- // InitRule 初始化规则
- func InitRule() {
- info, _ := MgoBid.Find("productclass_rule", nil, `{"_id": 1}`, nil, false, -1, -1)
- for _, m := range *info {
- tag := TagMatching{}
- tag.tagName = util.ObjToString(m["label_name"])
- tag.tagCode = util.ObjToString(m["label_code"])
- tag.buyerclass = util.ObjToString(m["buyerclass"])
- // 关键词
- if f := util.ObjToString(m["match_keyword"]); f != "" {
- tag.matchField = strings.Split(f, ",")
- if v := util.ObjToString(m["keyword"]); v != "" {
- tag.matchKey = util.ObjToString(m["keyword"])
- tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
- }
- }
- // 附件词
- if f := util.ObjToString(m["match_fjword"]); f != "" {
- tag.addField = strings.Split(f, ",")
- if v := util.ObjToString(m["fjword"]); v != "" {
- tag.addKey = util.ObjToString(m["fjword"])
- tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
- }
- }
- // 排除词
- if f := util.ObjToString(m["match_pcword"]); f != "" {
- tag.excludeField = strings.Split(f, ",")
- if v := util.ObjToString(m["pcword"]); v != "" {
- tag.excludeKey = util.ObjToString(m["pcword"])
- tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
- }
- }
- // 清理词
- if v := util.ObjToString(m["qlword"]); v != "" {
- tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
- }
- MatchArr = append(MatchArr, tag)
- }
- fmt.Println("len InitRule ==>", len(MatchArr))
- //log.Info("InitRule", zap.Any("MatchArr", len(MatchArr)))
- }
- func InitBrand() {
- info, _ := MgoBid.Find("brand_name", nil, `{"_id": 1}`, nil, false, -1, -1)
- brands, err := json.Marshal(*info)
- if err != nil {
- fmt.Println("InitBrand Marshal err =>", err)
- }
- err = json.Unmarshal(brands, &BrandArr)
- if err != nil {
- fmt.Println("InitBrand Unmarshal err =>", err)
- }
- fmt.Println("len brand ==>", len(BrandArr))
- //log.Info("InitBrand", zap.Int("BrandArr", len(BrandArr)))
- KeyRegs = GetRegex(KeyWords)
- fmt.Println("len(KeyRegs)->", len(KeyRegs))
- }
- func taskRun() {
- sess := MgoBid.GetMgoConn()
- defer MgoBid.DestoryMongoConn(sess)
- ch := make(chan bool, Thread)
- wg := &sync.WaitGroup{}
- //query := sess.DB("ldx").C("20221122Ldx_kzy").FindId("637c7cd709f6fc229744825b").Select(nil).Iter()
- f := bson.M{"purchasinglist": 1, "area": 1, "city": 1, "district": 1, "buyerclass": 1, "winner": 1, "buyer": 1,
- "agency": 1, "matchkey": 1, "publishtime": 1, "projectname": 1, "budget": 1, "bidamount": 1, "id": 1}
- query := sess.DB("ldx").C("20221122Ldx_kzy").Find(nil).Select(f).Iter()
- count := 0
- //tmp, _ := MgoBid.FindById("20221122Ldx_kzy", "637c7cd709f6fc2297448265", nil)
- //go func(tmp map[string]interface{}) {
- // defer func() {
- // <-ch
- // wg.Done()
- // }()
- // tag := taskFuc(tmp) //打标签, 返回 对应TagMatching,
- // //处理数据
- // start := time.Now()
- // dealTmp(tmp, tag)
- // elapsed := time.Since(start)
- // fmt.Println("dealTmp -- 该函数执行完成耗时:", elapsed)
- //
- //}(*tmp)
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%saveSize == 0 {
- //log.Info(fmt.Sprintf("current --- %d", count))
- log.Println("current ----->", count)
- //fmt.Println(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- tag := TaskFuc(tmp) //打标签, 返回 对应TagMatching,
- //处理数据
- //start := time.Now()
- dealTmp(tmp, tag)
- //elapsed := time.Since(start)
- //fmt.Println("dealTmp -- 该函数执行完成耗时:", elapsed)
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- fmt.Println("over ---> ", count)
- }
- func TaskFuc(tmp map[string]interface{}) (tag TagMatching) {
- L:
- for _, v := range MatchArr {
- // 同个类型的标签如果存在,就不需要再打
- if v.buyerclass == "" || (v.buyerclass != "" && strings.Contains(v.buyerclass, util.ObjToString(tmp["buyerclass"]))) {
- // 排除词
- if len(v.excludeField) > 0 && len(v.excludeKeyReg) > 0 {
- for _, f := range v.excludeField {
- if val := util.ObjToString(tmp[f]); val != "" {
- for _, e1 := range v.excludeKeyReg {
- if e1.regs != nil && e1.regs.MatchString(val) {
- break L
- } else {
- // && 特殊处理
- if strings.Contains(e1.keyStr, "&&") {
- flag := true
- for _, s := range strings.Split(e1.keyStr, "&&") {
- if !strings.Contains(val, s) {
- flag = false
- break
- }
- }
- if flag {
- break L
- }
- }
- }
- }
- }
- }
- }
- // 清理词
- if len(v.clearKey) > 0 && len(v.matchField) > 0 {
- for _, s := range v.clearKey {
- for _, f := range v.matchField {
- if val := util.ObjToString(tmp[f]); val != "" {
- tmp[f] = strings.ReplaceAll(val, s, "")
- }
- }
- }
- }
- // 关键词
- if len(v.matchField) > 0 && len(v.matchKeyReg) > 0 {
- for _, f := range v.matchField {
- if val := util.ObjToString(tmp[f]); val != "" {
- for _, r1 := range v.matchKeyReg {
- if r1.regs.MatchString(val) {
- if len(v.addField) > 0 && len(v.addKeyReg) > 0 {
- // 匹配附加词
- isCt := false
- for _, f1 := range v.addField {
- if v1 := util.ObjToString(tmp[f1]); v1 != "" {
- for _, r2 := range v.addKeyReg {
- if r2.regs != nil && r2.regs.MatchString(v1) {
- isCt = true
- } else {
- // && 特殊处理
- if strings.Contains(r2.keyStr, "&&") {
- flag := true
- for _, s := range strings.Split(r2.keyStr, "&&") {
- if !strings.Contains(v1, s) {
- flag = false
- break
- }
- }
- if flag {
- isCt = true
- }
- }
- }
- }
- }
- }
- if isCt {
- return v
- }
- } else {
- return v
- }
- }
- }
- }
- }
- }
- }
- }
- return tag
- }
- // dealTmp 处理tmp 新增字段,拆分purchasinglist,补充里面单价数量
- func dealTmp(tmp map[string]interface{}, tag TagMatching) {
- tmp["infoid"] = util.ObjToString(tmp["id"])
- //满足打标签,添加新字段
- if tag.tagName != "" {
- tmp["productclass_val"] = tag.tagName
- tmp[tag.tagName] = tag.tagCode
- }
- //包含二个字段时,进行计算补充第三个字段
- fields := []string{"number", "totalprice", "unitprice"}
- // 1.循环purchasinglist,匹配里面的brandname 值,然后替换
- //2.计算补充里 数量总价
- if purchasinglist, ok := tmp["purchasinglist"]; ok {
- if list, ok1 := purchasinglist.([]interface{}); ok1 {
- if len(list) > 0 {
- for _, item := range list {
- newTmp := util.DeepCopy(tmp).(map[string]interface{})
- skipTmp := util.DeepCopy(tmp).(map[string]interface{})
- var values []string
- item1 := item.(map[string]interface{})
- if brandname := util.ObjToString(item1["brandname"]); brandname != "" {
- values = append(values, util.ObjToString(brandname))
- }
- if itemname := util.ObjToString(item1["itemname"]); itemname != "" {
- if !MatchField(strings.Split(KeyWords, ","), []string{itemname}) {
- for kk, vv := range item1 {
- skipTmp[kk] = vv
- }
- delete(skipTmp, "purchasinglist")
- delete(skipTmp, "_id")
- saveTmpPool <- skipTmp
- continue
- }
- values = append(values, util.ObjToString(itemname))
- }
- if model := util.ObjToString(item1["model"]); model != "" {
- values = append(values, util.ObjToString(model))
- }
- var matched bool
- //匹配 brandname,匹配到就替换原来的值
- for _, brand := range BrandArr {
- matched = MatchField(brand.Tags, values)
- if matched {
- item1["brandname"] = brand.Name
- break
- }
- }
- //匹配不上,直接用其他
- if !matched && util.ObjToString(item1["brandname"]) != "" {
- item1["brandname"] = "其他"
- }
- // 判断补充 purchasinglist 节点里的数量关系
- if SumFields(fields, item1) > 1 {
- number, okn := item1["number"]
- unitprice, oku := item1["unitprice"]
- totalprice, okt := item1["totalprice"]
- if okn && oku && !okt { //只有数量和单价
- item1["totalprice"] = number.(float64) * unitprice.(float64)
- } else if okn && okt && !oku { // 只有数量和总价
- item1["unitprice"] = totalprice.(float64) / number.(float64)
- } else if okt && oku && !okn { //只有总价和单价
- number = totalprice.(float64) / unitprice.(float64)
- number = math.Ceil(number.(float64))
- item1["number"] = number
- }
- }
- for kk, vv := range item1 {
- newTmp[kk] = vv
- }
- delete(newTmp, "purchasinglist")
- delete(newTmp, "_id")
- savePool <- newTmp
- }
- }
- }
- }
- }
- func saveMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MgoBid.SaveBulk("20221122Ldx_kzy_new", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MgoBid.SaveBulk("20221122Ldx_kzy_new", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- // saveTmpMethod 过滤的数据进一个临时表
- func saveTmpMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveTmpPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveTmpSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveTmpSp
- }()
- MgoBid.SaveBulk("20221122Ldx_kzy_skip", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveTmpSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveTmpSp
- }()
- MgoBid.SaveBulk("20221122Ldx_kzy_skip", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|