main.go 11 KB


  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/RoaringBitmap/roaring"
  6. "github.com/cespare/xxhash/v2"
  7. "io/ioutil"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "os"
  13. "os/signal"
  14. "regexp"
  15. "sort"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "syscall"
  20. "time"
  21. )
  22. var (
  23. dbfile = flag.String("dbfile", "./db", "数据库文件")
  24. cache = roaring.NewBitmap()
  25. cacheModify = false //控制10秒 定时写入文件
  26. mutex sync.Mutex // 互斥锁,用于保护 cache 的并发写入操作
  27. )
  28. func init() {
  29. _, err := os.Stat(*dbfile)
  30. if !os.IsNotExist(err) {
  31. bs, err := ioutil.ReadFile(*dbfile)
  32. if err != nil {
  33. log.Fatal(err)
  34. }
  35. if len(bs) > 0 {
  36. cache.FromBuffer(bs)
  37. }
  38. }
  39. //监听,写入文件保存
  40. go func() {
  41. for {
  42. time.Sleep(10 * time.Second)
  43. if cacheModify {
  44. saveDb()
  45. cacheModify = false
  46. }
  47. }
  48. }()
  49. }
  50. func main() {
  51. MgoB := &mongodb.MongodbSim{
  52. MongodbAddr: "172.17.4.85:27080",
  53. //MongodbAddr: "192.168.3.206:27002",
  54. Size: 10,
  55. DbName: "qfw",
  56. //UserName: "root",
  57. //Password: "root",
  58. }
  59. MgoB.InitPool()
  60. Es := &elastic.Elastic{
  61. //S_esurl: "http://192.168.3.149:9201",
  62. S_esurl: "http://172.17.4.184:19805",
  63. I_size: 5,
  64. Username: "es_all",
  65. Password: "TopJkO2E_d1x",
  66. }
  67. Es.InitElasticSize()
  68. sess := MgoB.GetMgoConn()
  69. defer MgoB.DestoryMongoConn(sess)
  70. //
  71. where := map[string]interface{}{
  72. "pici": map[string]interface{}{
  73. //"$gte": 0,
  74. //"$lte": 1710381363,
  75. "$gt": 1710381363,
  76. "$lte": 1710468406,
  77. },
  78. }
  79. query := sess.DB("qfw").C("projectset_20230904").Find(where).Select(nil).Iter()
  80. count := 0
  81. lastID := ""
  82. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  83. id := mongodb.BsonIdToSId(tmp["_id"])
  84. lastID = id
  85. if count%10000 == 0 {
  86. log.Println("current:", count, id)
  87. }
  88. name := getNewName(tmp)
  89. if name != "" {
  90. update := make(map[string]interface{})
  91. update["subtitle_projectname"] = name
  92. MgoB.UpdateById("projectset_20230904", id, map[string]interface{}{"$set": update})
  93. //Es.UpdateDocument("projectset", id, update)
  94. }
  95. tmp = make(map[string]interface{})
  96. }
  97. log.Println("结束", lastID)
  98. //监听异常退出信号;及时保存数据
  99. signalChan := make(chan os.Signal, 1)
  100. signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
  101. <-signalChan
  102. log.Println("程序退出")
  103. saveDb()
  104. }
  105. // getNewName 获取新的不重复名称
  106. func getNewName(tmp map[string]interface{}) string {
  107. projectName := util.ObjToString(tmp["projectname"])
  108. projectCode := util.ObjToString(tmp["projectcode"])
  109. buyer := util.ObjToString(tmp["buyer"])
  110. firsttime := util.Int64All(tmp["firsttime"])
  111. createtime := util.Int64All(tmp["createtime"])
  112. var projectDate, createDate string
  113. if firsttime > 0 {
  114. projectDate = time.Unix(firsttime, 0).Format("2006-01-02")
  115. }
  116. if createtime > 0 {
  117. createDate = time.Unix(createtime, 0).Format("2006-01-02")
  118. }
  119. var matchWords = make([]string, 0)
  120. if list, ok := tmp["list"].([]interface{}); ok {
  121. if len(list) > 0 {
  122. for _, v := range list {
  123. if da, ok := v.(map[string]interface{}); ok {
  124. title := util.ObjToString(da["title"])
  125. // 使用正则表达式进行匹配
  126. matches := GetPackages(title)
  127. for _, v := range matches {
  128. if !IsInStringArray(v, matchWords) {
  129. matchWords = append(matchWords, v)
  130. }
  131. }
  132. }
  133. }
  134. }
  135. }
  136. //pks := removeDuplicates(matchWords)
  137. packages := strings.Join(matchWords, "、")
  138. return RenameProjectName(projectName, projectCode, packages, projectDate, buyer, createDate)
  139. }
  140. // saveDb 文件写入
  141. func saveDb() {
  142. mutex.Lock()
  143. defer mutex.Unlock()
  144. // 如果 cache 为空,则无需执行写入操作
  145. if cache.GetCardinality() == 0 {
  146. return
  147. }
  148. fo, err := os.OpenFile(*dbfile, os.O_CREATE|os.O_RDWR|os.O_SYNC|os.O_TRUNC, 0777)
  149. if err != nil {
  150. log.Fatal(err)
  151. }
  152. defer fo.Close()
  153. cache.WriteTo(fo)
  154. }
  155. // hash 计算hash
  156. func hash(src string) uint64 {
  157. return xxhash.Sum64String(src)
  158. }
  159. // RenameProjectName 获取新的不重复的项目名称
  160. func RenameProjectName(projectName, projectCode, packages, projectDate, buyer, createDate string) (newName string) {
  161. //TODO 1.判断项目名称是否重复
  162. var id uint64
  163. defer func() {
  164. if id > 0 && newName != "" {
  165. cache.Add(uint32(id))
  166. cacheModify = true
  167. }
  168. }()
  169. //1.项目名称
  170. if projectName != "" {
  171. id = hash(projectName)
  172. if !cache.Contains(uint32(id)) {
  173. newName = projectName
  174. return projectName
  175. }
  176. }
  177. //TODO 2.1 项目名称+项目编码
  178. if projectCode != "" {
  179. newName = projectName + "_" + projectCode
  180. id = hash(newName)
  181. if !cache.Contains(uint32(id)) {
  182. return newName
  183. }
  184. }
  185. //TODO 2.2 项目名称+分包信息
  186. if packages != "" {
  187. newName = projectName + "_" + packages
  188. id = hash(newName)
  189. if !cache.Contains(uint32(id)) {
  190. return newName
  191. }
  192. }
  193. //TODO 2.3 项目名称+项目时间
  194. if projectDate != "" {
  195. newName = projectName + "_" + projectDate
  196. id = hash(newName)
  197. if !cache.Contains(uint32(id)) {
  198. return newName
  199. }
  200. }
  201. //TODO 2.4 项目名称+采购单位名称
  202. if buyer != "" {
  203. newName = projectName + "_" + buyer
  204. id = hash(newName)
  205. if !cache.Contains(uint32(id)) {
  206. return newName
  207. }
  208. }
  209. //TODO 3.1 项目名称+项目编码+分包信息
  210. if projectCode != "" && packages != "" {
  211. newName = projectName + "_" + projectCode + "_" + packages
  212. id = hash(newName)
  213. if !cache.Contains(uint32(id)) {
  214. return newName
  215. }
  216. }
  217. //TODO 3.2 项目名称+项目编码+项目时间
  218. if projectCode != "" && projectDate != "" {
  219. newName = projectName + "_" + projectCode + "_" + projectDate
  220. id = hash(newName)
  221. if !cache.Contains(uint32(id)) {
  222. return newName
  223. }
  224. }
  225. //TODO 3.3 项目名称+项目编码+采购单位
  226. if projectCode != "" && buyer != "" {
  227. newName = projectName + "_" + projectCode + "_" + buyer
  228. id = hash(newName)
  229. if !cache.Contains(uint32(id)) {
  230. return newName
  231. }
  232. }
  233. //TODO 3.4 项目名称+分包+项目时间
  234. if packages != "" && projectDate != "" {
  235. newName = projectName + "_" + packages + "_" + projectDate
  236. id = hash(newName)
  237. if !cache.Contains(uint32(id)) {
  238. return newName
  239. }
  240. }
  241. //TODO 3.5 项目名称+分包+采购单位
  242. if packages != "" && buyer != "" {
  243. newName = projectName + "_" + packages + "_" + buyer
  244. id = hash(newName)
  245. if !cache.Contains(uint32(id)) {
  246. return newName
  247. }
  248. }
  249. //TODO 3.6 项目名称+项目时间+采购单位
  250. if projectDate != "" && buyer != "" {
  251. newName = projectName + "_" + projectDate + "_" + buyer
  252. id = hash(newName)
  253. if !cache.Contains(uint32(id)) {
  254. return newName
  255. }
  256. }
  257. //TODO 4.1 项目名称+项目编码+分包信息+项目时间
  258. if projectCode != "" && packages != "" && projectDate != "" {
  259. newName = projectName + "_" + projectCode + "_" + packages + "_" + projectDate
  260. id = hash(newName)
  261. if !cache.Contains(uint32(id)) {
  262. return newName
  263. }
  264. }
  265. //TODO 4.2 项目名称+项目编码+分包信息+采购单位
  266. if projectCode != "" && packages != "" && buyer != "" {
  267. newName = projectName + "_" + projectCode + "_" + packages + "_" + buyer
  268. id = hash(newName)
  269. if !cache.Contains(uint32(id)) {
  270. return newName
  271. }
  272. }
  273. //TODO 5 项目名称+项目编码+分包信息+项目时间+采购单位
  274. if projectCode != "" && packages != "" && projectDate != "" && buyer != "" {
  275. newName = projectName + "_" + projectCode + "_" + packages + "_" + projectDate + "_" + buyer
  276. id = hash(newName)
  277. if !cache.Contains(uint32(id)) {
  278. return newName
  279. }
  280. } else {
  281. newName = projectName + "_" + projectCode + "_" + packages + "_" + projectDate + "_" + buyer + "_" + createDate
  282. id = hash(newName)
  283. if !cache.Contains(uint32(id)) {
  284. return newName
  285. } else {
  286. newName = ""
  287. }
  288. }
  289. return
  290. }
  291. // GetPackages 获取对应的分包
  292. func GetPackages(title string) (res []string) {
  293. // 定义正则表达式
  294. rea := regexp.MustCompile(`包\d{1,2}[-~、]\d{1,2}|\d{1,2}[-~、]\d{1,2}包`) //1-6包;01-06包;01、02包;包1、包2
  295. //text := "中国绿发投资集团有限公司直属项目公司2023年第20批集中采购非招标项目(包10、12、14、17、18、19"
  296. packages := rea.FindAllString(util.ObjToString(title), -1) //匹配的包
  297. if len(packages) > 0 {
  298. res = append(res, packages...)
  299. }
  300. reb := regexp.MustCompile(`(标段[1-9一二三四五六七八九]|[1-9一二三四五六七八九]标段|包[1-9一二三四五六七八九]?[0-9]|[1-9一二三四五六七八九]?[0-9]包|[a-kA-K]包)`) // 标题只有一个包2
  301. pgs := reb.FindAllString(title, -1)
  302. if len(pgs) > 0 {
  303. for _, v := range pgs {
  304. if !IsInStringArray(v, res) {
  305. res = append(res, v)
  306. }
  307. }
  308. }
  309. return res
  310. }
  311. // IsInStringArray 判断数组中是否存在字符串
  312. func IsInStringArray(str string, arr []string) bool {
  313. // 先对字符串数组进行排序
  314. sort.Strings(arr)
  315. // 使用二分查找算法查找字符串
  316. pos := sort.SearchStrings(arr, str)
  317. // 如果找到了则返回 true,否则返回 false
  318. return pos < len(arr) && arr[pos] == str
  319. }
  320. // removeDuplicates 去除数据中重复包,并合并连续数字包
  321. func removeDuplicates(data []string) []string {
  322. // 存储已存在的包号
  323. existingPackages := make(map[int]bool)
  324. // 存储包含包号信息的字符串
  325. packages := make(map[int]string)
  326. // 匹配包号的正则表达式
  327. re := regexp.MustCompile(`(包)(\d+)(?:-(\d+))?`)
  328. noexists := make([]string, 0)
  329. // 遍历数据
  330. for _, item := range data {
  331. // 提取包号信息
  332. matches := re.FindStringSubmatch(item)
  333. if len(matches) < 3 {
  334. noexists = append(noexists, item)
  335. continue
  336. }
  337. // 解析包号
  338. start, _ := strconv.Atoi(matches[2])
  339. end := start
  340. if len(matches[3]) > 0 {
  341. end, _ = strconv.Atoi(matches[3])
  342. }
  343. // 添加到已存在的包号中
  344. for i := start; i <= end; i++ {
  345. existingPackages[i] = true
  346. }
  347. // 将包含包号信息的字符串存储到 packages 中
  348. packages[start] = matches[0]
  349. }
  350. // 从 map 中提取去重后的包号并排序
  351. var uniquePackages []int
  352. for packageNum := range existingPackages {
  353. uniquePackages = append(uniquePackages, packageNum)
  354. }
  355. sort.Ints(uniquePackages)
  356. // 将连续的包号转换为包含范围的字符串
  357. var result []string
  358. var start, end int
  359. for i, num := range uniquePackages {
  360. if i == 0 {
  361. start = num
  362. end = num
  363. } else if num == end+1 {
  364. end = num
  365. } else {
  366. if start == end {
  367. result = append(result, packages[start])
  368. } else {
  369. result = append(result, fmt.Sprintf("包%d-%d", start, end))
  370. }
  371. start = num
  372. end = num
  373. }
  374. }
  375. if start == end {
  376. result = append(result, packages[start])
  377. } else {
  378. result = append(result, fmt.Sprintf("包%d-%d", start, end))
  379. }
  380. result = append(result, noexists...)
  381. return result
  382. }