123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- package userTag
- import (
- "context"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/util/gconv"
- "log"
- "strings"
- )
- var (
- ctx = context.Background()
- )
- func BaseUserTag() error {
- defer func() {
- //清理备份数据
- g.DB().Exec(ctx, `DROP TABLE dwd_d_user_tag_back;`)
- g.DB().Exec(ctx, `DROP TABLE dwd_d_user_tag_tmp;`)
- }()
- if _, err := g.DB().Exec(ctx, fmt.Sprintf(`CREATE TABLE pub_tags.dwd_d_user_tag_tmp
- (
- baseUserId UInt64,
- bitobj AggregateFunction(groupBitmap,UInt64)
- )
- ENGINE = AggregatingMergeTree
- ORDER BY baseUserId
- SETTINGS index_granularity = 8192;`)); err != nil {
- g.Log().Errorf(ctx, "创建表异常%v", err)
- return err
- } //创建表数据
- //统计插入user数据
- var userTags []string
- tagUsers := make(map[int64][]string)
- tagUser, _ := g.DB().Query(ctx, "SELECT id,bitmapToArray(bitobj) as bitobj FROM dwd_d_tag")
- if tagUser != nil {
- for _, re := range tagUser.List() {
- id := gconv.Int64(re["id"])
- //user := make(map[int64]bool)
- for _, value := range gconv.Int64s(re["bitobj"]) {
- //user[value] = true
- tagUsers[value] = append(tagUsers[value], fmt.Sprintf(`toUInt64(%d)`, id))
- }
- //tagUsers[id] = &user
- }
- }
- baseUser, err := g.DB().Query(ctx, "SELECT DISTINCT(baseUserId) as baseUserId FROM dwd_mgo_position")
- if err == nil && !baseUser.IsEmpty() {
- log.Println("len===", len(tagUsers), baseUser.Len())
- for _, m := range baseUser.List() {
- baseUserId := gconv.Int64(m["baseUserId"])
- if vs, ok := tagUsers[baseUserId]; ok {
- userTags = append(userTags, fmt.Sprintf(`(%d,bitmapBuild([%s]))`, baseUserId, strings.Join(vs, ",")))
- delete(tagUsers, baseUserId)
- }
- if len(userTags) == 2000 { //插入新表
- if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
- VALUES %s`, strings.Join(userTags, ","))); err != nil {
- log.Println(" 新插入 dwd_d_user_tag 表数据异常")
- return err
- }
- userTags = []string{}
- }
- }
- }
- if len(userTags) > 0 { //插入新表
- if _, err = g.DB().Exec(ctx, fmt.Sprintf(`INSERT INTO dwd_d_user_tag_tmp
- VALUES %s`, strings.Join(userTags, ","))); err != nil {
- log.Println(" 新插入 dwd_d_user_tag 表数据异常", err.Error())
- return err
- }
- }
- _, err = g.DB().Exec(ctx, "RENAME TABLE dwd_d_user_tag TO dwd_d_user_tag_back;")
- if err != nil {
- return err
- }
- _, err = g.DB().Exec(ctx, "RENAME TABLE dwd_d_user_tag_tmp TO dwd_d_user_tag;")
- if err != nil {
- return err
- }
- return err
- }
|