task.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package logic
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gtime"
  7. "newuserSend/internal/consts"
  8. "newuserSend/internal/dao"
  9. "newuserSend/internal/model/entity"
  10. "newuserSend/utility"
  11. "strings"
  12. "time"
  13. )
  14. var nsm *utility.StationMessage
  15. func Task(ctx context.Context) {
  16. addr := g.Cfg().MustGet(ctx, "siteMsg.addr").String()
  17. action := g.Cfg().MustGet(ctx, "siteMsg.action").String()
  18. callplatform := g.Cfg().MustGet(ctx, "siteMsg.callPlatform").String()
  19. nsm = utility.NewStationMessage(addr, action, callplatform)
  20. start := time.Now()
  21. defer func() {
  22. g.Log().Info(ctx, "本轮完成", time.Since(start))
  23. }()
  24. // get sendContent
  25. var sendContent []entity.NewUserSendContent
  26. err := dao.NewUserSendContent.Ctx(ctx).Scan(&sendContent)
  27. if err != nil {
  28. g.Log().Error(ctx, "获取发送内容失败", err)
  29. return
  30. }
  31. // 遍历规则
  32. for i := 0; i < len(sendContent); i++ {
  33. g.Log().Infof(ctx, "开始处理第%v个发送规则:id-%v 内容id:%v \n", i, sendContent[i].RuleId, sendContent[i].Id)
  34. // 查询总数
  35. count, err := dao.NewUserSendLog.Ctx(ctx).Where("rule_id=? and state=? and create_time>?", sendContent[i].RuleId, 0).Count()
  36. if err != nil {
  37. g.Log().Error(ctx, "获取待发送用户失败", sendContent[i].RuleId, err)
  38. continue
  39. }
  40. g.Log().Info(ctx, "规则%v-获取待发送用户数量-%v", sendContent[i].RuleId, count)
  41. if count == 0 {
  42. continue
  43. }
  44. userList := []entity.NewUserSendLog{}
  45. for j := 0; j < count; j += 200 {
  46. // 分页查跟进规则查询要发送的用户
  47. err := dao.NewUserSendLog.Ctx(ctx).Where("rule_id=? and state=?", sendContent[i].RuleId, 0).Limit(200).Scan(&userList)
  48. if err != nil {
  49. g.Log().Error(ctx, "分页-获取待发送用户失败", sendContent[i].RuleId, err)
  50. break
  51. }
  52. // 发送消息
  53. sendMsg(ctx, sendContent[i], userList)
  54. updateState(ctx, userList)
  55. }
  56. }
  57. }
  58. func sendMsg(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) {
  59. switch content.SendChannel {
  60. case consts.SendChannelSMS:
  61. sms(ctx, content, sendUserList)
  62. case consts.SendChannelSiteMsg:
  63. siteMsg(ctx, content, sendUserList)
  64. default:
  65. g.Log().Error(ctx, "SendChannel 类型有误", content)
  66. }
  67. }
  68. func sms(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) {
  69. // 处理短信请求数据
  70. varmap := map[string]interface{}{}
  71. if content.SmsVar != "" {
  72. smsvars := strings.Split(content.SmsVar, ",")
  73. for i := 0; i < len(smsvars); i++ {
  74. switch smsvars[i] {
  75. case consts.SmsVarMonth:
  76. varmap[smsvars[i]] = gtime.Now().Month()
  77. case consts.SmsVarYear:
  78. varmap[smsvars[i]] = gtime.Now().Year()
  79. }
  80. }
  81. }
  82. records := []utility.SmsRecord{}
  83. for i := 0; i < len(sendUserList); i++ {
  84. r := utility.SmsRecord{
  85. Mobile: sendUserList[i].Phone,
  86. }
  87. if len(varmap) > 0 {
  88. r.TpContent = &varmap
  89. }
  90. records = append(records, r)
  91. }
  92. utility.SendSms(ctx, records, content.SmsId)
  93. }
  94. func siteMsg(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) {
  95. // 处理消息变量
  96. msgInfo := content.MsgContent
  97. msgTitle := content.MsgTitle
  98. monthStr := fmt.Sprintf("%v", gtime.Now().Month())
  99. msgTitle = strings.ReplaceAll(msgTitle, consts.SiteMsgMonth, monthStr)
  100. urls := strings.Split(content.MsgUrl, ",")
  101. var pcUrl, androidUrl, iosUrl, weChatUrl string
  102. if len(urls) > 0 {
  103. if len(urls) != 4 {
  104. pcUrl, androidUrl, iosUrl, weChatUrl = urls[0], urls[0], urls[0], urls[0]
  105. } else {
  106. pcUrl, androidUrl, iosUrl, weChatUrl = urls[0], urls[1], urls[2], urls[3]
  107. }
  108. }
  109. mgoIds := []string{}
  110. for i := 0; i < len(sendUserList); i++ {
  111. mgoIds = append(mgoIds, sendUserList[i].UserId)
  112. }
  113. msgType := g.Cfg().MustGet(ctx, "siteMsg.msgType").Int()
  114. parm := utility.MessageParam{
  115. UserIds: strings.Join(mgoIds, ","), Title: msgTitle, Content: msgInfo, Link: pcUrl, AndroidUrl: androidUrl, IosUrl: iosUrl, WeChatUrl: weChatUrl, MsgType: msgType,
  116. }
  117. nsm.SendStationMessages(ctx, parm)
  118. }
  119. func updateState(ctx context.Context, sendUserList []entity.NewUserSendLog) {
  120. ids := []int{}
  121. for i := 0; i < len(sendUserList); i++ {
  122. ids = append(ids, sendUserList[i].Id)
  123. }
  124. _, err := dao.NewUserSendLog.Ctx(ctx).Data("state=1").WhereIn("id", ids).Update()
  125. if err != nil {
  126. g.Log().Error(ctx, "更新发送状态失败:", err, ids)
  127. return
  128. }
  129. }