// Command main assigns a de-duplicated display name ("subtitle_projectname")
// to every document of a MongoDB project collection. Names that were already
// handed out are remembered as 32-bit hashes in a roaring bitmap that is
// persisted to a local file between runs.
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"os/signal"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/RoaringBitmap/roaring"
	"github.com/cespare/xxhash/v2"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// dbfile is the path of the file the name-hash bitmap is persisted to.
	dbfile = flag.String("dbfile", "./db", "数据库文件")
	// cache holds the 32-bit hashes of every name handed out so far.
	cache = roaring.NewBitmap()
	// cacheModify is set whenever cache changed since the last periodic save.
	cacheModify = false
	// mutex guards cache and cacheModify, which are shared between the main
	// goroutine and the periodic saver goroutine.
	mutex sync.Mutex
)

// Regular expressions are compiled once at package scope instead of on every
// call; GetPackages runs once per title of every processed document.
var (
	// packageRangeRe matches package ranges/lists such as "1-6包", "01、02包"
	// or "包1、2".
	packageRangeRe = regexp.MustCompile(`包\d{1,2}[-~、]\d{1,2}|\d{1,2}[-~、]\d{1,2}包`)
	// packageSingleRe matches a single section or package marker such as
	// "标段1", "二标段", "包2", "12包" or "A包".
	packageSingleRe = regexp.MustCompile(`(标段[1-9一二三四五六七八九]|[1-9一二三四五六七八九]标段|包[1-9一二三四五六七八九]?[0-9]|[1-9一二三四五六七八九]?[0-9]包|[a-kA-K]包)`)
	// packageNumberRe extracts the number, and optional range end, of a
	// "包N" / "包N-M" marker.
	packageNumberRe = regexp.MustCompile(`(包)(\d+)(?:-(\d+))?`)
)

// loadDb fills cache from the file named by the -dbfile flag. A missing or
// empty file leaves the cache empty; an unreadable or corrupt file is fatal,
// because continuing would later overwrite it with an incomplete bitmap.
// It must be called after flag.Parse and before any other goroutine uses cache.
func loadDb() {
	bs, err := ioutil.ReadFile(*dbfile)
	if os.IsNotExist(err) {
		return
	}
	if err != nil {
		log.Fatal(err)
	}
	if len(bs) == 0 {
		return
	}
	if _, err := cache.FromBuffer(bs); err != nil {
		log.Fatalf("load cache %q: %v", *dbfile, err)
	}
}

// autoSave writes the cache to disk every 10 seconds if it changed since the
// previous write. It never returns and is meant to run in its own goroutine
// for the lifetime of the process.
func autoSave() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		// Clear the flag before saving so that a modification made while the
		// save is in progress is picked up by the next tick.
		mutex.Lock()
		dirty := cacheModify
		cacheModify = false
		mutex.Unlock()
		if dirty {
			saveDb()
		}
	}
}

func main() {
	// Flags must be parsed before *dbfile is read; otherwise the -dbfile
	// option can never take effect.
	flag.Parse()
	loadDb()
	go autoSave()

	// Register for termination signals before the long-running scan so that
	// an interrupt in the middle of the run still persists the cache.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	MgoB := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.85:27080",
		//MongodbAddr: "192.168.3.206:27002",
		Size:   10,
		DbName: "qfw",
		//UserName: "root",
		//Password: "root",
	}
	MgoB.InitPool()

	// NOTE(review): credentials are hard-coded here; consider loading them
	// from configuration or the environment.
	Es := &elastic.Elastic{
		//S_esurl: "http://192.168.3.149:9201",
		S_esurl:  "http://172.17.4.184:19805",
		I_size:   5,
		Username: "es_all",
		Password: "TopJkO2E_d1x",
	}
	Es.InitElasticSize()

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	// Only documents whose "pici" falls into this window are processed.
	where := map[string]interface{}{
		"pici": map[string]interface{}{
			//"$gte": 0,
			//"$lte": 1710381363,
			"$gt":  1710381363,
			"$lte": 1710468406,
		},
	}
	query := sess.DB("qfw").C("projectset_20230904").Find(where).Select(nil).Iter()

	count := 0
	lastID := ""
	interrupted := false
	tmp := make(map[string]interface{})
	for !interrupted && query.Next(tmp) {
		id := mongodb.BsonIdToSId(tmp["_id"])
		lastID = id
		if count%10000 == 0 {
			log.Println("current:", count, id)
		}

		if name := getNewName(tmp); name != "" {
			update := map[string]interface{}{"subtitle_projectname": name}
			MgoB.UpdateById("projectset_20230904", id, map[string]interface{}{"$set": update})
			//Es.UpdateDocument("projectset", id, update)
		}

		count++
		// Use a fresh map for the next document so no keys leak over.
		tmp = make(map[string]interface{})

		// Stop between documents if a termination signal has arrived.
		select {
		case <-signalChan:
			interrupted = true
		default:
		}
	}
	log.Println("结束", lastID)

	// After a complete run, keep the process alive until it is told to stop.
	if !interrupted {
		<-signalChan
	}
	log.Println("程序退出")
	saveDb()
}

