123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package logic
- import (
- "context"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gtime"
- "newuserSend/internal/consts"
- "newuserSend/internal/dao"
- "newuserSend/internal/model/entity"
- "newuserSend/utility"
- "strings"
- "time"
- )
- var nsm *utility.StationMessage
- func Task(ctx context.Context) {
- addr := g.Cfg().MustGet(ctx, "siteMsg.addr").String()
- action := g.Cfg().MustGet(ctx, "siteMsg.action").String()
- callplatform := g.Cfg().MustGet(ctx, "siteMsg.callPlatform").String()
- nsm = utility.NewStationMessage(addr, action, callplatform)
- start := time.Now()
- defer func() {
- g.Log().Info(ctx, "本轮完成", time.Since(start))
- }()
- // get sendContent
- var sendContent []entity.NewUserSendContent
- err := dao.NewUserSendContent.Ctx(ctx).Scan(&sendContent)
- if err != nil {
- g.Log().Error(ctx, "获取发送内容失败", err)
- return
- }
- // 遍历规则
- for i := 0; i < len(sendContent); i++ {
- g.Log().Infof(ctx, "开始处理第%v个发送规则:id-%v 内容id:%v \n", i, sendContent[i].RuleId, sendContent[i].Id)
- // 查询总数
- count, err := dao.NewUserSendLog.Ctx(ctx).Where("rule_id=? and state=? and create_time>?", sendContent[i].RuleId, 0).Count()
- if err != nil {
- g.Log().Error(ctx, "获取待发送用户失败", sendContent[i].RuleId, err)
- continue
- }
- g.Log().Info(ctx, "规则%v-获取待发送用户数量-%v", sendContent[i].RuleId, count)
- if count == 0 {
- continue
- }
- userList := []entity.NewUserSendLog{}
- for j := 0; j < count; j += 200 {
- // 分页查跟进规则查询要发送的用户
- err := dao.NewUserSendLog.Ctx(ctx).Where("rule_id=? and state=?", sendContent[i].RuleId, 0).Limit(200).Scan(&userList)
- if err != nil {
- g.Log().Error(ctx, "分页-获取待发送用户失败", sendContent[i].RuleId, err)
- break
- }
- // 发送消息
- sendMsg(ctx, sendContent[i], userList)
- updateState(ctx, userList)
- }
- }
- }
- func sendMsg(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) {
- switch content.SendChannel {
- case consts.SendChannelSMS:
- sms(ctx, content, sendUserList)
- case consts.SendChannelSiteMsg:
- siteMsg(ctx, content, sendUserList)
- default:
- g.Log().Error(ctx, "SendChannel 类型有误", content)
- }
- }
- func sms(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) {
- // 处理短信请求数据
- varmap := map[string]interface{}{}
- if content.SmsVar != "" {
- smsvars := strings.Split(content.SmsVar, ",")
- for i := 0; i < len(smsvars); i++ {
- switch smsvars[i] {
- case consts.SmsVarMonth:
- varmap[smsvars[i]] = gtime.Now().Month()
- case consts.SmsVarYear:
- varmap[smsvars[i]] = gtime.Now().Year()
- }
- }
- }
- records := []utility.SmsRecord{}
- for i := 0; i < len(sendUserList); i++ {
- r := utility.SmsRecord{
- Mobile: sendUserList[i].Phone,
- }
- if len(varmap) > 0 {
- r.TpContent = &varmap
- }
- records = append(records, r)
- }
- utility.SendSms(ctx, records, content.SmsId)
- }
- func siteMsg(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) {
- // 处理消息变量
- msgInfo := content.MsgContent
- msgTitle := content.MsgTitle
- monthStr := fmt.Sprintf("%v", gtime.Now().Month())
- msgTitle = strings.ReplaceAll(msgTitle, consts.SiteMsgMonth, monthStr)
- urls := strings.Split(content.MsgUrl, ",")
- var pcUrl, androidUrl, iosUrl, weChatUrl string
- if len(urls) > 0 {
- if len(urls) != 4 {
- pcUrl, androidUrl, iosUrl, weChatUrl = urls[0], urls[0], urls[0], urls[0]
- } else {
- pcUrl, androidUrl, iosUrl, weChatUrl = urls[0], urls[1], urls[2], urls[3]
- }
- }
- mgoIds := []string{}
- for i := 0; i < len(sendUserList); i++ {
- mgoIds = append(mgoIds, sendUserList[i].UserId)
- }
- msgType := g.Cfg().MustGet(ctx, "siteMsg.msgType").Int()
- parm := utility.MessageParam{
- UserIds: strings.Join(mgoIds, ","), Title: msgTitle, Content: msgInfo, Link: pcUrl, AndroidUrl: androidUrl, IosUrl: iosUrl, WeChatUrl: weChatUrl, MsgType: msgType,
- }
- nsm.SendStationMessages(ctx, parm)
- }
- func updateState(ctx context.Context, sendUserList []entity.NewUserSendLog) {
- ids := []int{}
- for i := 0; i < len(sendUserList); i++ {
- ids = append(ids, sendUserList[i].Id)
- }
- _, err := dao.NewUserSendLog.Ctx(ctx).Data("state=1").WhereIn("id", ids).Update()
- if err != nil {
- g.Log().Error(ctx, "更新发送状态失败:", err, ids)
- return
- }
- }
|