task.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package logic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/gogf/gf/v2/container/gset"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/os/gtime"
  9. "newuserGet/internal/dao"
  10. "newuserGet/internal/model/do"
  11. "newuserGet/internal/model/entity"
  12. "strings"
  13. "time"
  14. )
  15. func Task(ctx context.Context) {
  16. start := time.Now()
  17. defer func() {
  18. g.Log().Info(ctx, "本轮完成", time.Since(start))
  19. }()
  20. // get rule
  21. var rules []entity.NewUserSendRule
  22. err := dao.NewUserSendRule.Ctx(ctx).Scan(&rules)
  23. if err != nil {
  24. g.Log().Error(ctx, "获取规则失败", err)
  25. return
  26. }
  27. // 遍历规则
  28. for i := 0; i < len(rules); i++ {
  29. g.Log().Infof(ctx, "开始处理第%v个规则:id-%v name:%v \n", i, rules[i].Id, rules[i].Name)
  30. // 获取注册时间规则用户和过滤行为
  31. userList, err := getFromRegisterBehavior(ctx, rules[i])
  32. if err != nil {
  33. g.Log().Error(ctx, err)
  34. continue
  35. }
  36. if len(userList) == 0 {
  37. g.Log().Info(ctx, "当前规则getFromRegisterBehavior未获取到有效用户", rules[i].Id, rules[i].Name)
  38. continue
  39. }
  40. if rules[i].CallState != "" {
  41. // 过滤外呼状态
  42. userList, err = filterCallState(ctx, rules[i], userList)
  43. if err != nil {
  44. g.Log().Error(ctx, err)
  45. continue
  46. }
  47. if len(userList) == 0 {
  48. g.Log().Info(ctx, "当前规则filterCallState未获取到有效用户", rules[i].Id, rules[i].Name)
  49. continue
  50. }
  51. }
  52. if rules[i].Trailstatus != "" {
  53. // 过滤销售进程
  54. userList = filterTrailStatus(ctx, rules[i], userList)
  55. }
  56. if len(userList) > 0 {
  57. // 保存用户
  58. saveList := []do.NewUserSendLog{}
  59. for j := 0; j <= len(userList); j++ {
  60. saveList = append(saveList, do.NewUserSendLog{
  61. Phone: userList[j].Phone,
  62. UserId: userList[j].MgoId,
  63. CreateTime: gtime.Now(),
  64. })
  65. }
  66. _, err = dao.NewUserSendLog.Ctx(ctx).Data(saveList).Insert()
  67. if err != nil {
  68. g.Log().Errorf(ctx, "第%v个规则 保存失败 :id-%v name:%v err:%v %v\n", i, rules[i].Id, rules[i].Name, err, saveList)
  69. continue
  70. }
  71. g.Log().Infof(ctx, "完成--第%v个规则:id-%v name:%v \n", i, rules[i].Id, rules[i].Name)
  72. } else {
  73. g.Log().Infof(ctx, "第%v个规则:id-%v name:%v 未获取到有效用户 \n", i, rules[i].Id, rules[i].Name)
  74. }
  75. }
  76. }
  77. type User struct {
  78. Phone string `json:"phone"`
  79. MgoId string `json:"mgoId"`
  80. }
  81. const (
  82. OrSQL = "SELECT groupBitmapOrState(bitobj) from pub_tags.dwd_d_tag ddt WHERE ddt.id in (%s) "
  83. BitMapSQL = "SELECT bitmapAnd((%s),(%s)) as userIds"
  84. QueryUserIdSQL = `SELECT
  85. DISTINCT dmp.mgoUserId as mgoId,dmp.phone as phone
  86. from
  87. pub_tags.dwd_mgo_position dmp
  88. WHERE
  89. bitmapHasAny( (%s),
  90. bitmapBuild([toUInt64(dmp.baseUserId)])) and dmp.mgoUserId!='' and dmp.phone!='';`
  91. CallStateNotDialed = "0" // 未拨打
  92. CallStateNotDeal = "notDeal" // 振铃未接通
  93. CallStateIvr = "ivr" // ivr
  94. CallStatDealing = "dealing" //已接听
  95. )
  96. // 通过时间和行为计算用户
  97. func getFromRegisterBehavior(ctx context.Context, rule entity.NewUserSendRule) (userList []*User, err error) {
  98. start := time.Now()
  99. defer func() {
  100. g.Log().Info(ctx, "getFromRegisterBehavior 耗时:", time.Since(start))
  101. }()
  102. // 时间标签并 之后与行为标签交
  103. // 处理注册时间标签
  104. if rule.RegisterTagIds == "" {
  105. return nil, errors.New("注册时间标签id为空")
  106. }
  107. regQuery := fmt.Sprintf(OrSQL, rule.RegisterTagIds)
  108. filterQuery := regQuery
  109. if rule.BehaviorTagIds != "" {
  110. //拼接行为标签sql
  111. behQuery := fmt.Sprintf(OrSQL, rule.BehaviorTagIds)
  112. filterQuery = fmt.Sprintf(BitMapSQL, regQuery, behQuery)
  113. }
  114. // 处理mgoid和手机号 拼接sql
  115. finalQuery := fmt.Sprintf(QueryUserIdSQL, filterQuery)
  116. g.Log().Info(ctx, "getFromRegisterBehavior 查询sql", finalQuery)
  117. err = dao.DwdDUserTag.DB().Raw(finalQuery).Scan(&userList)
  118. if err != nil {
  119. return nil, err
  120. }
  121. g.Log().Info(ctx, userList)
  122. g.Log().Info(ctx, "getFromRegisterBehavior 获得用户数量", len(userList))
  123. return
  124. }
  125. // 外呼状态
  126. func filterCallState(ctx context.Context, rule entity.NewUserSendRule, userList []*User) (newUserList []*User, err error) {
  127. start := time.Now()
  128. defer func() {
  129. g.Log().Info(ctx, "filterCallState 耗时:", time.Since(start))
  130. }()
  131. callstateList := strings.Split(rule.CallState, ",")
  132. for i := 0; i < len(userList); i++ {
  133. for j := 0; j < len(callstateList); j++ {
  134. // 未拨打 判断不存在
  135. if callstateList[j] == CallStateNotDialed {
  136. exist, err := dao.VoiceRecord.Ctx(ctx).Where("CalledNo=?", userList[i].Phone).Exist()
  137. if err != nil {
  138. g.Log().Error(ctx, "filterCallState CallStateNotDialed err:", err)
  139. continue
  140. }
  141. if !exist {
  142. newUserList = append(newUserList, userList[i])
  143. continue
  144. }
  145. }
  146. // 已接听 至少一个记录
  147. if callstateList[j] == CallStatDealing {
  148. exist, err := dao.VoiceRecord.Ctx(ctx).Where("CalledNo=? and State=?", userList[i].Phone, CallStatDealing).Exist()
  149. if err != nil {
  150. g.Log().Error(ctx, "filterCallState CallStatDealing err:", err, userList[i])
  151. continue
  152. }
  153. if exist {
  154. newUserList = append(newUserList, userList[i])
  155. continue
  156. }
  157. }
  158. // 其他的 判断是否存在记录
  159. findState := strings.Split(callstateList[j], "#")
  160. exist, err := dao.VoiceRecord.Ctx(ctx).Where("CalledNo=?", userList[i].Phone).WhereNotIn(dao.VoiceRecord.Columns().CallState, findState).Exist()
  161. if err != nil {
  162. g.Log().Error(ctx, "filterCallState otherstate err:", err, findState, userList[i])
  163. continue
  164. }
  165. if !exist {
  166. continue
  167. }
  168. // 存在则查询是否包含其他记录
  169. exist, err = dao.VoiceRecord.Ctx(ctx).Where("CalledNo=?", userList[i].Phone).WhereNotIn(dao.VoiceRecord.Columns().CallState, findState).Exist()
  170. if err != nil {
  171. g.Log().Error(ctx, "filterCallState otherstate notin err:", err, findState, userList[i])
  172. continue
  173. }
  174. if !exist {
  175. newUserList = append(newUserList, userList[i])
  176. continue
  177. }
  178. }
  179. }
  180. g.Log().Info(ctx, "filterCallState 获得用户数量", len(userList))
  181. return
  182. }
  183. // 跟进状态
  184. func filterTrailStatus(ctx context.Context, rule entity.NewUserSendRule, userList []*User) (newUserList []*User) {
  185. start := time.Now()
  186. defer func() {
  187. g.Log().Info(ctx, "filterTrailStatus 耗时:", time.Since(start))
  188. }()
  189. for i := 0; i < len(userList); i++ {
  190. var trailstatus string
  191. err := dao.DwdFCrmClueInfo.Ctx(ctx).Fields("trailstatus").Where("userid=?", userList[i]).Order("trailstatus_time desc").Limit(1).Scan(&trailstatus)
  192. if err != nil {
  193. g.Log().Error(ctx, "查询跟进状态异常", err)
  194. continue
  195. }
  196. // 判断最新跟进状态是否与规则设置的相等
  197. trailstatusSet := gset.NewStrSetFrom(strings.Split(rule.Trailstatus, ","), true)
  198. if trailstatus != "" && trailstatusSet.Contains(trailstatus) {
  199. newUserList = append(newUserList, userList[i])
  200. }
  201. }
  202. g.Log().Info(ctx, "filterCallState 获得用户数量", len(userList))
  203. return
  204. }