userTag.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package userTag
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "log"
  8. "strings"
  9. )
  // ctx is the package-level background context. BaseUserTag takes no
  // context parameter, so every DB call it makes runs under this one.
  var ctx = context.Background()
  13. func BaseUserTag() error {
  14. defer func() {
  15. //清理备份数据
  16. g.DB().Exec(ctx, `DROP TABLE dwd_d_user_tag_back;`)
  17. g.DB().Exec(ctx, `DROP TABLE dwd_d_user_tag_tmp;`)
  18. }()
  19. if _, err := g.DB().Exec(ctx, fmt.Sprintf(`CREATE TABLE pub_tags.dwd_d_user_tag_tmp
  20. (
  21. baseUserId UInt64,
  22. bitobj AggregateFunction(groupBitmap,UInt64)
  23. )
  24. ENGINE = AggregatingMergeTree
  25. ORDER BY baseUserId
  26. SETTINGS index_granularity = 8192;`)); err != nil {
  27. g.Log().Errorf(ctx, "创建表异常%v", err)
  28. return err
  29. } //创建表数据
  30. //统计插入user数据
  31. var userTags []string
  32. tagUsers := make(map[int64][]string)
  33. tagUser, _ := g.DB().Query(ctx, "SELECT id,bitmapToArray(bitobj) as bitobj FROM dwd_d_tag")
  34. if tagUser != nil {
  35. for _, re := range tagUser.List() {
  36. id := gconv.Int64(re["id"])
  37. //user := make(map[int64]bool)
  38. for _, value := range gconv.Int64s(re["bitobj"]) {
  39. //user[value] = true
  40. tagUsers[value] = append(tagUsers[value], fmt.Sprintf(`toUInt64(%d)`, id))
  41. }
  42. //tagUsers[id] = &user
  43. }
  44. }
  45. baseUser, err := g.DB().Query(ctx, "SELECT DISTINCT(baseUserId) as baseUserId FROM dwd_mgo_position")
  46. if err == nil && !baseUser.IsEmpty() {
  47. log.Println("len===", len(tagUsers), baseUser.Len())
  48. for _, m := range baseUser.List() {
  49. baseUserId := gconv.Int64(m["baseUserId"])
  50. if vs, ok := tagUsers[baseUserId]; ok {
  51. userTags = append(userTags, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, baseUserId, strings.Join(vs, ",")))
  52. delete(tagUsers, baseUserId)
  53. }
  54. if len(userTags) == 2000 { //插入新表
  55. if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
  56. VALUES %s`, strings.Join(userTags, ","))); err != nil {
  57. log.Println(" 新插入 dwd_d_user_tag 表数据异常")
  58. return err
  59. }
  60. userTags = []string{}
  61. }
  62. }
  63. }
  64. if len(userTags) > 0 { //插入新表
  65. if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
  66. VALUES %s`, strings.Join(userTags, ","))); err != nil {
  67. log.Println(" 新插入 dwd_d_user_tag 表数据异常", err.Error())
  68. return err
  69. }
  70. }
  71. _, err = g.DB().Exec(ctx, "RENAME TABLE dwd_d_user_tag TO dwd_d_user_tag_back;")
  72. if err != nil {
  73. return err
  74. }
  75. _, err = g.DB().Exec(ctx, "RENAME TABLE dwd_d_user_tag_tmp TO dwd_d_user_tag;")
  76. if err != nil {
  77. return err
  78. }
  79. return err
  80. }