package userTag import ( "context" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" "log" "strings" ) var ( ctx = context.Background() ) func BaseUserTag() error { defer func() { //清理备份数据 g.DB().Exec(ctx, `DROP TABLE dwd_d_user_tag_back;`) g.DB().Exec(ctx, `DROP TABLE dwd_d_user_tag_tmp;`) }() if _, err := g.DB().Exec(ctx, fmt.Sprintf(`CREATE TABLE pub_tags.dwd_d_user_tag_tmp ( baseUserId UInt64, bitobj AggregateFunction(groupBitmap,UInt64) ) ENGINE = AggregatingMergeTree ORDER BY baseUserId SETTINGS index_granularity = 8192;`)); err != nil { g.Log().Errorf(ctx, "创建表异常%v", err) return err } //创建表数据 //统计插入user数据 var userTags []string tagUsers := make(map[int64][]string) tagUser, _ := g.DB().Query(ctx, "SELECT id,bitmapToArray(bitobj) as bitobj FROM dwd_d_tag") if tagUser != nil { for _, re := range tagUser.List() { id := gconv.Int64(re["id"]) //user := make(map[int64]bool) for _, value := range gconv.Int64s(re["bitobj"]) { //user[value] = true tagUsers[value] = append(tagUsers[value], fmt.Sprintf(`toUInt64(%d)`, id)) } //tagUsers[id] = &user } } baseUser, err := g.DB().Query(ctx, "SELECT DISTINCT(baseUserId) as baseUserId FROM dwd_mgo_position") if err == nil && !baseUser.IsEmpty() { log.Println("len===", len(tagUsers), baseUser.Len()) for _, m := range baseUser.List() { baseUserId := gconv.Int64(m["baseUserId"]) if vs, ok := tagUsers[baseUserId]; ok { userTags = append(userTags, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, baseUserId, strings.Join(vs, ","))) delete(tagUsers, baseUserId) } if len(userTags) == 2000 { //插入新表 if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp VALUES %s`, strings.Join(userTags, ","))); err != nil { log.Println(" 新插入 dwd_d_user_tag 表数据异常") return err } userTags = []string{} } } } if len(userTags) > 0 { //插入新表 if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp VALUES %s`, strings.Join(userTags, ","))); err != nil { log.Println(" 新插入 dwd_d_user_tag 表数据异常", err.Error()) return err } } _, err = g.DB().Exec(ctx, "RENAME TABLE dwd_d_user_tag TO dwd_d_user_tag_back;") if err != nil { return err } _, err = g.DB().Exec(ctx, "RENAME TABLE dwd_d_user_tag_tmp TO dwd_d_user_tag;") if err != nil { return err } return err }