|
@@ -0,0 +1,283 @@
|
|
|
|
+package service
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "app.yhyue.com/moapp/jyMarketing/entity"
|
|
|
|
+ "app.yhyue.com/moapp/jybase/common"
|
|
|
|
+ "context"
|
|
|
|
+ "fmt"
|
|
|
|
+ "log"
|
|
|
|
+ "strings"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+const (
|
|
|
|
+ logical_operator_normal = 0 // 正常运算标签
|
|
|
|
+ logical_operator_not = 1 // 非运算标签
|
|
|
|
+ tag_operator_and = 1 // 且
|
|
|
|
+ tag_operator_or = 2 // 或
|
|
|
|
+ Tabledwd_d_tag = "pub_tags.dwd_d_tag" // 标签用户表 todo 后边调整
|
|
|
|
+ Tabledwd_mgo_position = "pub_tags.dwd_mgo_position" // base_user_id 对应的mgoid
|
|
|
|
+ FullUserTagSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id=2017`
|
|
|
|
+ andSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) `
|
|
|
|
+ orSql = `SELECT groupBitmapOrState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) `
|
|
|
|
+ hasAllSql = ` bitmapHasAll( ddut.bitobj,bitmapBuild(%s)) `
|
|
|
|
+ hasAnySql = ` bitmapHasAny( ddut.bitobj,bitmapBuild(%s)) `
|
|
|
|
+ notHasAllSql = ` not bitmapHasAll( ddut.bitobj,bitmapBuild(%s)) `
|
|
|
|
+ notHasAnySql = ` not bitmapHasAny( ddut.bitobj,bitmapBuild(%s)) `
|
|
|
|
+ countUserSql = `SELECT
|
|
|
|
+ COUNT(1) as count
|
|
|
|
+FROM
|
|
|
|
+ pub_tags.dwd_d_user_tag ddut
|
|
|
|
+LEFT JOIN pub_tags.dwd_mgo_position dmp ON
|
|
|
|
+ (ddut.baseUserId = toUInt64(dmp.baseUserId))
|
|
|
|
+WHERE
|
|
|
|
+ dmp.mgoUserId = '%s'
|
|
|
|
+ AND dmp.type= 0 AND (%s)`
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+// UserIdConstructor 用户群组标签转换
|
|
|
|
+type UserIdConstructor struct {
|
|
|
|
+ groupFilter []int64 // 群组过滤条件
|
|
|
|
+ userGtFilter int64 // 用户过滤条件 暂停发消息时 用的
|
|
|
|
+ userGroupTagList []*UserGroupTag // 用户群组标签列表 (整理后的)
|
|
|
|
+ baseQuerySQL string // 查询群组下base_user_id 的sql
|
|
|
|
+ countUserSQL string // 查询用户标签是否符合群组标签的sql
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type UserGroupTag struct {
|
|
|
|
+ GroupId int64 // 群组id
|
|
|
|
+ TagOperator int64 // 群组内关系
|
|
|
|
+ NormalTag []int64 // 正常标签
|
|
|
|
+ NotTag []int64 // 非标签
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func NewUserIdConstructor(groupFilter []int64, userGtFilter int64) (u *UserIdConstructor) {
|
|
|
|
+ u = &UserIdConstructor{
|
|
|
|
+ groupFilter: groupFilter,
|
|
|
|
+ userGtFilter: userGtFilter,
|
|
|
|
+ userGroupTagList: []*UserGroupTag{},
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// GetGroupTags 获取用户群组标签信息
|
|
|
|
+func (u *UserIdConstructor) getGroupTags() *[]map[string]interface{} {
|
|
|
|
+ groupIdFilter := []string{}
|
|
|
|
+ groupIdValue := []interface{}{}
|
|
|
|
+ where := ""
|
|
|
|
+ for i := 0; i < len(u.groupFilter); i++ {
|
|
|
|
+ groupIdFilter = append(groupIdFilter, "?")
|
|
|
|
+ groupIdValue = append(groupIdValue, u.groupFilter[i])
|
|
|
|
+ }
|
|
|
|
+ where = fmt.Sprintf("where ugt.group_id in (%s)", strings.Join(groupIdFilter, ","))
|
|
|
|
+ query := fmt.Sprintf(`SELECT ugt.group_id,ug.tag_operator,ugt.tag_id,ugt.logical_operator FROM convertlabsync.user_group_tag ugt left join user_group ug on (ugt.group_id=ug.id) %s`, where)
|
|
|
|
+ rs := entity.BaseMysql.SelectBySql(query, groupIdValue...)
|
|
|
|
+ return rs
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// InitTagList 处理成方便用的数组
|
|
|
|
+func (u *UserIdConstructor) InitTagList() bool {
|
|
|
|
+ rs := u.getGroupTags()
|
|
|
|
+ if rs == nil || len(*rs) == 0 {
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+ groupMap := map[int64]*UserGroupTag{}
|
|
|
|
+ for i := 0; i < len(*rs); i++ {
|
|
|
|
+ groupId := common.Int64All((*rs)[i]["group_id"])
|
|
|
|
+ tagOperator := common.Int64All((*rs)[i]["tag_operator"])
|
|
|
|
+ tagId := common.Int64All((*rs)[i]["tag_id"])
|
|
|
|
+ logicalOperator := common.IntAll((*rs)[i]["logical_operator"])
|
|
|
|
+ if _, ok := groupMap[groupId]; !ok {
|
|
|
|
+ groupMap[groupId] = &UserGroupTag{
|
|
|
|
+ GroupId: groupId,
|
|
|
|
+ TagOperator: tagOperator,
|
|
|
|
+ NormalTag: []int64{},
|
|
|
|
+ NotTag: []int64{},
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // 追加
|
|
|
|
+ switch logicalOperator {
|
|
|
|
+ case logical_operator_normal:
|
|
|
|
+ groupMap[groupId].NormalTag = append(groupMap[groupId].NormalTag, tagId)
|
|
|
|
+ case logical_operator_not:
|
|
|
|
+ groupMap[groupId].NotTag = append(groupMap[groupId].NotTag, tagId)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ for _, v := range groupMap {
|
|
|
|
+ u.userGroupTagList = append(u.userGroupTagList, v)
|
|
|
|
+ }
|
|
|
|
+ return true
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 转换成sql 这需要判断
|
|
|
|
+// '正常标签'这里指不是非运算
|
|
|
|
+// toBaseQuerySQL 转换成查询baseUserId 的sql
|
|
|
|
+func (u *UserIdConstructor) toBaseQuerySQL() string {
|
|
|
|
+ sqlList := []string{} // 包含多个群组的sql
|
|
|
|
+ for i := 0; i < len(u.userGroupTagList); i++ {
|
|
|
|
+ // 拼接群组内sql
|
|
|
|
+ groupTag := u.userGroupTagList[i]
|
|
|
|
+ normalTagSQL, notTagSQL := "", ""
|
|
|
|
+ tagSql := ""
|
|
|
|
+ switch groupTag.TagOperator {
|
|
|
|
+ case tag_operator_and:
|
|
|
|
+ if len(groupTag.NormalTag) > 0 { // 正常标签
|
|
|
|
+ normalTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NormalTag); j++ {
|
|
|
|
+ normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
|
|
|
|
+ }
|
|
|
|
+ normalTagSQL = fmt.Sprintf(andSql, strings.Join(normalTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ if len(groupTag.NotTag) > 0 { // 非标签
|
|
|
|
+ notTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NotTag); j++ {
|
|
|
|
+ notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
|
|
|
|
+ }
|
|
|
|
+ notTagSQL = fmt.Sprintf(orSql, strings.Join(notTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ // 同时有: 正常标签 - 非标签
|
|
|
|
+ if normalTagSQL != "" && notTagSQL != "" {
|
|
|
|
+ tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", normalTagSQL, notTagSQL)
|
|
|
|
+ } else if normalTagSQL != "" {
|
|
|
|
+ // 只有正常标签 : 正常标签
|
|
|
|
+ tagSql = normalTagSQL
|
|
|
|
+ } else if notTagSQL != "" {
|
|
|
|
+ // 只有非标签 : 全量标签-非标签
|
|
|
|
+ tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ case tag_operator_or:
|
|
|
|
+ if len(groupTag.NormalTag) > 0 { // 正常标签
|
|
|
|
+ normalTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NormalTag); j++ {
|
|
|
|
+ normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
|
|
|
|
+ }
|
|
|
|
+ normalTagSQL = fmt.Sprintf(orSql, strings.Join(normalTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ if len(groupTag.NotTag) > 0 { // 非标签
|
|
|
|
+ notTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NotTag); j++ {
|
|
|
|
+ notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
|
|
|
|
+ }
|
|
|
|
+ notTagSQL = fmt.Sprintf(andSql, strings.Join(notTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ // 同时有: 正常标签 ∪ (U-(B∩C∩D....)) U:全量标签 B、C、D... 非标签
|
|
|
|
+ if normalTagSQL != "" && notTagSQL != "" {
|
|
|
|
+ tmpNotTagSql := fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
|
|
|
|
+ tagSql = fmt.Sprintf("SELECT bitmapOr((%s),(%s)) as userIds", FullUserTagSql, tmpNotTagSql)
|
|
|
|
+ } else if normalTagSQL != "" {
|
|
|
|
+ // 只有正常标签
|
|
|
|
+ tagSql = normalTagSQL
|
|
|
|
+ } else if notTagSQL != "" {
|
|
|
|
+ // 只有非标签: U-(B∩C∩D....) U:全量标签 B、C、D... 非标签
|
|
|
|
+ tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ sqlList = append(sqlList, tagSql)
|
|
|
|
+ }
|
|
|
|
+ // 如果用户有过滤
|
|
|
|
+ if u.userGtFilter > 0 {
|
|
|
|
+ u.baseQuerySQL = fmt.Sprintf("SELECT arrayFilter(x -> x >%v,bitmapToArray( groupBitmapOrState(userIds))) as userIds from (%s)", u.userGtFilter, strings.Join(sqlList, " UNION DISTINCT "))
|
|
|
|
+ } else {
|
|
|
|
+ u.baseQuerySQL = fmt.Sprintf("SELECT bitmapToArray( groupBitmapOrState( userIds)) as userIds from (%s)", strings.Join(sqlList, " UNION DISTINCT "))
|
|
|
|
+ }
|
|
|
|
+ fmt.Println("baseQuerySQL:", u.baseQuerySQL)
|
|
|
|
+ return u.baseQuerySQL
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 从数据库查询
|
|
|
|
+func (u *UserIdConstructor) QueryBaseUserIdList() (userList []int64) {
|
|
|
|
+ if !u.InitTagList() {
|
|
|
|
+ return []int64{}
|
|
|
|
+ }
|
|
|
|
+ rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toBaseQuerySQL())
|
|
|
|
+ if err := rows.Scan(&userList); err != nil {
|
|
|
|
+ log.Println("QueryBaseUserIdList err:", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ return userList
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 判断活动群组id 和 用户身上的的标签是否匹配
|
|
|
|
+// 分组之间用 or 连接
|
|
|
|
+// 分组内
|
|
|
|
+// 且: 正常标签: bitmapHasAll and (not bitmapHasAny )
|
|
|
|
+// 或:bitmapHasAny or (not bitmapHasAny())
|
|
|
|
+func (u *UserIdConstructor) toCountUserSQL(userId string) string {
|
|
|
|
+ sqlList := []string{} // 包含多个群组的sql
|
|
|
|
+ for i := 0; i < len(u.userGroupTagList); i++ {
|
|
|
|
+ // 拼接群组内sql
|
|
|
|
+ groupTag := u.userGroupTagList[i]
|
|
|
|
+ normalTagSQL, notTagSQL := "", ""
|
|
|
|
+ tagSql := ""
|
|
|
|
+ switch groupTag.TagOperator {
|
|
|
|
+ case tag_operator_and:
|
|
|
|
+ if len(groupTag.NormalTag) > 0 { // 正常标签
|
|
|
|
+ normalTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NormalTag); j++ {
|
|
|
|
+ normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
|
|
|
|
+ }
|
|
|
|
+ normalTagSQL = fmt.Sprintf(hasAllSql, strings.Join(normalTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ if len(groupTag.NotTag) > 0 { // 非标签
|
|
|
|
+ notTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NotTag); j++ {
|
|
|
|
+ notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
|
|
|
|
+ }
|
|
|
|
+ notTagSQL = fmt.Sprintf(notHasAnySql, strings.Join(notTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ // 同时有: 正常标签 and 非标签
|
|
|
|
+ if normalTagSQL != "" && notTagSQL != "" {
|
|
|
|
+ tagSql = fmt.Sprintf("(%s and %s)", normalTagSQL, notTagSQL)
|
|
|
|
+ } else if normalTagSQL != "" {
|
|
|
|
+ // 只有正常标签 : 正常标签
|
|
|
|
+ tagSql = fmt.Sprintf("(%s)", normalTagSQL)
|
|
|
|
+ } else if notTagSQL != "" {
|
|
|
|
+ // 只有非标签 :
|
|
|
|
+ tagSql = fmt.Sprintf("(%s)", notTagSQL)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ case tag_operator_or:
|
|
|
|
+ if len(groupTag.NormalTag) > 0 { // 正常标签
|
|
|
|
+ normalTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NormalTag); j++ {
|
|
|
|
+ normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
|
|
|
|
+ }
|
|
|
|
+ normalTagSQL = fmt.Sprintf(hasAnySql, strings.Join(normalTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ if len(groupTag.NotTag) > 0 { // 非标签
|
|
|
|
+ notTagList := []string{}
|
|
|
|
+ for j := 0; j < len(groupTag.NotTag); j++ {
|
|
|
|
+ notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
|
|
|
|
+ }
|
|
|
|
+ notTagSQL = fmt.Sprintf(notHasAllSql, strings.Join(notTagList, ","))
|
|
|
|
+ }
|
|
|
|
+ // 同时有: 正常标签 or 非标签
|
|
|
|
+ if normalTagSQL != "" && notTagSQL != "" {
|
|
|
|
+ tagSql = fmt.Sprintf(" (%s or %s) ", normalTagSQL, notTagSQL)
|
|
|
|
+ } else if normalTagSQL != "" {
|
|
|
|
+ // 只有正常标签 : 正常标签
|
|
|
|
+ tagSql = fmt.Sprintf(" (%s) ", normalTagSQL)
|
|
|
|
+ } else if notTagSQL != "" {
|
|
|
|
+ // 只有非标签 :
|
|
|
|
+ tagSql = fmt.Sprintf(" (%s) ", notTagSQL)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ sqlList = append(sqlList, tagSql)
|
|
|
|
+ }
|
|
|
|
+ u.countUserSQL = fmt.Sprintf(countUserSql, userId, strings.Join(sqlList, " or "))
|
|
|
|
+ fmt.Println("baseQuerySQL:", u.baseQuerySQL)
|
|
|
|
+ return u.countUserSQL
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 从数据库查询
|
|
|
|
+func (u *UserIdConstructor) CountUser(userId string) (count int64) {
|
|
|
|
+ if !u.InitTagList() {
|
|
|
|
+ return 0
|
|
|
|
+ }
|
|
|
|
+ rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toCountUserSQL(userId))
|
|
|
|
+ if err := rows.Scan(&count); err != nil {
|
|
|
|
+ log.Println("QueryBaseUserIdList err:", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ return count
|
|
|
|
+}
|