// analysisManager.go

  1. package userAnalysis
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "strings"
  8. )
  9. type (
  10. UserAnalysis struct {
  11. UserMapping map[string]BaseUserId //职位id、mongouserId对应的baseUserId
  12. EntUserIdMapping map[int64]BaseUserId //ent_id对应的baseUserId
  13. FullBaseUserId map[BaseUserId]bool //全量BaseUserId
  14. BinPhone, BindMail map[BaseUserId]bool
  15. Vip, Vip15, Vip30 map[BaseUserId]bool
  16. Member, MemberExpire map[BaseUserId]bool
  17. UnSubUser map[string]bool //取关
  18. NewUser map[int]map[BaseUserId]bool //新用户
  19. TestUser map[BaseUserId]bool
  20. }
  21. BaseUserId int64
  22. AnalysisRes struct {
  23. Name, Code string //标签名字
  24. Data map[BaseUserId]bool //数据
  25. }
  26. )
  27. // UpdateTag 更新标签对应的用户Bitmap,因sql太长所以拆分成batchSize插入
  28. func (ar *AnalysisRes) UpdateTag(ctx context.Context) {
  29. const batchSize = 5000
  30. var (
  31. updateBatch = [][]string{}
  32. tmpArr = make([]string, 0, batchSize)
  33. total = len(ar.Data)
  34. index = 0
  35. )
  36. g.Log().Infof(ctx, "UpdateTag code %s 更新%d个开始", ar.Code, len(ar.Data))
  37. for id, ok := range ar.Data {
  38. if !ok {
  39. continue
  40. }
  41. tmpArr = append(tmpArr, fmt.Sprintf("toUInt64(%d)", id))
  42. if len(tmpArr) == batchSize {
  43. updateBatch = append(updateBatch, tmpArr)
  44. tmpArr = make([]string, 0, batchSize)
  45. }
  46. index++
  47. if index == total {
  48. updateBatch = append(updateBatch, tmpArr)
  49. }
  50. }
  51. if len(updateBatch) == 0 {
  52. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapBuild([toUInt64(0)]) WHERE code = '%v';`, ar.Code)
  53. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  54. g.Log().Errorf(ctx, "更新标签%s 滞空异常 %v", ar.Code, err)
  55. }
  56. } else {
  57. for i, batch := range updateBatch {
  58. if i == 0 {
  59. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapBuild([%v]) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  60. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  61. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  62. }
  63. } else {
  64. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapOr(bitobj,bitmapBuild([%v])) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  65. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  66. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  67. }
  68. }
  69. }
  70. }
  71. g.Log().Infof(ctx, "UpdateTag code %s 更新%d个完成", ar.Code, len(ar.Data))
  72. }
  73. func NewManager() *UserAnalysis {
  74. return &UserAnalysis{
  75. UserMapping: map[string]BaseUserId{},
  76. EntUserIdMapping: map[int64]BaseUserId{},
  77. FullBaseUserId: map[BaseUserId]bool{},
  78. BinPhone: map[BaseUserId]bool{},
  79. BindMail: map[BaseUserId]bool{},
  80. Vip: map[BaseUserId]bool{},
  81. Vip15: map[BaseUserId]bool{},
  82. Vip30: map[BaseUserId]bool{},
  83. Member: map[BaseUserId]bool{},
  84. MemberExpire: map[BaseUserId]bool{},
  85. TestUser: map[BaseUserId]bool{},
  86. UnSubUser: map[string]bool{},
  87. NewUser: map[int]map[BaseUserId]bool{},
  88. }
  89. }
  90. func (ua *UserAnalysis) LoadMapping() error {
  91. //加载baseUserId对应关系
  92. data, err := g.DB().Query(ctx, "SELECT mgoUserId,positionId,baseUserId,phone FROM dwd_mgo_position")
  93. if err != nil {
  94. return err
  95. }
  96. var (
  97. newMapping = map[string]BaseUserId{}
  98. fullBaseUserId = map[BaseUserId]bool{}
  99. phoneBaseUserIdMapping = map[string]BaseUserId{}
  100. entIdBaseUserIdMapping = map[int64]BaseUserId{}
  101. )
  102. for _, m := range data.List() {
  103. var (
  104. mgoUserId = gconv.String(m["mgoUserId"])
  105. positionId = gconv.String(m["positionId"])
  106. baseUserId = BaseUserId(gconv.Int64(m["baseUserId"]))
  107. phone = gconv.String(m["phone"])
  108. )
  109. newMapping[mgoUserId] = baseUserId
  110. newMapping[positionId] = baseUserId
  111. fullBaseUserId[baseUserId] = true
  112. if phone != "" {
  113. phoneBaseUserIdMapping[phone] = baseUserId
  114. }
  115. }
  116. ua.UserMapping = newMapping
  117. ua.FullBaseUserId = fullBaseUserId
  118. //加载ent_id和BaseUserId对应关系
  119. dataEntRes, err := g.DB("jianyu").Query(ctx, "SELECT id,phone FROM entniche_user")
  120. if err != nil {
  121. return err
  122. }
  123. for _, m := range dataEntRes {
  124. var (
  125. entId = gconv.Int64(m["id"])
  126. phone = gconv.String(m["phone"])
  127. )
  128. if bId, ok := phoneBaseUserIdMapping[phone]; ok {
  129. entIdBaseUserIdMapping[entId] = bId
  130. }
  131. }
  132. ua.EntUserIdMapping = entIdBaseUserIdMapping
  133. return nil
  134. }