|
@@ -0,0 +1,78 @@
|
|
|
+package userTag
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "github.com/gogf/gf/v2/frame/g"
|
|
|
+ "github.com/gogf/gf/v2/util/gconv"
|
|
|
+ "log"
|
|
|
+ "strings"
|
|
|
+)
|
|
|
+
|
|
|
+func BaseUserTag(ctx context.Context) {
|
|
|
+ execError := func() error {
|
|
|
+ if _, err := g.DB().Exec(ctx, fmt.Sprintf(`CREATE TABLE pub_tags.dwd_d_user_tag_tmp
|
|
|
+(
|
|
|
+
|
|
|
+ baseUserId UInt64,
|
|
|
+
|
|
|
+ bitobj AggregateFunction(groupBitmap,
|
|
|
+ UInt64)
|
|
|
+)
|
|
|
+ENGINE = AggregatingMergeTree
|
|
|
+ORDER BY baseUserId
|
|
|
+SETTINGS index_granularity = 8192;`)); err != nil {
|
|
|
+ g.Log().Errorf(ctx, "创建表异常%v", err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ // 迁移数据
|
|
|
+ if _, err := g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
|
|
|
+SELECT * FROM dwd_d_user_tag;`)); err != nil {
|
|
|
+ g.Log().Errorf(ctx, "数据迁移异常%v", err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ userTags := make(map[int64][]string)
|
|
|
+ tagUser, _ := g.DB().Query(ctx, "SELECT id,bitmapToArray(bitobj) as bitobj FROM dwd_d_tag")
|
|
|
+ baseUser, err := g.DB().Query(ctx, "SELECT DISTINCT(baseUserId) as baseUserId FROM user_log_byHour")
|
|
|
+ if err == nil && !baseUser.IsEmpty() {
|
|
|
+ for _, m := range baseUser.List() {
|
|
|
+ baseUserId := gconv.Int64(m["baseUserId"])
|
|
|
+ for _, re := range tagUser.List() {
|
|
|
+ id := gconv.String(re["id"])
|
|
|
+ for _, value := range gconv.Int64s(re["bitobj"]) {
|
|
|
+ if value == baseUserId {
|
|
|
+ userTags[baseUserId] = append(userTags[baseUserId], fmt.Sprintf(`toUInt64(%s)`, id))
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(userTags) == 500 { //插入新表
|
|
|
+ var sql []string
|
|
|
+ for i, i2 := range userTags {
|
|
|
+ sql = append(sql, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, i, strings.Join(i2, ",")))
|
|
|
+ }
|
|
|
+ if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag
|
|
|
+VALUES %s`, strings.Join(sql, ","))); err != nil {
|
|
|
+ log.Println(" 新插入 dwd_d_user_tag 表数据异常")
|
|
|
+ }
|
|
|
+ userTags = map[int64][]string{}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(userTags) > 0 { //插入新表
|
|
|
+ var sql []string
|
|
|
+ for i, i2 := range userTags {
|
|
|
+ sql = append(sql, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, i, strings.Join(i2, ",")))
|
|
|
+ }
|
|
|
+ if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag
|
|
|
+VALUES %s`, strings.Join(sql, ","))); err != nil {
|
|
|
+ log.Println(" 新插入 dwd_d_user_tag 表数据异常")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ if execError != nil {
|
|
|
+ g.DB().Exec(ctx, "DROP TABLE dwd_d_user_tag_tmp;")
|
|
|
+ return
|
|
|
+ }
|
|
|
+}
|