analysisManager.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package userAnalysis
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gctx"
  7. "github.com/gogf/gf/v2/util/gconv"
  8. "strings"
  9. )
  10. type (
  11. UserAnalysis struct {
  12. UserMapping map[string]BaseUserId //职位id、mongouserId对应的baseUserId
  13. //BaseUserClubMap map[string]BaseUserId //电销线索
  14. TelemarketingBaseUserMap map[string]BaseUserId //电销线索
  15. EntUserIdMapping map[int64]BaseUserId //ent_id对应的baseUserId
  16. FullBaseUserId map[BaseUserId]bool //全量BaseUserId
  17. BinPhone, BindMail map[BaseUserId]bool
  18. Vip, Vip15, Vip30 map[BaseUserId]bool
  19. VipExpired30 map[BaseUserId]bool //已到期30天
  20. VipGift7DayExpired map[BaseUserId]bool //赠送7天到期
  21. Member, MemberExpire map[BaseUserId]bool
  22. UnSubUser map[string]bool //取关
  23. NewUser map[int]map[BaseUserId]bool //新用户
  24. TestUser map[BaseUserId]bool
  25. }
  26. BaseUserId int64
  27. AnalysisRes struct {
  28. Name, Code string //标签名字
  29. Data map[BaseUserId]bool //数据
  30. SaveOldData bool //累计统计时需要设置为true 是否保留旧数据
  31. }
  32. )
  33. // UpdateTag 更新标签对应的用户Bitmap,因sql太长所以拆分成batchSize插入
  34. func (ar *AnalysisRes) UpdateTag(ctx context.Context) {
  35. const batchSize = 5000
  36. var (
  37. updateBatch = [][]string{}
  38. tmpArr = make([]string, 0, batchSize)
  39. total = len(ar.Data)
  40. index = 0
  41. )
  42. g.Log().Infof(ctx, "UpdateTag code %s 更新%d个开始", ar.Code, len(ar.Data))
  43. for id, ok := range ar.Data {
  44. if !ok {
  45. continue
  46. }
  47. tmpArr = append(tmpArr, fmt.Sprintf("toUInt64(%d)", id))
  48. if len(tmpArr) == batchSize {
  49. updateBatch = append(updateBatch, tmpArr)
  50. tmpArr = make([]string, 0, batchSize)
  51. }
  52. index++
  53. if index == total {
  54. updateBatch = append(updateBatch, tmpArr)
  55. }
  56. }
  57. if len(updateBatch) == 0 {
  58. if !ar.SaveOldData {
  59. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapBuild([toUInt64(0)]) WHERE code = '%v';`, ar.Code)
  60. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  61. g.Log().Errorf(ctx, "更新标签%s 滞空异常 %v", ar.Code, err)
  62. }
  63. }
  64. } else {
  65. for i, batch := range updateBatch {
  66. if i == 0 && !ar.SaveOldData {
  67. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapBuild([%v]) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  68. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  69. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  70. }
  71. } else {
  72. execSql := fmt.Sprintf(`ALTER TABLE dwd_d_tag UPDATE bitobj = bitmapOr(bitobj,bitmapBuild([%v])) WHERE code = '%v';`, strings.Join(batch, ","), ar.Code)
  73. if _, err := g.DB().Exec(ctx, execSql); err != nil {
  74. g.Log().Errorf(ctx, "更新标签%s [%d]异常 %v", ar.Code, i, err)
  75. }
  76. }
  77. }
  78. }
  79. g.Log().Infof(ctx, "UpdateTag code %s 更新%d个完成", ar.Code, len(ar.Data))
  80. }
  81. func NewManager() *UserAnalysis {
  82. return &UserAnalysis{
  83. UserMapping: map[string]BaseUserId{},
  84. EntUserIdMapping: map[int64]BaseUserId{},
  85. FullBaseUserId: map[BaseUserId]bool{},
  86. BinPhone: map[BaseUserId]bool{},
  87. BindMail: map[BaseUserId]bool{},
  88. Vip: map[BaseUserId]bool{},
  89. Vip15: map[BaseUserId]bool{},
  90. Vip30: map[BaseUserId]bool{},
  91. VipExpired30: map[BaseUserId]bool{}, //已到期30天
  92. VipGift7DayExpired: map[BaseUserId]bool{}, //赠送7天到期
  93. Member: map[BaseUserId]bool{},
  94. MemberExpire: map[BaseUserId]bool{},
  95. TestUser: map[BaseUserId]bool{},
  96. UnSubUser: map[string]bool{},
  97. NewUser: map[int]map[BaseUserId]bool{},
  98. }
  99. }
  100. func (ua *UserAnalysis) LoadMapping() error {
  101. ctx := gctx.New()
  102. //加载baseUserId对应关系
  103. data, err := g.DB().Query(ctx, "SELECT mgoUserId,positionId,baseUserId,phone FROM dwd_mgo_position")
  104. if err != nil {
  105. return err
  106. }
  107. var (
  108. newMapping = map[string]BaseUserId{}
  109. fullBaseUserId = map[BaseUserId]bool{}
  110. phoneBaseUserIdMapping = map[string]BaseUserId{}
  111. entIdBaseUserIdMapping = map[int64]BaseUserId{}
  112. telemarketingBaseUserMap = map[string]BaseUserId{}
  113. )
  114. for _, m := range data.List() {
  115. var (
  116. mgoUserId = gconv.String(m["mgoUserId"])
  117. positionId = gconv.String(m["positionId"])
  118. baseUserId = BaseUserId(gconv.Int64(m["baseUserId"]))
  119. phone = gconv.String(m["phone"])
  120. )
  121. newMapping[mgoUserId] = baseUserId
  122. newMapping[positionId] = baseUserId
  123. fullBaseUserId[baseUserId] = true
  124. if phone != "" {
  125. phoneBaseUserIdMapping[phone] = baseUserId
  126. }
  127. }
  128. ua.UserMapping = newMapping
  129. ua.FullBaseUserId = fullBaseUserId
  130. //加载ent_id和BaseUserId对应关系
  131. dataEntRes, err := g.DB("jianyu").Query(ctx, "SELECT id,phone FROM entniche_user")
  132. if err != nil {
  133. return err
  134. }
  135. for _, m := range dataEntRes {
  136. var (
  137. entId = gconv.Int64(m["id"])
  138. phone = gconv.String(m["phone"])
  139. )
  140. if bId, ok := phoneBaseUserIdMapping[phone]; ok {
  141. entIdBaseUserIdMapping[entId] = bId
  142. }
  143. }
  144. ua.EntUserIdMapping = entIdBaseUserIdMapping
  145. //加载线索
  146. dwdBaseinfoRes, err := g.DB("subjectdb").Query(ctx, "SELECT uid,base_user_id FROM dwd_f_userbase_baseinfo WHERE userid!=''")
  147. if err != nil {
  148. return err
  149. }
  150. for _, m := range dwdBaseinfoRes.List() {
  151. var (
  152. uid = gconv.String(m["uid"])
  153. base_user_id = BaseUserId(gconv.Int64(m["base_user_id"]))
  154. )
  155. if uid != "" && base_user_id != 0 {
  156. telemarketingBaseUserMap[uid] = base_user_id
  157. }
  158. }
  159. clueRes, err := g.DB("subjectdb").Query(ctx, "SELECT id,uid FROM dwd_f_crm_clue_info WHERE userid!=''")
  160. if err != nil {
  161. return err
  162. }
  163. for _, m := range clueRes.List() {
  164. if value, ok := telemarketingBaseUserMap[gconv.String(m["uid"])]; ok {
  165. telemarketingBaseUserMap[gconv.String(m["id"])] = value
  166. }
  167. }
  168. ua.TelemarketingBaseUserMap = telemarketingBaseUserMap
  169. return nil
  170. }