task.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package logic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/gogf/gf/v2/frame/g"
  7. "github.com/gogf/gf/v2/os/gtime"
  8. "newuserGet/internal/dao"
  9. "newuserGet/internal/model/do"
  10. "newuserGet/internal/model/entity"
  11. "strings"
  12. "time"
  13. )
  14. func Task(ctx context.Context) {
  15. start := time.Now()
  16. defer func() {
  17. g.Log().Info(ctx, "本轮完成", time.Since(start))
  18. }()
  19. // get rule
  20. var rules []entity.NewUserSendRule
  21. err := dao.NewUserSendRule.Ctx(ctx).Scan(&rules)
  22. if err != nil {
  23. g.Log().Error(ctx, "获取规则失败", err)
  24. return
  25. }
  26. // 遍历规则
  27. for i := 0; i < len(rules); i++ {
  28. g.Log().Infof(ctx, "开始处理第%v个规则:id-%v name:%v \n", i, rules[i].Id, rules[i].Name)
  29. // 获取注册时间规则用户和过滤行为
  30. userList, err := getFromRegisterBehavior(ctx, rules[i])
  31. if err != nil {
  32. g.Log().Error(ctx, err)
  33. continue
  34. }
  35. if len(userList) == 0 {
  36. g.Log().Info(ctx, "当前规则getFromRegisterBehavior未获取到有效用户", rules[i].Id, rules[i].Name)
  37. continue
  38. }
  39. if rules[i].CallState != "" {
  40. // 过滤外呼状态
  41. userList, err = filterCallState(ctx, rules[i], userList)
  42. if err != nil {
  43. g.Log().Error(ctx, err)
  44. continue
  45. }
  46. if len(userList) == 0 {
  47. g.Log().Info(ctx, "当前规则filterCallState未获取到有效用户", rules[i].Id, rules[i].Name)
  48. continue
  49. }
  50. }
  51. if rules[i].Trailstatus != "" {
  52. // 过滤销售进程
  53. userList = filterTrailStatus(ctx, rules[i], userList)
  54. }
  55. // 保存用户
  56. saveList := []do.NewUserSendLog{}
  57. for j := 0; j <= len(userList); j++ {
  58. saveList = append(saveList, do.NewUserSendLog{
  59. Phone: userList[j].Phone,
  60. UserId: userList[j].MgoId,
  61. CreateTime: gtime.Now(),
  62. })
  63. }
  64. _, err = dao.NewUserSendLog.Ctx(ctx).Data(saveList).Insert()
  65. if err != nil {
  66. g.Log().Errorf(ctx, "第%v个规则 保存失败 :id-%v name:%v err:%v %v\n", i, rules[i].Id, rules[i].Name, err, saveList)
  67. continue
  68. }
  69. g.Log().Infof(ctx, "完成--第%v个规则:id-%v name:%v \n", i, rules[i].Id, rules[i].Name)
  70. }
  71. }
  72. type User struct {
  73. Phone string `json:"phone"`
  74. MgoId string `json:"mgoId"`
  75. }
  76. const (
  77. OrSQL = "SELECT bitobj from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) "
  78. BitMapSQL = "SELECT bitmapAnd((%s),(%s)) as userIds"
  79. QueryUserIdSQL = `SELECT
  80. DISTINCT dmp.mgoUserId as mgoId,dmp.phone as phone
  81. from
  82. pub_tags.dwd_mgo_position dmp
  83. WHERE
  84. bitmapHasAny( (%s),
  85. bitmapBuild([toUInt64(dmp.baseUserId)])) and dmp.mgoUserId!='' and dmp.phone!='';`
  86. CallStateNotDialed = "0" // 未拨打
  87. CallStateNotDeal = "notDeal" // 振铃未接通
  88. CallStateIvr = "ivr" // ivr
  89. CallStatDealing = "dealing" //已接听
  90. )
  91. // 通过时间和行为计算用户
  92. func getFromRegisterBehavior(ctx context.Context, rule entity.NewUserSendRule) (userList []*User, err error) {
  93. start := time.Now()
  94. defer func() {
  95. g.Log().Info(ctx, "getFromRegisterBehavior 耗时:", time.Since(start))
  96. }()
  97. // 时间标签并 之后与行为标签交
  98. // 处理注册时间标签
  99. if rule.RegisterTagIds == "" {
  100. return nil, errors.New("注册时间标签id为空")
  101. }
  102. regQuery := fmt.Sprintf(OrSQL, rule.RegisterTagIds)
  103. filterQuery := regQuery
  104. if rule.BehaviorTagIds != "" {
  105. //拼接行为标签sql
  106. behQuery := fmt.Sprintf(OrSQL, rule.BehaviorTagIds)
  107. filterQuery = fmt.Sprintf(BitMapSQL, regQuery, behQuery)
  108. }
  109. // 处理mgoid和手机号 拼接sql
  110. finalQuery := fmt.Sprintf(QueryUserIdSQL, filterQuery)
  111. g.Log().Info(ctx, "getFromRegisterBehavior 查询sql", finalQuery)
  112. err = dao.DwdDUserTag.DB().Raw(finalQuery).Scan(&userList)
  113. if err != nil {
  114. return nil, err
  115. }
  116. g.Log().Info(ctx, userList)
  117. return
  118. }
  119. // 外呼状态
  120. func filterCallState(ctx context.Context, rule entity.NewUserSendRule, userList []*User) (newUserList []*User, err error) {
  121. start := time.Now()
  122. defer func() {
  123. g.Log().Info(ctx, "filterCallState 耗时:", time.Since(start))
  124. }()
  125. callstateList := strings.Split(rule.CallState, ",")
  126. for i := 0; i < len(userList); i++ {
  127. for j := 0; j < len(callstateList); j++ {
  128. // 未拨打 判断不存在
  129. if callstateList[j] == CallStateNotDialed {
  130. exist, err := dao.VoiceRecord.Ctx(ctx).Where("CalledNo=?", userList[i].Phone).Exist()
  131. if err != nil {
  132. g.Log().Error(ctx, "filterCallState CallStateNotDialed err:", err)
  133. continue
  134. }
  135. if !exist {
  136. newUserList = append(newUserList, userList[i])
  137. continue
  138. }
  139. }
  140. // 已接听 至少一个记录
  141. if callstateList[j] == CallStatDealing {
  142. exist, err := dao.VoiceRecord.Ctx(ctx).Where("CalledNo=? and State=?", userList[i].Phone, CallStatDealing).Exist()
  143. if err != nil {
  144. g.Log().Error(ctx, "filterCallState CallStatDealing err:", err, userList[i])
  145. continue
  146. }
  147. if exist {
  148. newUserList = append(newUserList, userList[i])
  149. continue
  150. }
  151. }
  152. // 其他的 判断是否存在记录
  153. findState := strings.Split(callstateList[j], "#")
  154. exist, err := dao.VoiceRecord.Ctx(ctx).Where("CalledNo=?", userList[i].Phone).WhereNotIn(dao.VoiceRecord.Columns().CallState, findState).Exist()
  155. if err != nil {
  156. g.Log().Error(ctx, "filterCallState otherstate err:", err, findState, userList[i])
  157. continue
  158. }
  159. if !exist {
  160. continue
  161. }
  162. // 存在则查询是否包含其他记录
  163. exist, err = dao.VoiceRecord.Ctx(ctx).Where("CalledNo=?", userList[i].Phone).WhereNotIn(dao.VoiceRecord.Columns().CallState, findState).Exist()
  164. if err != nil {
  165. g.Log().Error(ctx, "filterCallState otherstate notin err:", err, findState, userList[i])
  166. continue
  167. }
  168. if !exist {
  169. newUserList = append(newUserList, userList[i])
  170. continue
  171. }
  172. }
  173. }
  174. return
  175. }
  176. // 跟进状态
  177. func filterTrailStatus(ctx context.Context, rule entity.NewUserSendRule, userList []*User) (newUserList []*User) {
  178. start := time.Now()
  179. defer func() {
  180. g.Log().Info(ctx, "filterTrailStatus 耗时:", time.Since(start))
  181. }()
  182. for i := 0; i < len(userList); i++ {
  183. }
  184. return
  185. }