package service import ( "app.yhyue.com/moapp/jyMarketing/entity" "app.yhyue.com/moapp/jybase/common" "context" "fmt" "github.com/zeromicro/go-zero/core/logx" "log" "strings" ) const ( logicalOperatorNormal = 0 // 正常运算标签 logicalOperatorNot = 1 // 非运算标签 tagOperatorAnd = 1 // 且 tagOperatorOr = 2 // 或 Tabledwd_d_tag = "pub_tags.dwd_d_tag" // 标签用户表 todo 后边调整 Tabledwd_mgo_position = "pub_tags.dwd_mgo_position" // base_user_id 对应的mgoid FullUserTagSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id=2017` andSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) ` orSql = `SELECT groupBitmapOrState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) ` hasAllSql = ` bitmapHasAll( ddut.bitobj,bitmapBuild([%s])) ` hasAnySql = ` bitmapHasAny( ddut.bitobj,bitmapBuild([%s])) ` notHasAllSql = ` not bitmapHasAll( ddut.bitobj,bitmapBuild([%s])) ` notHasAnySql = ` not bitmapHasAny( ddut.bitobj,bitmapBuild([%s])) ` countUserSql = `SELECT COUNT(1) as count FROM pub_tags.dwd_d_user_tag ddut LEFT JOIN pub_tags.dwd_mgo_position dmp ON (ddut.baseUserId = toUInt64(dmp.baseUserId)) WHERE dmp.mgoUserId = '%s' AND dmp.type= 0 AND (%s)` ) // UserIdConstructor 用户群组标签转换 type UserIdConstructor struct { groupFilter []int64 // 群组过滤条件 userGtFilter int64 // 用户过滤条件 暂停发消息时 用的 userGroupTagList []*UserGroupTag // 用户群组标签列表 (整理后的) baseQuerySQL string // 查询群组下base_user_id 的sql countUserSQL string // 查询用户标签是否符合群组标签的sql } type UserGroupTag struct { GroupId int64 // 群组id TagOperator int64 // 群组内关系 NormalTag []int64 // 正常标签 NotTag []int64 // 非标签 } func NewUserIdConstructor(groupFilter []int64, userGtFilter int64) (u *UserIdConstructor) { u = &UserIdConstructor{ groupFilter: groupFilter, userGtFilter: userGtFilter, userGroupTagList: []*UserGroupTag{}, } return } // GetGroupTags 获取用户群组标签信息 func (u *UserIdConstructor) getGroupTags() *[]map[string]interface{} { groupIdFilter := []string{} groupIdValue := []interface{}{} where := "" for i := 0; i < len(u.groupFilter); i++ { groupIdFilter = append(groupIdFilter, "?") groupIdValue = append(groupIdValue, u.groupFilter[i]) } where = fmt.Sprintf("where ugt.group_id in (%s)", strings.Join(groupIdFilter, ",")) query := fmt.Sprintf(`SELECT ugt.group_id,ug.tag_operator,ugt.tag_id,ugt.logical_operator FROM convertlabsync.user_group_tag ugt left join user_group ug on (ugt.group_id=ug.id) %s`, where) rs := entity.ConvertlabTidb.SelectBySql(query, groupIdValue...) return rs } // InitTagList 处理成方便用的数组 func (u *UserIdConstructor) InitTagList() bool { rs := u.getGroupTags() if rs == nil || len(*rs) == 0 { return false } groupMap := map[int64]*UserGroupTag{} for i := 0; i < len(*rs); i++ { groupId := common.Int64All((*rs)[i]["group_id"]) tagOperator := common.Int64All((*rs)[i]["tag_operator"]) tagId := common.Int64All((*rs)[i]["tag_id"]) logicalOperator := common.IntAll((*rs)[i]["logical_operator"]) if _, ok := groupMap[groupId]; !ok { groupMap[groupId] = &UserGroupTag{ GroupId: groupId, TagOperator: tagOperator, NormalTag: []int64{}, NotTag: []int64{}, } } // 追加 switch logicalOperator { case logicalOperatorNormal: groupMap[groupId].NormalTag = append(groupMap[groupId].NormalTag, tagId) case logicalOperatorNot: groupMap[groupId].NotTag = append(groupMap[groupId].NotTag, tagId) } } for _, v := range groupMap { u.userGroupTagList = append(u.userGroupTagList, v) } return true } // 转换成sql 这需要判断 // '正常标签'这里指不是非运算 // toBaseQuerySQL 转换成查询baseUserId 的sql func (u *UserIdConstructor) toBaseQuerySQL() string { sqlList := []string{} // 包含多个群组的sql for i := 0; i < len(u.userGroupTagList); i++ { // 拼接群组内sql groupTag := u.userGroupTagList[i] normalTagSQL, notTagSQL := "", "" tagSql := "" switch groupTag.TagOperator { case tagOperatorAnd: if len(groupTag.NormalTag) > 0 { // 正常标签 normalTagList := []string{} for j := 0; j < len(groupTag.NormalTag); j++ { normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j])) } normalTagSQL = fmt.Sprintf(andSql, strings.Join(normalTagList, ",")) } if len(groupTag.NotTag) > 0 { // 非标签 notTagList := []string{} for j := 0; j < len(groupTag.NotTag); j++ { notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j])) } notTagSQL = fmt.Sprintf(orSql, strings.Join(notTagList, ",")) } // 同时有: 正常标签 - 非标签 if normalTagSQL != "" && notTagSQL != "" { tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", normalTagSQL, notTagSQL) } else if normalTagSQL != "" { // 只有正常标签 : 正常标签 tagSql = normalTagSQL } else if notTagSQL != "" { // 只有非标签 : 全量标签-非标签 tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL) } case tagOperatorOr: if len(groupTag.NormalTag) > 0 { // 正常标签 normalTagList := []string{} for j := 0; j < len(groupTag.NormalTag); j++ { normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j])) } normalTagSQL = fmt.Sprintf(orSql, strings.Join(normalTagList, ",")) } if len(groupTag.NotTag) > 0 { // 非标签 notTagList := []string{} for j := 0; j < len(groupTag.NotTag); j++ { notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j])) } notTagSQL = fmt.Sprintf(andSql, strings.Join(notTagList, ",")) } // 同时有: 正常标签 ∪ (U-(B∩C∩D....)) U:全量标签 B、C、D... 非标签 if normalTagSQL != "" && notTagSQL != "" { tmpNotTagSql := fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL) tagSql = fmt.Sprintf("SELECT bitmapOr((%s),(%s)) as userIds", FullUserTagSql, tmpNotTagSql) } else if normalTagSQL != "" { // 只有正常标签 tagSql = normalTagSQL } else if notTagSQL != "" { // 只有非标签: U-(B∩C∩D....) U:全量标签 B、C、D... 非标签 tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL) } } sqlList = append(sqlList, tagSql) } // 如果用户有过滤 if u.userGtFilter > 0 { u.baseQuerySQL = fmt.Sprintf("SELECT arrayFilter(x -> x >%v,bitmapToArray( groupBitmapOrState(userIds))) as userIds from (%s)", u.userGtFilter, strings.Join(sqlList, " UNION DISTINCT ")) } else { u.baseQuerySQL = fmt.Sprintf("SELECT bitmapToArray( groupBitmapOrState( userIds)) as userIds from (%s)", strings.Join(sqlList, " UNION DISTINCT ")) } log.Println("baseQuerySQL:", u.baseQuerySQL) return u.baseQuerySQL } // 从数据库查询 func (u *UserIdConstructor) QueryBaseUserIdList() (userList []int64) { if !u.InitTagList() { return []int64{} } rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toBaseQuerySQL()) if err := rows.Scan(&userList); err != nil { log.Println("QueryBaseUserIdList err:", err) return } return userList } // 判断活动群组id 和 用户身上的的标签是否匹配 // 分组之间用 or 连接 // 分组内 // 且: 正常标签: bitmapHasAll and (not bitmapHasAny ) // 或:bitmapHasAny or (not bitmapHasAny()) func (u *UserIdConstructor) toCountUserSQL(userId string) string { sqlList := []string{} // 包含多个群组的sql for i := 0; i < len(u.userGroupTagList); i++ { // 拼接群组内sql groupTag := u.userGroupTagList[i] normalTagSQL, notTagSQL := "", "" tagSql := "" switch groupTag.TagOperator { case tagOperatorAnd: if len(groupTag.NormalTag) > 0 { // 正常标签 normalTagList := []string{} for j := 0; j < len(groupTag.NormalTag); j++ { normalTagList = append(normalTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NormalTag[j])) } normalTagSQL = fmt.Sprintf(hasAllSql, strings.Join(normalTagList, ",")) } if len(groupTag.NotTag) > 0 { // 非标签 notTagList := []string{} for j := 0; j < len(groupTag.NotTag); j++ { notTagList = append(notTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NotTag[j])) } notTagSQL = fmt.Sprintf(notHasAnySql, strings.Join(notTagList, ",")) } // 同时有: 正常标签 and 非标签 if normalTagSQL != "" && notTagSQL != "" { tagSql = fmt.Sprintf("(%s and %s)", normalTagSQL, notTagSQL) } else if normalTagSQL != "" { // 只有正常标签 : 正常标签 tagSql = fmt.Sprintf("(%s)", normalTagSQL) } else if notTagSQL != "" { // 只有非标签 : tagSql = fmt.Sprintf("(%s)", notTagSQL) } case tagOperatorOr: if len(groupTag.NormalTag) > 0 { // 正常标签 normalTagList := []string{} for j := 0; j < len(groupTag.NormalTag); j++ { normalTagList = append(normalTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NormalTag[j])) } normalTagSQL = fmt.Sprintf(hasAnySql, strings.Join(normalTagList, ",")) } if len(groupTag.NotTag) > 0 { // 非标签 notTagList := []string{} for j := 0; j < len(groupTag.NotTag); j++ { notTagList = append(notTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NotTag[j])) } notTagSQL = fmt.Sprintf(notHasAllSql, strings.Join(notTagList, ",")) } // 同时有: 正常标签 or 非标签 if normalTagSQL != "" && notTagSQL != "" { tagSql = fmt.Sprintf(" (%s or %s) ", normalTagSQL, notTagSQL) } else if normalTagSQL != "" { // 只有正常标签 : 正常标签 tagSql = fmt.Sprintf(" (%s) ", normalTagSQL) } else if notTagSQL != "" { // 只有非标签 : tagSql = fmt.Sprintf(" (%s) ", notTagSQL) } } sqlList = append(sqlList, tagSql) } u.countUserSQL = fmt.Sprintf(countUserSql, userId, strings.Join(sqlList, " or ")) logx.Info("toCountUserSQL:", u.countUserSQL) return u.countUserSQL } // 从数据库查询 func (u *UserIdConstructor) CountUser(userId string) (count uint64) { if !u.InitTagList() { logx.Info("InitTagList 异常") return 0 } rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toCountUserSQL(userId)) if err := rows.Scan(&count); err != nil { logx.Info("QueryBaseUserIdList err:", err) return } return count }