|
@@ -3,76 +3,90 @@ package userTag
|
|
|
import (
|
|
|
"context"
|
|
|
"fmt"
|
|
|
+ "github.com/gogf/gf/v2/database/gdb"
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
"github.com/gogf/gf/v2/util/gconv"
|
|
|
"log"
|
|
|
"strings"
|
|
|
)
|
|
|
|
|
|
-func BaseUserTag(ctx context.Context) {
|
|
|
- execError := func() error {
|
|
|
- if _, err := g.DB().Exec(ctx, fmt.Sprintf(`CREATE TABLE pub_tags.dwd_d_user_tag_tmp
|
|
|
-(
|
|
|
+var (
|
|
|
+ ctx = context.Background()
|
|
|
+)
|
|
|
|
|
|
- baseUserId UInt64,
|
|
|
+func BaseUserTag() error {
|
|
|
+ if _, err := g.DB().Exec(ctx, fmt.Sprintf(`CREATE TABLE pub_tags.dwd_d_user_tag_tmp
|
|
|
+ (
|
|
|
|
|
|
- bitobj AggregateFunction(groupBitmap,
|
|
|
- UInt64)
|
|
|
-)
|
|
|
-ENGINE = AggregatingMergeTree
|
|
|
-ORDER BY baseUserId
|
|
|
-SETTINGS index_granularity = 8192;`)); err != nil {
|
|
|
- g.Log().Errorf(ctx, "创建表异常%v", err)
|
|
|
- return err
|
|
|
- }
|
|
|
- // 迁移数据
|
|
|
- if _, err := g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
|
|
|
-SELECT * FROM dwd_d_user_tag;`)); err != nil {
|
|
|
- g.Log().Errorf(ctx, "数据迁移异常%v", err)
|
|
|
- return err
|
|
|
- }
|
|
|
- userTags := make(map[int64][]string)
|
|
|
- tagUser, _ := g.DB().Query(ctx, "SELECT id,bitmapToArray(bitobj) as bitobj FROM dwd_d_tag")
|
|
|
- baseUser, err := g.DB().Query(ctx, "SELECT DISTINCT(baseUserId) as baseUserId FROM user_log_byHour")
|
|
|
- if err == nil && !baseUser.IsEmpty() {
|
|
|
- for _, m := range baseUser.List() {
|
|
|
- baseUserId := gconv.Int64(m["baseUserId"])
|
|
|
- for _, re := range tagUser.List() {
|
|
|
- id := gconv.String(re["id"])
|
|
|
- for _, value := range gconv.Int64s(re["bitobj"]) {
|
|
|
- if value == baseUserId {
|
|
|
- userTags[baseUserId] = append(userTags[baseUserId], fmt.Sprintf(`toUInt64(%s)`, id))
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if len(userTags) == 500 { //插入新表
|
|
|
- var sql []string
|
|
|
- for i, i2 := range userTags {
|
|
|
- sql = append(sql, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, i, strings.Join(i2, ",")))
|
|
|
- }
|
|
|
- if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag
|
|
|
-VALUES %s`, strings.Join(sql, ","))); err != nil {
|
|
|
- log.Println(" 新插入 dwd_d_user_tag 表数据异常")
|
|
|
- }
|
|
|
- userTags = map[int64][]string{}
|
|
|
- }
|
|
|
+ baseUserId UInt64,
|
|
|
+
|
|
|
+ bitobj AggregateFunction(groupBitmap,UInt64)
|
|
|
+ )
|
|
|
+ ENGINE = AggregatingMergeTree
|
|
|
+ ORDER BY baseUserId
|
|
|
+ SETTINGS index_granularity = 8192;`)); err != nil {
|
|
|
+ g.Log().Errorf(ctx, "创建表异常%v", err)
|
|
|
+ return err
|
|
|
+ } //创建表数据
|
|
|
+ defer func() {
|
|
|
+ g.DB().Exec(ctx, `DROP TABLE dwd_d_user_tag_tmp;`)
|
|
|
+ }()
|
|
|
+ //统计插入user数据
|
|
|
+ var userTags []string
|
|
|
+ tagUsers := make(map[int64][]string)
|
|
|
+ tagUser, _ := g.DB().Query(ctx, "SELECT id,bitmapToArray(bitobj) as bitobj FROM dwd_d_tag")
|
|
|
+ if tagUser != nil {
|
|
|
+ for _, re := range tagUser.List() {
|
|
|
+ id := gconv.Int64(re["id"])
|
|
|
+ //user := make(map[int64]bool)
|
|
|
+ for _, value := range gconv.Int64s(re["bitobj"]) {
|
|
|
+ //user[value] = true
|
|
|
+ tagUsers[value] = append(tagUsers[value], fmt.Sprintf(`toUInt64(%d)`, id))
|
|
|
}
|
|
|
+ //tagUsers[id] = &user
|
|
|
}
|
|
|
- if len(userTags) > 0 { //插入新表
|
|
|
- var sql []string
|
|
|
- for i, i2 := range userTags {
|
|
|
- sql = append(sql, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, i, strings.Join(i2, ",")))
|
|
|
+ }
|
|
|
+
|
|
|
+ baseUser, err := g.DB().Query(ctx, "SELECT DISTINCT(baseUserId) as baseUserId FROM dwd_mgo_position")
|
|
|
+ if err == nil && !baseUser.IsEmpty() {
|
|
|
+ log.Println("len===", len(tagUsers), baseUser.Len())
|
|
|
+ for _, m := range baseUser.List() {
|
|
|
+ baseUserId := gconv.Int64(m["baseUserId"])
|
|
|
+ if vs, ok := tagUsers[baseUserId]; ok {
|
|
|
+ userTags = append(userTags, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, baseUserId, strings.Join(vs, ",")))
|
|
|
+ delete(tagUsers, baseUserId)
|
|
|
}
|
|
|
- if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag
|
|
|
-VALUES %s`, strings.Join(sql, ","))); err != nil {
|
|
|
- log.Println(" 新插入 dwd_d_user_tag 表数据异常")
|
|
|
+
|
|
|
+ if len(userTags) == 2000 { //插入新表
|
|
|
+ if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
|
|
|
+VALUES %s`, strings.Join(userTags, ","))); err != nil {
|
|
|
+ log.Println(" 新插入 dwd_d_user_tag 表数据异常")
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ userTags = []string{}
|
|
|
}
|
|
|
}
|
|
|
- return nil
|
|
|
}
|
|
|
- if execError != nil {
|
|
|
- g.DB().Exec(ctx, "DROP TABLE dwd_d_user_tag_tmp;")
|
|
|
- return
|
|
|
+ if len(userTags) > 0 { //插入新表
|
|
|
+ if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
|
|
|
+VALUES %s`, strings.Join(userTags, ","))); err != nil {
|
|
|
+ log.Println(" 新插入 dwd_d_user_tag 表数据异常", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ //事务操作修改表名
|
|
|
+ err = g.DB().Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
|
|
|
+ _, err = g.DB().Exec(ctx, "DROP TABLE dwd_d_user_tag;")
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ _, err = g.DB().Exec(ctx, "RENAME TABLE dwd_d_user_tag_tmp TO dwd_d_user_tag;")
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+
|
|
|
+ return err
|
|
|
}
|