package common import ( "encoding/json" "fmt" "log" "net/url" "regexp" "strings" "time" "app.yhyue.com/moapp/MessageCenter/entity" "app.yhyue.com/moapp/MessageCenter/rpc/internal/config" "app.yhyue.com/moapp/MessageCenter/rpc/type/message" "app.yhyue.com/moapp/MessageCenter/util" "app.yhyue.com/moapp/jybase/common" dataFormat "app.yhyue.com/moapp/jybase/date" m "app.yhyue.com/moapp/jybase/mongodb" "app.yhyue.com/moapp/jybase/redis" qrpc "app.yhyue.com/moapp/jybase/rpc" ) type WxTmplPush struct { MgoId, OpenId, Position, OpushId, JpushId, AppPoneType string //UserId 用户mgoId, OpenId 微信id, Position 职位id Config *WxTmplConfig CustomWxTpl *message.CustomWxTpl } var AllMsgType func() map[int64]WxTmplConfig var allMsgValueKeys = []string{"$class", "$title", "$detail", "$date", "$row4", "$note"} var AppPushMsgType map[int]string type WxTmplConfig struct { Name string //信息名称 Switch string //开关 TmplId string //微信模版id TmplValue string //微信模版 } const CacheDb = "msgCount" func MessageType() (func() map[int64]WxTmplConfig, []map[string]interface{}) { var data []map[string]interface{} rData1 := entity.Mysql.SelectBySql(`SELECT * FROM message_group ORDER BY sequence ASC`) switchName := map[int64]WxTmplConfig{} appMsgType := map[int]string{} if rData1 != nil && len(*rData1) > 0 { data = *rData1 for _, v := range *rData1 { groupId := util.IntAll(v["group_id"]) switchs := util.ObjToString(v["switch"]) appMsgType[groupId] = switchs } AppPushMsgType = appMsgType } rData2 := entity.Mysql.SelectBySql(`SELECT g.switch,c.wxtmpl_Id,c.msg_type,c.msg_name,c.wxtmp_value FROM message_class c INNER JOIN message_group g on c.group_id =g.group_id WHERE c.wxtmpl_Id IS NOT NULL`) for _, mData := range *rData2 { if msg_type, settingKey, messageName, tmplId, tmplValue := util.Int64All(mData["msg_type"]), util.ObjToString(mData["switch"]), util.ObjToString(mData["msg_name"]), util.ObjToString(mData["wxtmpl_Id"]), util.ObjToString(mData["wxtmp_value"]); msg_type > 0 && settingKey != "" && messageName != "" && tmplId != "" && tmplValue != "" { switchName[msg_type] = WxTmplConfig{ Name: messageName, Switch: settingKey, TmplId: tmplId, TmplValue: tmplValue, } } } return func() map[int64]WxTmplConfig { return switchName }, data } var getSendTotalRedisKey = func(uFlag ...string) string { if len(uFlag) == 0 { //统计当日信息总发送量 return fmt.Sprintf("messageCenter_SendWxMsgTotal_%s", time.Now().Format(dataFormat.Date_yyyyMMdd)) } //统计单用户今日发送量 return fmt.Sprintf("messageCenter_SendWxMsgTotal_%s_%s", time.Now().Format(dataFormat.Date_yyyyMMdd), uFlag[0]) } func GetWxTmplConfig(msgType int64) (*WxTmplConfig, error) { if val, ok := AllMsgType()[msgType]; ok { return &val, nil } return nil, fmt.Errorf("未知消息类型") } func (stm *WxTmplPush) SendMsg(link, title, detail, date, row4 string, note string) error { if stm.Config == nil || stm.Config.TmplId == "" || (stm.MgoId != "" && stm.OpenId != "" && stm.Position != "") || link == "" { return fmt.Errorf("缺少参数 stm.Config.TmplId:%v stm.MgoId:%v stm.OpenId:%v stm.Position:%v link:%v ", stm.Config.TmplId, stm.MgoId, stm.OpenId, stm.Position, link) } // 校验推送是否开启 if err := stm.getUserOpenIdAndWxPushState(); err != nil { return err } var msg map[string]*qrpc.TmplItem if stm.CustomWxTpl != nil && stm.CustomWxTpl.TplId != "" { stm.Config.TmplId = stm.CustomWxTpl.TplId msg = map[string]*qrpc.TmplItem{} for k, v := range stm.CustomWxTpl.TmplData { msg[k] = &qrpc.TmplItem{ Value: v.Value, Color: v.Color, } } } else { // 获取消息 msgTemp, err := stm.getMessage(title, detail, date, row4, note) if err != nil { return err } msg = msgTemp } // 校验发送量及频率 err, noteFunc := stm.IncrCount() if err != nil { return err } // 发送信息 autoLoginHref := fmt.Sprintf("%s/swordfish/SingleLogin?toHref=%s", config.ConfigJson.WxWebdomain, url.QueryEscape(link)) if _, err := stm.Send(autoLoginHref, msg); err != nil { // 发送失败数量回滚 stm.RollBack() return err } if noteFunc != nil { noteFunc() } return nil } var ( regSpecial = regexp.MustCompile(`\\n|\\t|\\'|\\"|\n|\t|\'|\"`) ) // getMessage 获取消息内容 func (stm *WxTmplPush) getMessage(title, detail, date, row4 string, note string) (map[string]*qrpc.TmplItem, error) { var formatValue string = stm.Config.TmplValue for _, key := range allMsgValueKeys { switch key { case "$class": formatValue = strings.ReplaceAll(formatValue, key, regSpecial.ReplaceAllString(stm.Config.Name, "")) case "$title": formatValue = strings.ReplaceAll(formatValue, key, regSpecial.ReplaceAllString(title, "")) case "$detail": formatValue = strings.ReplaceAll(formatValue, key, regSpecial.ReplaceAllString(detail, "")) case "$date": formatValue = strings.ReplaceAll(formatValue, key, regSpecial.ReplaceAllString(date, "")) case "$row4": formatValue = strings.ReplaceAll(formatValue, key, regSpecial.ReplaceAllString(row4, "")) case "$note": if note == "" { formatValue = strings.ReplaceAll(formatValue, key, regSpecial.ReplaceAllString(config.ConfigJson.WxTmplConfig.CloseNotice, "")) } else { formatValue = strings.ReplaceAll(formatValue, key, regSpecial.ReplaceAllString(note, "")) } } } bValue := map[string]*qrpc.TmplItem{} if err := json.Unmarshal([]byte(formatValue), &bValue); err != nil { return nil, fmt.Errorf("格式化信息内容异常 %s", err.Error()) } for _, item := range bValue { val := []rune(item.Value) if len(val) > 20 { item.Value = string(val[:17]) + "..." } } return bValue, nil } // RollBack 发送失败数量回滚 func (stm *WxTmplPush) RollBack() { uCache, allCache := getSendTotalRedisKey(stm.OpenId), getSendTotalRedisKey() redis.Decrby(CacheDb, allCache, 1) redis.Decrby(CacheDb, uCache, 1) redis.Del(CacheDb, fmt.Sprintf("%s_sendwait", uCache)) //清除发送间隔 } func (stm *WxTmplPush) IncrCount() (error, func()) { uCache, allCache := getSendTotalRedisKey(stm.OpenId), getSendTotalRedisKey() //当日微信模版消息发送总量 //校验发送间隔 if sendWait, _ := redis.Exists(CacheDb, fmt.Sprintf("%s_sendwait", uCache)); sendWait { return fmt.Errorf("发送模版消息频繁,稍后重试"), nil } var total int64 if total = redis.Incr(CacheDb, allCache); total == 1 { _ = redis.SetExpire(CacheDb, allCache, 60*60*24) } if total > config.ConfigJson.WxTmplConfig.Limit.Total { redis.Decrby(CacheDb, allCache, 1) return fmt.Errorf("已达发送总量上限"), nil } uTotal := redis.Incr(CacheDb, uCache) if uTotal == 1 { _ = redis.SetExpire(CacheDb, uCache, 60*60*24) } //当日用户发送数量 if uTotal > config.ConfigJson.WxTmplConfig.Limit.OneDayLimit { redis.Decrby(CacheDb, allCache, 1) redis.Decrby(CacheDb, uCache, 1) return fmt.Errorf("已达单该用户发送总量上限"), nil } //下次发送时间 redis.Put(CacheDb, fmt.Sprintf("%s_sendwait", uCache), 1, config.ConfigJson.WxTmplConfig.Limit.DuringMine*60) return nil, func() { for _, num := range config.ConfigJson.WxTmplConfig.Limit.Alert.Nums { if total == num { util.SendRetryMail(3, strings.Join(config.ConfigJson.WxTmplConfig.Limit.Alert.ToMail, ","), strings.Join(config.ConfigJson.WxTmplConfig.Limit.Alert.CcMail, ","), "剑鱼微信模版告警邮件", fmt.Sprintf("今日发送微信模版信息数量已达%d条,总量%d条", total, config.ConfigJson.WxTmplConfig.Limit.Total), entity.GmailAuth) } } } } // getUserOpenIdAndWxPushState 查询微信openid微信消息通知状态 // mId mongoUserid、oId 用户openid、pId positionId 用户职位id func (stm *WxTmplPush) getUserOpenIdAndWxPushState() error { uData := stm.GetUserPushInfo() if uData == nil { return fmt.Errorf("未查询到用户信息") } stm.OpenId = common.ObjToString(uData["s_m_openid"]) if stm.OpenId == "" { return fmt.Errorf("未查询到用户微信信息") } log.Println("======", stm.Config.Switch) if pushSetMap := common.ObjToMap(uData["o_pushset"]); pushSetMap != nil && len(*pushSetMap) > 0 { if pushKeyMap := common.ObjToMap((*pushSetMap)[stm.Config.Switch]); pushKeyMap != nil && len(*pushKeyMap) > 0 { if common.Int64All((*pushKeyMap)["i_wxpush"]) == 1 { return nil } } } return fmt.Errorf("未开启推送设置") } func (stm *WxTmplPush) GetUserPushInfo() map[string]interface{} { uData := func() map[string]interface{} { query := map[string]interface{}{} if stm.OpenId != "" { query["s_m_openid"] = stm.OpenId } else if stm.MgoId != "" { query["_id"] = m.StringTOBsonId(stm.MgoId) } else if stm.Position != "" { uInfo := entity.Mysql.SelectBySql("SELECT user_id FROM base_service.base_position WHERE id = ? ", stm.Position) if uInfo != nil && len(*uInfo) > 0 { if baseUserId := common.Int64All((*uInfo)[0]["user_id"]); baseUserId != 0 { query["base_user_id"] = baseUserId } } } if len(query) > 0 { rData, _ := entity.MQFW.FindOneByField("user", query, fmt.Sprintf(`{"s_m_openid":1,"s_opushid": 1, "s_jpushid": 1, "s_appponetype": 1, "s_appversion": 1,"o_pushset.%s":1}`, stm.Config.Switch)) if rData != nil && len(*rData) > 0 { return *rData } } return nil }() return uData } // Send 发送微信模版消息 func (stm *WxTmplPush) Send(link string, msg map[string]*qrpc.TmplItem) (pushOk bool, err error) { return qrpc.WxSendTmplMsg(config.ConfigJson.WxTmplConfig.RpcAddr, &qrpc.WxTmplMsg{ OpenId: stm.OpenId, TplId: stm.Config.TmplId, TmplData: msg, Url: link, }) }