// getNewName builds a not-yet-used name for the project document tmp.
// It reads "projectname", "projectcode", "buyer", "firsttime", "createtime"
// and the "title" of every entry in "list", and returns "" when no unused
// name could be produced.
func getNewName(tmp map[string]interface{}) string {
	projectName := util.ObjToString(tmp["projectname"])
	projectCode := util.ObjToString(tmp["projectcode"])
	buyer := util.ObjToString(tmp["buyer"])

	// Timestamps are Unix seconds; non-positive values leave the date empty.
	var projectDate, createDate string
	if firsttime := util.Int64All(tmp["firsttime"]); firsttime > 0 {
		projectDate = time.Unix(firsttime, 0).Format("2006-01-02")
	}
	if createtime := util.Int64All(tmp["createtime"]); createtime > 0 {
		createDate = time.Unix(createtime, 0).Format("2006-01-02")
	}

	// Collect the distinct package markers of all titles in first-seen order.
	var matchWords []string
	if list, ok := tmp["list"].([]interface{}); ok {
		for _, item := range list {
			da, ok := item.(map[string]interface{})
			if !ok {
				continue
			}
			for _, pkg := range GetPackages(util.ObjToString(da["title"])) {
				if !IsInStringArray(pkg, matchWords) {
					matchWords = append(matchWords, pkg)
				}
			}
		}
	}
	//pks := removeDuplicates(matchWords)
	packages := strings.Join(matchWords, "、")

	return RenameProjectName(projectName, projectCode, packages, projectDate, buyer, createDate)
}

// saveDb writes the cache to the file named by -dbfile, replacing its
// previous content. An empty cache is not written. Failure to open the file
// is fatal; write and close errors are logged.
func saveDb() {
	mutex.Lock()
	defer mutex.Unlock()

	// Nothing to persist.
	if cache.GetCardinality() == 0 {
		return
	}

	fo, err := os.OpenFile(*dbfile, os.O_CREATE|os.O_RDWR|os.O_SYNC|os.O_TRUNC, 0777)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := cache.WriteTo(fo); err != nil {
		log.Println("write cache file:", *dbfile, err)
	}
	if err := fo.Close(); err != nil {
		log.Println("close cache file:", *dbfile, err)
	}
}

// hash returns the 64-bit xxhash of src.
func hash(src string) uint64 {
	return xxhash.Sum64String(src)
}

// allNonEmpty reports whether every string in parts is non-empty.
func allNonEmpty(parts []string) bool {
	for _, p := range parts {
		if p == "" {
			return false
		}
	}
	return true
}

