package logic import ( "context" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "newuserSend/internal/consts" "newuserSend/internal/dao" "newuserSend/internal/model/entity" "newuserSend/utility" "strings" "time" ) var nsm *utility.StationMessage func Task(ctx context.Context) { addr := g.Cfg().MustGet(ctx, "siteMsg.addr").String() action := g.Cfg().MustGet(ctx, "siteMsg.action").String() callplatform := g.Cfg().MustGet(ctx, "siteMsg.callPlatform").String() nsm = utility.NewStationMessage(addr, action, callplatform) start := time.Now() defer func() { g.Log().Info(ctx, "本轮完成", time.Since(start)) }() // get sendContent var sendContent []entity.NewUserSendContent err := dao.NewUserSendContent.Ctx(ctx).Scan(&sendContent) if err != nil { g.Log().Error(ctx, "获取发送内容失败", err) return } // 遍历规则 for i := 0; i < len(sendContent); i++ { g.Log().Infof(ctx, "开始处理第%v个发送规则:id-%v 内容id:%v \n", i, sendContent[i].RuleId, sendContent[i].Id) // 查询总数 count, err := dao.NewUserSendLog.Ctx(ctx).Where("rule_id=? and state=? and create_time>?", sendContent[i].RuleId, 0).Count() if err != nil { g.Log().Error(ctx, "获取待发送用户失败", sendContent[i].RuleId, err) continue } g.Log().Info(ctx, "规则%v-获取待发送用户数量-%v", sendContent[i].RuleId, count) if count == 0 { continue } userList := []entity.NewUserSendLog{} for j := 0; j < count; j += 200 { // 分页查跟进规则查询要发送的用户 err := dao.NewUserSendLog.Ctx(ctx).Where("rule_id=? and state=?", sendContent[i].RuleId, 0).Limit(200).Scan(&userList) if err != nil { g.Log().Error(ctx, "分页-获取待发送用户失败", sendContent[i].RuleId, err) break } // 发送消息 sendMsg(ctx, sendContent[i], userList) updateState(ctx, userList) } } } func sendMsg(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) { switch content.SendChannel { case consts.SendChannelSMS: sms(ctx, content, sendUserList) case consts.SendChannelSiteMsg: siteMsg(ctx, content, sendUserList) default: g.Log().Error(ctx, "SendChannel 类型有误", content) } } func sms(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) { // 处理短信请求数据 varmap := map[string]interface{}{} if content.SmsVar != "" { smsvars := strings.Split(content.SmsVar, ",") for i := 0; i < len(smsvars); i++ { switch smsvars[i] { case consts.SmsVarMonth: varmap[smsvars[i]] = gtime.Now().Month() case consts.SmsVarYear: varmap[smsvars[i]] = gtime.Now().Year() } } } records := []utility.SmsRecord{} for i := 0; i < len(sendUserList); i++ { r := utility.SmsRecord{ Mobile: sendUserList[i].Phone, } if len(varmap) > 0 { r.TpContent = &varmap } records = append(records, r) } utility.SendSms(ctx, records, content.SmsId) } func siteMsg(ctx context.Context, content entity.NewUserSendContent, sendUserList []entity.NewUserSendLog) { // 处理消息变量 msgInfo := content.MsgContent msgTitle := content.MsgTitle monthStr := fmt.Sprintf("%v", gtime.Now().Month()) msgTitle = strings.ReplaceAll(msgTitle, consts.SiteMsgMonth, monthStr) urls := strings.Split(content.MsgUrl, ",") var pcUrl, androidUrl, iosUrl, weChatUrl string if len(urls) > 0 { if len(urls) != 4 { pcUrl, androidUrl, iosUrl, weChatUrl = urls[0], urls[0], urls[0], urls[0] } else { pcUrl, androidUrl, iosUrl, weChatUrl = urls[0], urls[1], urls[2], urls[3] } } mgoIds := []string{} for i := 0; i < len(sendUserList); i++ { mgoIds = append(mgoIds, sendUserList[i].UserId) } msgType := g.Cfg().MustGet(ctx, "siteMsg.msgType").Int() parm := utility.MessageParam{ UserIds: strings.Join(mgoIds, ","), Title: msgTitle, Content: msgInfo, Link: pcUrl, AndroidUrl: androidUrl, IosUrl: iosUrl, WeChatUrl: weChatUrl, MsgType: msgType, } nsm.SendStationMessages(ctx, parm) } func updateState(ctx context.Context, sendUserList []entity.NewUserSendLog) { ids := []int{} for i := 0; i < len(sendUserList); i++ { ids = append(ids, sendUserList[i].Id) } _, err := dao.NewUserSendLog.Ctx(ctx).Data("state=1").WhereIn("id", ids).Update() if err != nil { g.Log().Error(ctx, "更新发送状态失败:", err, ids) return } }