analysisManager.go 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package userAnalysis
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "strings"
  8. )
  9. type (
  10. UserAnalysis struct {
  11. UserMapping map[string]BaseUserId //职位id、mongouserId对应的baseUserId
  12. BinPhone, BindMail map[BaseUserId]bool
  13. Vip, Vip15, Vip30 map[BaseUserId]bool
  14. Member, MemberExpire map[BaseUserId]bool
  15. }
  16. BaseUserId int64
  17. AnalysisRes struct {
  18. Name, Code string //标签名字
  19. Data map[BaseUserId]bool //数据
  20. }
  21. )
  22. // UpdateTag 更新标签对应的用户Bitmap,因sql太长所以拆分成batchSize插入
  23. func (ar *AnalysisRes) UpdateTag(ctx context.Context) {
  24. const batchSize = 5000
  25. var (
  26. updateBatch = [][]string{}
  27. tmpArr = make([]string, 0, batchSize)
  28. total = len(ar.Data)
  29. index = 0
  30. )
  31. for id, ok := range ar.Data {
  32. if !ok {
  33. continue
  34. }
  35. tmpArr = append(tmpArr, fmt.Sprintf("toUInt64(%d)", id))
  36. if len(tmpArr) == batchSize {
  37. updateBatch = append(updateBatch, tmpArr)
  38. tmpArr = make([]string, 0, batchSize)
  39. }
  40. index++
  41. if index == total {
  42. updateBatch = append(updateBatch, tmpArr)
  43. }
  44. }
  45. for i, batch := range updateBatch {
  46. if i == 0 {
  47. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapBuild([%v]) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  48. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  49. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  50. }
  51. } else {
  52. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapOr(bitobj,bitmapBuild([%v])) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  53. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  54. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  55. }
  56. }
  57. }
  58. g.Log().Infof(ctx, "code %s 更新%d个完成", ar.Code, len(ar.Data))
  59. }
  60. func NewManager() *UserAnalysis {
  61. return &UserAnalysis{
  62. UserMapping: map[string]BaseUserId{},
  63. BinPhone: map[BaseUserId]bool{},
  64. BindMail: map[BaseUserId]bool{},
  65. Vip: map[BaseUserId]bool{},
  66. Vip15: map[BaseUserId]bool{},
  67. Vip30: map[BaseUserId]bool{},
  68. Member: map[BaseUserId]bool{},
  69. MemberExpire: map[BaseUserId]bool{},
  70. }
  71. }
  72. func (ua *UserAnalysis) LoadMapping() error {
  73. data, err := g.DB().Query(ctx, "SELECT mgoUserId,positionId,baseUserId FROM dwd_mgo_position")
  74. if err != nil {
  75. return err
  76. }
  77. newMapping := map[string]BaseUserId{}
  78. for _, m := range data.List() {
  79. var (
  80. mgoUserId = gconv.String(m["mgoUserId"])
  81. positionId = gconv.String(m["positionId"])
  82. baseUserId = BaseUserId(gconv.Int64(m["baseUserId"]))
  83. )
  84. newMapping[mgoUserId] = baseUserId
  85. newMapping[positionId] = baseUserId
  86. }
  87. ua.UserMapping = newMapping
  88. return nil
  89. }