// RenameProjectName returns the first candidate name that has not been handed
// out before and records it in the cache. Candidates are tried in this order:
//
//  1. projectName alone
//  2. projectName plus one of: code, packages, date, buyer
//  3. projectName plus two of them
//  4. projectName plus code+packages+date, then code+packages+buyer
//  5. projectName plus all four
//  6. projectName plus all four and createDate (empty parts included)
//
// Parts are joined with "_"; a combination is skipped when any of its added
// parts is empty. It returns "" when every candidate is already taken.
//
// Names are remembered by the low 32 bits of their xxhash, so two different
// names can collide and be treated as duplicates.
func RenameProjectName(projectName, projectCode, packages, projectDate, buyer, createDate string) (newName string) {
	// The periodic saver serialises cache from another goroutine, so every
	// lookup and insert has to happen under the mutex.
	mutex.Lock()
	defer mutex.Unlock()

	// claim records candidate and reports true if it was not used before.
	claim := func(candidate string) bool {
		key := uint32(hash(candidate))
		if cache.Contains(key) {
			return false
		}
		cache.Add(key)
		cacheModify = true
		return true
	}

	// 1. project name alone
	if projectName != "" && claim(projectName) {
		return projectName
	}

	suffixSets := [][]string{
		{projectCode},                                 // 2.1 name + code
		{packages},                                    // 2.2 name + packages
		{projectDate},                                 // 2.3 name + project date
		{buyer},                                       // 2.4 name + buyer
		{projectCode, packages},                       // 3.1 name + code + packages
		{projectCode, projectDate},                    // 3.2 name + code + project date
		{projectCode, buyer},                          // 3.3 name + code + buyer
		{packages, projectDate},                       // 3.4 name + packages + project date
		{packages, buyer},                             // 3.5 name + packages + buyer
		{projectDate, buyer},                          // 3.6 name + project date + buyer
		{projectCode, packages, projectDate},          // 4.1 name + code + packages + project date
		{projectCode, packages, buyer},                // 4.2 name + code + packages + buyer
		{projectCode, packages, projectDate, buyer},   // 5   name + all four
	}
	for _, suffixes := range suffixSets {
		if !allNonEmpty(suffixes) {
			continue
		}
		candidate := projectName + "_" + strings.Join(suffixes, "_")
		if claim(candidate) {
			return candidate
		}
	}

	// Last resort: append the creation date as well. This is tried whenever
	// everything above failed, including the case where all four attributes
	// are present but their combination is already taken.
	fallback := projectName + "_" + projectCode + "_" + packages + "_" + projectDate + "_" + buyer + "_" + createDate
	if claim(fallback) {
		return fallback
	}
	return ""
}

// GetPackages extracts the package/section markers found in title.
// Range-style matches (e.g. "1-6包", "01、02包", "包1、2") come first, followed
// by single markers (e.g. "包2", "二标段", "A包") that are not already present.
// It returns nil when title contains no marker.
func GetPackages(title string) (res []string) {
	res = append(res, packageRangeRe.FindAllString(title, -1)...)
	for _, pkg := range packageSingleRe.FindAllString(title, -1) {
		if !IsInStringArray(pkg, res) {
			res = append(res, pkg)
		}
	}
	return res
}

// IsInStringArray reports whether str is an element of arr.
// arr is not modified and does not need to be sorted.
func IsInStringArray(str string, arr []string) bool {
	for _, s := range arr {
		if s == str {
			return true
		}
	}
	return false
}

// removeDuplicates de-duplicates "包N" / "包N-M" markers and merges
// consecutive package numbers into "包start-end" ranges. A number that stands
// alone keeps the text it was first matched with. Items without a parsable
// package number, or with a reversed range, are appended unchanged after the
// merged markers.
func removeDuplicates(data []string) []string {
	covered := make(map[int]bool)  // every package number mentioned
	labels := make(map[int]string) // matched text, keyed by its start number
	var unmatched []string

	for _, item := range data {
		m := packageNumberRe.FindStringSubmatch(item)
		if m == nil {
			unmatched = append(unmatched, item)
			continue
		}
		start, err := strconv.Atoi(m[2])
		if err != nil {
			unmatched = append(unmatched, item)
			continue
		}
		end := start
		if m[3] != "" {
			if end, err = strconv.Atoi(m[3]); err != nil || end < start {
				unmatched = append(unmatched, item)
				continue
			}
		}
		for i := start; i <= end; i++ {
			covered[i] = true
		}
		labels[start] = m[0]
	}

	nums := make([]int, 0, len(covered))
	for n := range covered {
		nums = append(nums, n)
	}
	sort.Ints(nums)

	// Emit one entry per run of consecutive numbers. With no numbers at all
	// nothing is emitted, so no empty string sneaks into the result.
	var result []string
	for i := 0; i < len(nums); {
		j := i
		for j+1 < len(nums) && nums[j+1] == nums[j]+1 {
			j++
		}
		if i == j {
			result = append(result, labels[nums[i]])
		} else {
			result = append(result, fmt.Sprintf("包%d-%d", nums[i], nums[j]))
		}
		i = j + 1
	}

	return append(result, unmatched...)
}