userGroupService.go 10 KB


  1. package service
  2. import (
  3. "app.yhyue.com/moapp/jyMarketing/entity"
  4. "app.yhyue.com/moapp/jybase/common"
  5. "context"
  6. "fmt"
  7. "log"
  8. "strings"
  9. )
const (
	// Values of the logical_operator column read by InitTagList.
	logicalOperatorNormal = 0 // normal (non-negated) tag
	logicalOperatorNot    = 1 // negated (NOT) tag

	// Values of the tag_operator column: how the tags inside one group combine.
	tagOperatorAnd = 1 // AND
	tagOperatorOr  = 2 // OR

	Tabledwd_d_tag        = "pub_tags.dwd_d_tag"        // tag -> users table; TODO: adjust later
	Tabledwd_mgo_position = "pub_tags.dwd_mgo_position" // maps base_user_id to the corresponding mgo id

	// FullUserTagSql selects the bitmap of tag 2017. The query builders use it
	// as the "all users" set from which NOT tags are subtracted.
	// NOTE(review): presumably tag 2017 contains every user — confirm.
	FullUserTagSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id=2017`

	// andSql / orSql AND-combine / OR-combine the user bitmaps of the tag ids
	// substituted for %s (a comma-separated id list).
	andSql = `SELECT groupBitmapAndState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) `
	orSql  = `SELECT groupBitmapOrState(bitobj) as userIds from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) `

	// Predicates over ddut.bitobj used inside countUserSql; %s is a
	// comma-separated tag id list.
	// NOTE(review): bitmapBuild is documented as taking a single array
	// argument; here it receives a bare id list — confirm this form is accepted.
	hasAllSql    = ` bitmapHasAll( ddut.bitobj,bitmapBuild(%s)) `
	hasAnySql    = ` bitmapHasAny( ddut.bitobj,bitmapBuild(%s)) `
	notHasAllSql = ` not bitmapHasAll( ddut.bitobj,bitmapBuild(%s)) `
	notHasAnySql = ` not bitmapHasAny( ddut.bitobj,bitmapBuild(%s)) `

	// countUserSql counts the dwd_d_user_tag rows of one user that satisfy a
	// tag predicate. Placeholders, in order: the mgo user id (inside single
	// quotes) and the predicate expression.
	countUserSql = `SELECT
