userGroupService.go 11 KB


  1. package service
  2. import (
  3. "app.yhyue.com/moapp/jyMarketing/entity"
  4. "app.yhyue.com/moapp/jybase/common"
  5. "context"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "log"
  9. "strings"
  10. )
  11. const (
  12. logicalOperatorNormal = 0 // 正常运算标签
  13. logicalOperatorNot = 1 // 非运算标签
  14. tagOperatorAnd = 1 // 且
  15. tagOperatorOr = 2 // 或
  16. Tabledwd_d_tag = "pub_tags.dwd_d_tag" // 标签用户表 todo 后边调整
  17. Tabledwd_mgo_position = "pub_tags.dwd_mgo_position" // base_user_id 对应的mgoid
  18. FullUserTagSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id=2017`
  19. andSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) `
  20. orSql = `SELECT groupBitmapOrState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) `
  21. hasAllSql = ` bitmapHasAll( ddut.bitobj,bitmapBuild([%s])) `
  22. hasAnySql = ` bitmapHasAny( ddut.bitobj,bitmapBuild([%s])) `
  23. notHasAllSql = ` not bitmapHasAll( ddut.bitobj,bitmapBuild([%s])) `
  24. notHasAnySql = ` not bitmapHasAny( ddut.bitobj,bitmapBuild([%s])) `
  25. countUserSql = `SELECT
  26. COUNT(1) as count
  27. FROM
  28. pub_tags.dwd_d_user_tag ddut
  29. LEFT JOIN pub_tags.dwd_mgo_position dmp ON
  30. (ddut.baseUserId = toUInt64(dmp.baseUserId))
  31. WHERE
  32. dmp.mgoUserId = '%s'
  33. AND dmp.type= 0 AND (%s)`
  34. )
  35. // UserIdConstructor 用户群组标签转换
  36. type UserIdConstructor struct {
  37. groupFilter []int64 // 群组过滤条件
  38. userGtFilter int64 // 用户过滤条件 暂停发消息时 用的
  39. userGroupTagList []*UserGroupTag // 用户群组标签列表 (整理后的)
  40. baseQuerySQL string // 查询群组下base_user_id 的sql
  41. countUserSQL string // 查询用户标签是否符合群组标签的sql
  42. }
  43. type UserGroupTag struct {
  44. GroupId int64 // 群组id
  45. TagOperator int64 // 群组内关系
  46. NormalTag []int64 // 正常标签
  47. NotTag []int64 // 非标签
  48. }
  49. func NewUserIdConstructor(groupFilter []int64, userGtFilter int64) (u *UserIdConstructor) {
  50. u = &UserIdConstructor{
  51. groupFilter: groupFilter,
  52. userGtFilter: userGtFilter,
  53. userGroupTagList: []*UserGroupTag{},
  54. }
  55. return
  56. }
  57. // GetGroupTags 获取用户群组标签信息
  58. func (u *UserIdConstructor) getGroupTags() *[]map[string]interface{} {
  59. groupIdFilter := []string{}
  60. groupIdValue := []interface{}{}
  61. where := ""
  62. for i := 0; i < len(u.groupFilter); i++ {
  63. groupIdFilter = append(groupIdFilter, "?")
  64. groupIdValue = append(groupIdValue, u.groupFilter[i])
  65. }
  66. where = fmt.Sprintf("where ugt.group_id in (%s)", strings.Join(groupIdFilter, ","))
  67. query := fmt.Sprintf(`SELECT ugt.group_id,ug.tag_operator,ugt.tag_id,ugt.logical_operator FROM convertlabsync.user_group_tag ugt left join user_group ug on (ugt.group_id=ug.id) %s`, where)
  68. rs := entity.ConvertlabTidb.SelectBySql(query, groupIdValue...)
  69. return rs
  70. }
  71. // InitTagList 处理成方便用的数组
  72. func (u *UserIdConstructor) InitTagList() bool {
  73. rs := u.getGroupTags()
  74. if rs == nil || len(*rs) == 0 {
  75. return false
  76. }
  77. groupMap := map[int64]*UserGroupTag{}
  78. for i := 0; i < len(*rs); i++ {
  79. groupId := common.Int64All((*rs)[i]["group_id"])
  80. tagOperator := common.Int64All((*rs)[i]["tag_operator"])
  81. tagId := common.Int64All((*rs)[i]["tag_id"])
  82. logicalOperator := common.IntAll((*rs)[i]["logical_operator"])
  83. if _, ok := groupMap[groupId]; !ok {
  84. groupMap[groupId] = &UserGroupTag{
  85. GroupId: groupId,
  86. TagOperator: tagOperator,
  87. NormalTag: []int64{},
  88. NotTag: []int64{},
  89. }
  90. }
  91. // 追加
  92. switch logicalOperator {
  93. case logicalOperatorNormal:
  94. groupMap[groupId].NormalTag = append(groupMap[groupId].NormalTag, tagId)
  95. case logicalOperatorNot:
  96. groupMap[groupId].NotTag = append(groupMap[groupId].NotTag, tagId)
  97. }
  98. }
  99. for _, v := range groupMap {
  100. u.userGroupTagList = append(u.userGroupTagList, v)
  101. }
  102. return true
  103. }
  104. // 转换成sql 这需要判断
  105. // '正常标签'这里指不是非运算
  106. // toBaseQuerySQL 转换成查询baseUserId 的sql
  107. func (u *UserIdConstructor) toBaseQuerySQL() string {
  108. sqlList := []string{} // 包含多个群组的sql
  109. for i := 0; i < len(u.userGroupTagList); i++ {
  110. // 拼接群组内sql
  111. groupTag := u.userGroupTagList[i]
  112. normalTagSQL, notTagSQL := "", ""
  113. tagSql := ""
  114. switch groupTag.TagOperator {
  115. case tagOperatorAnd:
  116. if len(groupTag.NormalTag) > 0 { // 正常标签
  117. normalTagList := []string{}
  118. for j := 0; j < len(groupTag.NormalTag); j++ {
  119. normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
  120. }
  121. normalTagSQL = fmt.Sprintf(andSql, strings.Join(normalTagList, ","))
  122. }
  123. if len(groupTag.NotTag) > 0 { // 非标签
  124. notTagList := []string{}
  125. for j := 0; j < len(groupTag.NotTag); j++ {
  126. notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
  127. }
  128. notTagSQL = fmt.Sprintf(orSql, strings.Join(notTagList, ","))
  129. }
  130. // 同时有: 正常标签 - 非标签
  131. if normalTagSQL != "" && notTagSQL != "" {
  132. tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", normalTagSQL, notTagSQL)
  133. } else if normalTagSQL != "" {
  134. // 只有正常标签 : 正常标签
  135. tagSql = normalTagSQL
  136. } else if notTagSQL != "" {
  137. // 只有非标签 : 全量标签-非标签
  138. tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
  139. }
  140. case tagOperatorOr:
  141. if len(groupTag.NormalTag) > 0 { // 正常标签
  142. normalTagList := []string{}
  143. for j := 0; j < len(groupTag.NormalTag); j++ {
  144. normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
  145. }
  146. normalTagSQL = fmt.Sprintf(orSql, strings.Join(normalTagList, ","))
  147. }
  148. if len(groupTag.NotTag) > 0 { // 非标签
  149. notTagList := []string{}
  150. for j := 0; j < len(groupTag.NotTag); j++ {
  151. notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
  152. }
  153. notTagSQL = fmt.Sprintf(andSql, strings.Join(notTagList, ","))
  154. }
  155. // 同时有: 正常标签 ∪ (U-(B∩C∩D....)) U:全量标签 B、C、D... 非标签
  156. if normalTagSQL != "" && notTagSQL != "" {
  157. tmpNotTagSql := fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
  158. tagSql = fmt.Sprintf("SELECT bitmapOr((%s),(%s)) as userIds", FullUserTagSql, tmpNotTagSql)
  159. } else if normalTagSQL != "" {
  160. // 只有正常标签
  161. tagSql = normalTagSQL
  162. } else if notTagSQL != "" {
  163. // 只有非标签: U-(B∩C∩D....) U:全量标签 B、C、D... 非标签
  164. tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
  165. }
  166. }
  167. sqlList = append(sqlList, tagSql)
  168. }
  169. // 如果用户有过滤
  170. if u.userGtFilter > 0 {
  171. u.baseQuerySQL = fmt.Sprintf("SELECT arrayFilter(x -> x >%v,bitmapToArray( groupBitmapOrState(userIds))) as userIds from (%s)", u.userGtFilter, strings.Join(sqlList, " UNION DISTINCT "))
  172. } else {
  173. u.baseQuerySQL = fmt.Sprintf("SELECT bitmapToArray( groupBitmapOrState( userIds)) as userIds from (%s)", strings.Join(sqlList, " UNION DISTINCT "))
  174. }
  175. log.Println("baseQuerySQL:", u.baseQuerySQL)
  176. return u.baseQuerySQL
  177. }
  178. // 从数据库查询
  179. func (u *UserIdConstructor) QueryBaseUserIdList() (userList []int64) {
  180. if !u.InitTagList() {
  181. return []int64{}
  182. }
  183. rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toBaseQuerySQL())
  184. if err := rows.Scan(&userList); err != nil {
  185. log.Println("QueryBaseUserIdList err:", err)
  186. return
  187. }
  188. return userList
  189. }
  190. // 判断活动群组id 和 用户身上的的标签是否匹配
  191. // 分组之间用 or 连接
  192. // 分组内
  193. // 且: 正常标签: bitmapHasAll and (not bitmapHasAny )
  194. // 或:bitmapHasAny or (not bitmapHasAny())
  195. func (u *UserIdConstructor) toCountUserSQL(userId string) string {
  196. sqlList := []string{} // 包含多个群组的sql
  197. for i := 0; i < len(u.userGroupTagList); i++ {
  198. // 拼接群组内sql
  199. groupTag := u.userGroupTagList[i]
  200. normalTagSQL, notTagSQL := "", ""
  201. tagSql := ""
  202. switch groupTag.TagOperator {
  203. case tagOperatorAnd:
  204. if len(groupTag.NormalTag) > 0 { // 正常标签
  205. normalTagList := []string{}
  206. for j := 0; j < len(groupTag.NormalTag); j++ {
  207. normalTagList = append(normalTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NormalTag[j]))
  208. }
  209. normalTagSQL = fmt.Sprintf(hasAllSql, strings.Join(normalTagList, ","))
  210. }
  211. if len(groupTag.NotTag) > 0 { // 非标签
  212. notTagList := []string{}
  213. for j := 0; j < len(groupTag.NotTag); j++ {
  214. notTagList = append(notTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NotTag[j]))
  215. }
  216. notTagSQL = fmt.Sprintf(notHasAnySql, strings.Join(notTagList, ","))
  217. }
  218. // 同时有: 正常标签 and 非标签
  219. if normalTagSQL != "" && notTagSQL != "" {
  220. tagSql = fmt.Sprintf("(%s and %s)", normalTagSQL, notTagSQL)
  221. } else if normalTagSQL != "" {
  222. // 只有正常标签 : 正常标签
  223. tagSql = fmt.Sprintf("(%s)", normalTagSQL)
  224. } else if notTagSQL != "" {
  225. // 只有非标签 :
  226. tagSql = fmt.Sprintf("(%s)", notTagSQL)
  227. }
  228. case tagOperatorOr:
  229. if len(groupTag.NormalTag) > 0 { // 正常标签
  230. normalTagList := []string{}
  231. for j := 0; j < len(groupTag.NormalTag); j++ {
  232. normalTagList = append(normalTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NormalTag[j]))
  233. }
  234. normalTagSQL = fmt.Sprintf(hasAnySql, strings.Join(normalTagList, ","))
  235. }
  236. if len(groupTag.NotTag) > 0 { // 非标签
  237. notTagList := []string{}
  238. for j := 0; j < len(groupTag.NotTag); j++ {
  239. notTagList = append(notTagList, fmt.Sprintf("toUInt64(%v)", groupTag.NotTag[j]))
  240. }
  241. notTagSQL = fmt.Sprintf(notHasAllSql, strings.Join(notTagList, ","))
  242. }
  243. // 同时有: 正常标签 or 非标签
  244. if normalTagSQL != "" && notTagSQL != "" {
  245. tagSql = fmt.Sprintf(" (%s or %s) ", normalTagSQL, notTagSQL)
  246. } else if normalTagSQL != "" {
  247. // 只有正常标签 : 正常标签
  248. tagSql = fmt.Sprintf(" (%s) ", normalTagSQL)
  249. } else if notTagSQL != "" {
  250. // 只有非标签 :
  251. tagSql = fmt.Sprintf(" (%s) ", notTagSQL)
  252. }
  253. }
  254. sqlList = append(sqlList, tagSql)
  255. }
  256. u.countUserSQL = fmt.Sprintf(countUserSql, userId, strings.Join(sqlList, " or "))
  257. logx.Info("toCountUserSQL:", u.countUserSQL)
  258. return u.countUserSQL
  259. }
  260. // 从数据库查询
  261. func (u *UserIdConstructor) CountUser(userId string) (count uint64) {
  262. if !u.InitTagList() {
  263. logx.Info("InitTagList 异常")
  264. return 0
  265. }
  266. rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toCountUserSQL(userId))
  267. if err := rows.Scan(&count); err != nil {
  268. logx.Info("QueryBaseUserIdList err:", err)
  269. return
  270. }
  271. return count
  272. }