analysisManager.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package userAnalysis
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "strings"
  8. )
  // BaseUserId is the numeric identifier of a base user.
  type BaseUserId int64

  // UserAnalysis holds the ID lookup tables and the per-attribute user sets
  // used by the analysis code in this package.
  type UserAnalysis struct {
  	// UserMapping maps a position id or a mongo user id to its BaseUserId.
  	UserMapping map[string]BaseUserId
  	// EntUserIdMapping maps an ent_id to its BaseUserId.
  	EntUserIdMapping map[int64]BaseUserId

  	// NOTE(review): the sets below are keyed by BaseUserId; their meaning is
  	// inferred from the field names only — confirm against the code that
  	// fills them.
  	BinPhone     map[BaseUserId]bool // presumably users with a bound phone number
  	BindMail     map[BaseUserId]bool // presumably users with a bound e-mail address
  	Vip          map[BaseUserId]bool
  	Vip15        map[BaseUserId]bool
  	Vip30        map[BaseUserId]bool
  	Member       map[BaseUserId]bool
  	MemberExpire map[BaseUserId]bool
  }

  // AnalysisRes is the result computed for a single tag.
  type AnalysisRes struct {
  	Name string // tag name
  	Code string // tag code; UpdateTag uses it to select the row to update
  	Data map[BaseUserId]bool // user set of the tag
  }
  23. // UpdateTag 更新标签对应的用户Bitmap,因sql太长所以拆分成batchSize插入
  24. func (ar *AnalysisRes) UpdateTag(ctx context.Context) {
  25. const batchSize = 5000
  26. var (
  27. updateBatch = [][]string{}
  28. tmpArr = make([]string, 0, batchSize)
  29. total = len(ar.Data)
  30. index = 0
  31. )
  32. for id, ok := range ar.Data {
  33. if !ok {
  34. continue
  35. }
  36. tmpArr = append(tmpArr, fmt.Sprintf("toUInt64(%d)", id))
  37. if len(tmpArr) == batchSize {
  38. updateBatch = append(updateBatch, tmpArr)
  39. tmpArr = make([]string, 0, batchSize)
  40. }
  41. index++
  42. if index == total {
  43. updateBatch = append(updateBatch, tmpArr)
  44. }
  45. }
  46. for i, batch := range updateBatch {
  47. if i == 0 {
  48. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapBuild([%v]) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  49. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  50. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  51. }
  52. } else {
  53. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapOr(bitobj,bitmapBuild([%v])) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  54. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  55. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  56. }
  57. }
  58. }
  59. g.Log().Infof(ctx, "code %s 更新%d个完成", ar.Code, len(ar.Data))
  60. }
  61. func NewManager() *UserAnalysis {
  62. return &UserAnalysis{
  63. UserMapping: map[string]BaseUserId{},
  64. BinPhone: map[BaseUserId]bool{},
  65. BindMail: map[BaseUserId]bool{},
  66. Vip: map[BaseUserId]bool{},
  67. Vip15: map[BaseUserId]bool{},
  68. Vip30: map[BaseUserId]bool{},
  69. Member: map[BaseUserId]bool{},
  70. MemberExpire: map[BaseUserId]bool{},
  71. }
  72. }
  73. func (ua *UserAnalysis) LoadMapping() error {
  74. //加载baseUserId对应关系
  75. data, err := g.DB().Query(ctx, "SELECT mgoUserId,positionId,baseUserId,phone FROM dwd_mgo_position")
  76. if err != nil {
  77. return err
  78. }
  79. var (
  80. newMapping = map[string]BaseUserId{}
  81. phoneBaseUserIdMapping = map[string]BaseUserId{}
  82. entIdBaseUserIdMapping = map[int64]BaseUserId{}
  83. )
  84. for _, m := range data.List() {
  85. var (
  86. mgoUserId = gconv.String(m["mgoUserId"])
  87. positionId = gconv.String(m["positionId"])
  88. baseUserId = BaseUserId(gconv.Int64(m["baseUserId"]))
  89. phone = gconv.String(m["phone"])
  90. )
  91. newMapping[mgoUserId] = baseUserId
  92. newMapping[positionId] = baseUserId
  93. if phone != "" {
  94. phoneBaseUserIdMapping[phone] = baseUserId
  95. }
  96. }
  97. ua.UserMapping = newMapping
  98. //加载ent_id和BaseUserId对应关系
  99. dataEntRes, err := g.DB("jianyu").Query(ctx, "SELECT id,phone FROM entniche_user")
  100. if err != nil {
  101. return err
  102. }
  103. for _, m := range dataEntRes {
  104. var (
  105. entId = gconv.Int64(m["id"])
  106. phone = gconv.String(m["phone"])
  107. )
  108. if bId, ok := phoneBaseUserIdMapping[phone]; ok {
  109. entIdBaseUserIdMapping[entId] = bId
  110. }
  111. }
  112. ua.EntUserIdMapping = entIdBaseUserIdMapping
  113. return nil
  114. }