COUNT(1) as count
FROM
pub_tags.dwd_d_user_tag ddut
LEFT JOIN pub_tags.dwd_mgo_position dmp ON
(ddut.baseUserId = toUInt64(dmp.baseUserId))
WHERE
dmp.mgoUserId = '%s'
AND dmp.type= 0 AND (%s)`
)
// UserIdConstructor converts user-group tag definitions into ClickHouse
// queries: one that lists the base user ids of the groups and one that checks
// whether a single user matches them.
type UserIdConstructor struct {
	groupFilter      []int64         // ids of the groups to resolve
	userGtFilter     int64           // user filter, used when message sending was paused: when > 0 only user ids greater than it are kept
	userGroupTagList []*UserGroupTag // per-group tag lists (normalized by InitTagList)
	baseQuerySQL     string          // SQL that fetches the base_user_ids of the groups
	countUserSQL     string          // SQL that checks whether a user's tags match the group tags
}
// UserGroupTag holds the tags of one user group, split by logical operator.
type UserGroupTag struct {
	GroupId     int64   // group id
	TagOperator int64   // relation between the tags inside the group (tagOperatorAnd or tagOperatorOr)
	NormalTag   []int64 // ids of normal (non-negated) tags
	NotTag      []int64 // ids of negated (NOT) tags
}
  48. func NewUserIdConstructor(groupFilter []int64, userGtFilter int64) (u *UserIdConstructor) {
  49. u = &UserIdConstructor{
  50. groupFilter: groupFilter,
  51. userGtFilter: userGtFilter,
  52. userGroupTagList: []*UserGroupTag{},
  53. }
  54. return
  55. }
  56. // GetGroupTags 获取用户群组标签信息
  57. func (u *UserIdConstructor) getGroupTags() *[]map[string]interface{} {
  58. groupIdFilter := []string{}
  59. groupIdValue := []interface{}{}
  60. where := ""
  61. for i := 0; i < len(u.groupFilter); i++ {
  62. groupIdFilter = append(groupIdFilter, "?")
  63. groupIdValue = append(groupIdValue, u.groupFilter[i])
  64. }
  65. where = fmt.Sprintf("where ugt.group_id in (%s)", strings.Join(groupIdFilter, ","))
  66. query := fmt.Sprintf(`SELECT ugt.group_id,ug.tag_operator,ugt.tag_id,ugt.logical_operator FROM convertlabsync.user_group_tag ugt left join user_group ug on (ugt.group_id=ug.id) %s`, where)
  67. rs := entity.ConvertlabTidb.SelectBySql(query, groupIdValue...)
  68. return rs
  69. }
  70. // InitTagList 处理成方便用的数组
  71. func (u *UserIdConstructor) InitTagList() bool {
  72. rs := u.getGroupTags()
  73. if rs == nil || len(*rs) == 0 {
  74. return false
  75. }
  76. groupMap := map[int64]*UserGroupTag{}
  77. for i := 0; i < len(*rs); i++ {
  78. groupId := common.Int64All((*rs)[i]["group_id"])
  79. tagOperator := common.Int64All((*rs)[i]["tag_operator"])
  80. tagId := common.Int64All((*rs)[i]["tag_id"])
  81. logicalOperator := common.IntAll((*rs)[i]["logical_operator"])
  82. if _, ok := groupMap[groupId]; !ok {
  83. groupMap[groupId] = &UserGroupTag{
  84. GroupId: groupId,
  85. TagOperator: tagOperator,
  86. NormalTag: []int64{},
  87. NotTag: []int64{},
  88. }
  89. }
  90. // 追加
  91. switch logicalOperator {
  92. case logicalOperatorNormal:
  93. groupMap[groupId].NormalTag = append(groupMap[groupId].NormalTag, tagId)
  94. case logicalOperatorNot:
  95. groupMap[groupId].NotTag = append(groupMap[groupId].NotTag, tagId)
  96. }
  97. }
  98. for _, v := range groupMap {
  99. u.userGroupTagList = append(u.userGroupTagList, v)
  100. }
  101. return true
  102. }
  103. // 转换成sql 这需要判断
  104. // '正常标签'这里指不是非运算
  105. // toBaseQuerySQL 转换成查询baseUserId 的sql
  106. func (u *UserIdConstructor) toBaseQuerySQL() string {
  107. sqlList := []string{} // 包含多个群组的sql
  108. for i := 0; i < len(u.userGroupTagList); i++ {
  109. // 拼接群组内sql
  110. groupTag := u.userGroupTagList[i]
  111. normalTagSQL, notTagSQL := "", ""
  112. tagSql := ""
  113. switch groupTag.TagOperator {
  114. case tagOperatorAnd:
  115. if len(groupTag.NormalTag) > 0 { // 正常标签
  116. normalTagList := []string{}
  117. for j := 0; j < len(groupTag.NormalTag); j++ {
  118. normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
  119. }
  120. normalTagSQL = fmt.Sprintf(andSql, strings.Join(normalTagList, ","))
  121. }
  122. if len(groupTag.NotTag) > 0 { // 非标签
  123. notTagList := []string{}
  124. for j := 0; j < len(groupTag.NotTag); j++ {
  125. notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
  126. }
  127. notTagSQL = fmt.Sprintf(orSql, strings.Join(notTagList, ","))
  128. }
  129. // 同时有: 正常标签 - 非标签
  130. if normalTagSQL != "" && notTagSQL != "" {
  131. tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", normalTagSQL, notTagSQL)
  132. } else if normalTagSQL != "" {
  133. // 只有正常标签 : 正常标签
  134. tagSql = normalTagSQL
  135. } else if notTagSQL != "" {
  136. // 只有非标签 : 全量标签-非标签
  137. tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
  138. }
  139. case tagOperatorOr:
  140. if len(groupTag.NormalTag) > 0 { // 正常标签
  141. normalTagList := []string{}
  142. for j := 0; j < len(groupTag.NormalTag); j++ {
  143. normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
  144. }
  145. normalTagSQL = fmt.Sprintf(orSql, strings.Join(normalTagList, ","))
  146. }
  147. if len(groupTag.NotTag) > 0 { // 非标签
  148. notTagList := []string{}
  149. for j := 0; j < len(groupTag.NotTag); j++ {
  150. notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
  151. }
  152. notTagSQL = fmt.Sprintf(andSql, strings.Join(notTagList, ","))
  153. }
  154. // 同时有: 正常标签 ∪ (U-(B∩C∩D....)) U:全量标签 B、C、D... 非标签
  155. if normalTagSQL != "" && notTagSQL != "" {
  156. tmpNotTagSql := fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
  157. tagSql = fmt.Sprintf("SELECT bitmapOr((%s),(%s)) as userIds", FullUserTagSql, tmpNotTagSql)
  158. } else if normalTagSQL != "" {
  159. // 只有正常标签
  160. tagSql = normalTagSQL
  161. } else if notTagSQL != "" {
  162. // 只有非标签: U-(B∩C∩D....) U:全量标签 B、C、D... 非标签
  163. tagSql = fmt.Sprintf("SELECT bitmapAndnot((%s),(%s)) as userIds", FullUserTagSql, notTagSQL)
  164. }
  165. }
  166. sqlList = append(sqlList, tagSql)
  167. }
  168. // 如果用户有过滤
  169. if u.userGtFilter > 0 {
  170. u.baseQuerySQL = fmt.Sprintf("SELECT arrayFilter(x -> x >%v,bitmapToArray( groupBitmapOrState(userIds))) as userIds from (%s)", u.userGtFilter, strings.Join(sqlList, " UNION DISTINCT "))
  171. } else {
  172. u.baseQuerySQL = fmt.Sprintf("SELECT bitmapToArray( groupBitmapOrState( userIds)) as userIds from (%s)", strings.Join(sqlList, " UNION DISTINCT "))
  173. }
  174. fmt.Println("baseQuerySQL:", u.baseQuerySQL)
  175. return u.baseQuerySQL
  176. }
  177. // 从数据库查询
  178. func (u *UserIdConstructor) QueryBaseUserIdList() (userList []int64) {
  179. if !u.InitTagList() {
  180. return []int64{}
  181. }
  182. rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toBaseQuerySQL())
  183. if err := rows.Scan(&userList); err != nil {
  184. log.Println("QueryBaseUserIdList err:", err)
  185. return
  186. }
  187. return userList
  188. }
  189. // 判断活动群组id 和 用户身上的的标签是否匹配
  190. // 分组之间用 or 连接
  191. // 分组内
  192. // 且: 正常标签: bitmapHasAll and (not bitmapHasAny )
  193. // 或:bitmapHasAny or (not bitmapHasAny())
  194. func (u *UserIdConstructor) toCountUserSQL(userId string) string {
  195. sqlList := []string{} // 包含多个群组的sql
  196. for i := 0; i < len(u.userGroupTagList); i++ {
  197. // 拼接群组内sql
  198. groupTag := u.userGroupTagList[i]
  199. normalTagSQL, notTagSQL := "", ""
  200. tagSql := ""
  201. switch groupTag.TagOperator {
  202. case tagOperatorAnd:
  203. if len(groupTag.NormalTag) > 0 { // 正常标签
  204. normalTagList := []string{}
  205. for j := 0; j < len(groupTag.NormalTag); j++ {
  206. normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
  207. }
  208. normalTagSQL = fmt.Sprintf(hasAllSql, strings.Join(normalTagList, ","))
  209. }
  210. if len(groupTag.NotTag) > 0 { // 非标签
  211. notTagList := []string{}
  212. for j := 0; j < len(groupTag.NotTag); j++ {
  213. notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
  214. }
  215. notTagSQL = fmt.Sprintf(notHasAnySql, strings.Join(notTagList, ","))
  216. }
  217. // 同时有: 正常标签 and 非标签
  218. if normalTagSQL != "" && notTagSQL != "" {
  219. tagSql = fmt.Sprintf("(%s and %s)", normalTagSQL, notTagSQL)
  220. } else if normalTagSQL != "" {
  221. // 只有正常标签 : 正常标签
  222. tagSql = fmt.Sprintf("(%s)", normalTagSQL)
  223. } else if notTagSQL != "" {
  224. // 只有非标签 :
  225. tagSql = fmt.Sprintf("(%s)", notTagSQL)
  226. }
  227. case tagOperatorOr:
  228. if len(groupTag.NormalTag) > 0 { // 正常标签
  229. normalTagList := []string{}
  230. for j := 0; j < len(groupTag.NormalTag); j++ {
  231. normalTagList = append(normalTagList, fmt.Sprint(groupTag.NormalTag[j]))
  232. }
  233. normalTagSQL = fmt.Sprintf(hasAnySql, strings.Join(normalTagList, ","))
  234. }
  235. if len(groupTag.NotTag) > 0 { // 非标签
  236. notTagList := []string{}
  237. for j := 0; j < len(groupTag.NotTag); j++ {
  238. notTagList = append(notTagList, fmt.Sprint(groupTag.NotTag[j]))
  239. }
  240. notTagSQL = fmt.Sprintf(notHasAllSql, strings.Join(notTagList, ","))
  241. }
  242. // 同时有: 正常标签 or 非标签
  243. if normalTagSQL != "" && notTagSQL != "" {
  244. tagSql = fmt.Sprintf(" (%s or %s) ", normalTagSQL, notTagSQL)
  245. } else if normalTagSQL != "" {
  246. // 只有正常标签 : 正常标签
  247. tagSql = fmt.Sprintf(" (%s) ", normalTagSQL)
  248. } else if notTagSQL != "" {
  249. // 只有非标签 :
  250. tagSql = fmt.Sprintf(" (%s) ", notTagSQL)
  251. }
  252. }
  253. sqlList = append(sqlList, tagSql)
  254. }
  255. u.countUserSQL = fmt.Sprintf(countUserSql, userId, strings.Join(sqlList, " or "))
  256. fmt.Println("baseQuerySQL:", u.baseQuerySQL)
  257. return u.countUserSQL
  258. }
  259. // 从数据库查询
  260. func (u *UserIdConstructor) CountUser(userId string) (count int64) {
  261. if !u.InitTagList() {
  262. return 0
  263. }
  264. rows := entity.ClickhouseConn.QueryRow(context.Background(), u.toCountUserSQL(userId))
  265. if err := rows.Scan(&count); err != nil {
  266. log.Println("QueryBaseUserIdList err:", err)
  267. return
  268. }
  269. return count
  270. }