package common

import (
	"context"
	"errors"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"app.yhyue.com/moapp/MessageCenter/entity"
	"app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
	"app.yhyue.com/moapp/MessageCenter/rpc/type/message"
	"app.yhyue.com/moapp/jybase/redis"
	"github.com/zeromicro/go-zero/core/logx"
)

// MsgGroupIdMap maps a message type to the id of the group it belongs to.
// It is only read in this file; it must be populated elsewhere before use
// (a lookup on a missing key yields group id 0).
var MsgGroupIdMap map[int]int

// clickhouseStringEscaper escapes the characters that would terminate or
// alter a single-quoted ClickHouse string literal.
var clickhouseStringEscaper = strings.NewReplacer(`\`, `\\`, `'`, `\'`)

// clickhouseQuote returns s as a single-quoted ClickHouse string literal with
// backslashes and single quotes escaped, so request-supplied ids cannot break
// out of the literal they are interpolated into.
func clickhouseQuote(s string) string {
	return "'" + clickhouseStringEscaper.Replace(s) + "'"
}

// splitUserIDs splits a comma-separated id list and drops empty entries.
// strings.Split never returns an empty slice, so callers must check the
// length of this filtered result to detect "no usable ids".
func splitUserIDs(raw string) []string {
	parts := strings.Split(raw, ",")
	ids := make([]string, 0, len(parts))
	for _, part := range parts {
		if part != "" {
			ids = append(ids, part)
		}
	}
	return ids
}

// SetMsgSummary ORs the message id newMsg into msg_bitmap of the
// message_summary rows whose group_id matches.
//
// When groupId is 11 the rows are selected by msgType instead of groupId.
// It returns the error reported by the ClickHouse Exec call, if any.
func SetMsgSummary(newMsg, groupId, msgType int64) error {
	if groupId == 11 {
		// Group 11 is summarised per message type rather than per group.
		groupId = msgType
	}
	query := fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, groupId)
	return entity.ClickhouseConn.Exec(context.Background(), query)
}

// NewUserSendMsg records message in.MsgLogId for every user listed in
// in.UserIds (comma separated) and then pushes the message to each of them.
//
// For each user it updates the existing message_user_summary row, or inserts
// a new one when the user has none, then triggers the WeChat template / app
// push and clears the user's cached counters. Users are processed in order;
// the first database error aborts the loop and is returned, leaving earlier
// users already processed. An error is also returned when in.UserIds contains
// no non-empty id.
func NewUserSendMsg(in *message.NewUserInsertMsgReq) error {
	userIDs := splitUserIDs(in.UserIds)
	if len(userIDs) == 0 {
		return errors.New("无效的用户id")
	}

	groupID := MsgGroupIdMap[int(in.MsgType)]
	// The payload is identical for every recipient and is passed by value,
	// so it is built once outside the loop.
	pushData := WxTmplAndPush{
		MsgType:     in.MsgType,
		Title:       in.Title,
		Content:     in.Content,
		WxPushUrl:   in.WxPushUrl,
		AppPushUrl:  in.AppPushUrl,
		ProductName: in.ProductName,
		OrderId:     in.OrderId,
		OrderMoney:  in.OrderMoney,
		Row4:        in.Row4,
	}

	ctx := context.Background()
	for _, userID := range userIDs {
		quotedID := clickhouseQuote(userID)

		// Does the user already have a summary row?
		var count uint64
		row := entity.ClickhouseConn.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = %s", quotedID))
		if err := row.Scan(&count); err != nil {
			// Without a reliable count we cannot choose between update and
			// insert; guessing "insert" could create a duplicate row.
			log.Println("新用户查询message_user_summary表出错,error", err)
			return err
		}

		if count > 0 {
			// Existing user: add the message to the allMsg bitmap.
			updateSQL := fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = %s`, in.MsgLogId, quotedID)
			if err := entity.ClickhouseConn.Exec(ctx, updateSQL); err != nil {
				log.Println("新用户update message_user_summary表出错,error", err)
				return err
			}
		} else {
			// New user: insert a fresh summary row.
			insertSQL := fmt.Sprintf("INSERT INTO message_user_summary values  (%s,bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", quotedID, in.MsgLogId)
			log.Println("sql", insertSQL)
			if err := entity.ClickhouseConn.Exec(ctx, insertSQL); err != nil {
				log.Println("新用户insert message_user_summary表出错,error", err)
				return err
			}
		}

		// WeChat template message and app push, then cache invalidation.
		SentWxTmplAndAppPush(pushData, userID, groupID, "", "")
		DelRedis(userID, in.MsgType, groupID)
	}
	return nil
}

// UpdateUserMsgSummary adds message in.MsgLogId to the summary rows of all
// users in in.UserIds (comma separated) and pushes the message to each user.
//
// Summary rows are updated in batches of up to 1000 ids via Update. Pushes
// run concurrently, throttled by entity.SaveConcurrencyChan. The function
// returns once all batch updates and pushes have finished.
//
// When in.MsgType equals config.ConfigJson.EquityInfoMsgType, in.Content must
// have the form "content#jy#project name#jy#service address"; in.Content is
// then replaced by its first part. The request is validated before any update
// or push is started, so a rejected request has no side effects.
//
// It returns an error when no non-empty user id is given or the content
// format is invalid. Failures of individual updates and pushes are only
// logged.
func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
	userIDs := splitUserIDs(in.UserIds)
	if len(userIDs) == 0 {
		return errors.New("无效的用户id")
	}

	// p459 special handling: the content arrives as
	// "message content#jy#WeChat template project name#jy#service address".
	equityName, equityAddr := "", ""
	if in.MsgType == config.ConfigJson.EquityInfoMsgType {
		parts := strings.Split(in.Content, "#jy#")
		if len(parts) != 3 {
			log.Println("消息内容格式有误:", in.Content)
			return errors.New("无效的消息内容格式")
		}
		in.Content, equityName, equityAddr = parts[0], parts[1], parts[2]
	}

	const batchSize = 1000
	msgLogID := in.MsgLogId
	msgType := in.MsgType
	groupID := MsgGroupIdMap[int(msgType)]

	var wg sync.WaitGroup

	// Batch-update the summary rows; each batch runs in its own goroutine
	// that is tracked by wg so it cannot outlive this call unnoticed.
	for start := 0; start < len(userIDs); start += batchSize {
		end := start + batchSize
		if end > len(userIDs) {
			end = len(userIDs)
		}
		quoted := make([]string, 0, end-start)
		for _, id := range userIDs[start:end] {
			quoted = append(quoted, clickhouseQuote(id))
		}
		wg.Add(1)
		go func(idList string) {
			defer wg.Done()
			Update(idList, msgLogID)
		}(strings.Join(quoted, ","))
	}

	// Built after the p459 handling above so it sees the trimmed content.
	pushData := WxTmplAndPush{
		MsgType:     msgType,
		Title:       in.Title,
		Content:     in.Content,
		WxPushUrl:   in.WxPushUrl,
		AppPushUrl:  in.AppPushUrl,
		ProductName: in.ProductName,
		OrderId:     in.OrderId,
		OrderMoney:  in.OrderMoney,
		Row4:        in.Row4,
	}

	for _, userID := range userIDs {
		wg.Add(1)
		// Acquire a concurrency slot; blocks while the channel is full.
		entity.SaveConcurrencyChan <- 1
		go func(uid string) {
			defer func() {
				<-entity.SaveConcurrencyChan
				wg.Done()
			}()
			// WeChat template message and app push, then cache invalidation.
			SentWxTmplAndAppPush(pushData, uid, groupID, equityName, equityAddr)
			DelRedis(uid, msgType, groupID)
		}(userID)
	}

	wg.Wait()
	return nil
}

// DelRedis removes the cached message data of userId so it is rebuilt on the
// next read: the per-group count, the per-class count (message types 11 and
// 12 only), the work-desk entry (groups 5 and 11 only), the message summary
// and the class map.
func DelRedis(userId string, msgType int64, groupId int) {
	redis.Del(redisModule, fmt.Sprintf(MsgCountKey, userId, groupId))

	switch msgType {
	case 11, 12:
		redis.Del(redisModule, fmt.Sprintf(MsgClassCountKey, userId, msgType))
	}

	switch groupId {
	case 5, 11:
		redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, userId))
	}

	redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, userId))
	redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, userId))
}

// Update ORs msgLogId into the allMsg bitmap of every message_user_summary
// row whose userId is in str.
//
// str is inserted verbatim into an IN (...) list, so it must already be a
// comma-separated list of quoted, escaped string literals. The statement is
// logged before execution; a failure is logged and otherwise ignored.
func Update(str string, msgLogId int64) {
	query := fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str)
	log.Println(query)
	if err := entity.ClickhouseConn.Exec(context.Background(), query); err != nil {
		log.Printf("批量更新message_user_summary出错:%s", err)
	}
}

// InsertMsgSendLog stores a send record for the message in message_send_log
// (used for attachment downloads and Jianyu-coin expiry reminders) and adds
// the new record id to the message summary via SetMsgSummary.
//
// It returns the id of the inserted record, or 0 when the insert did not
// yield a positive id or the summary update failed. In the latter case the
// inserted record is left in place.
func InsertMsgSendLog(in *message.MultipleSaveMsgReq) int64 {
	groupId := MsgGroupIdMap[int(in.MsgType)]
	// One timestamp for both columns so they can never differ.
	now := time.Now().Format("2006-01-02 15:04:05")

	id := entity.Mysql.Insert("message_send_log", map[string]interface{}{
		"send_usergroup_id":   "",
		"send_usergroup_name": "",
		"msg_type":            in.MsgType,
		"title":               in.Title,
		"content":             in.Content,
		"send_mode":           2,
		"send_time":           now,
		"send_status":         4,
		"update_time":         now,
		"link":                in.Link,
		"isdel":               1,
		"send_userid":         "",
		"update_user":         "",
		"Sign":                5,
		"menu_name":           "message",
		"group_id":            groupId,
	})
	if id <= 0 {
		return 0
	}

	// Update the message summary table.
	if err := SetMsgSummary(id, int64(groupId), in.MsgType); err != nil {
		log.Println("更新消息汇总表出错:", err)
		return 0
	}
	return id
}

// ConvertToBitmap returns the 1-based positions of the set bits in the binary
// representation of num, ordered from the least significant bit upwards.
// For example 5 (binary 101) yields [1 3]; 0 yields nil. For a negative num
// the positions of its magnitude are returned (the sign is ignored).
func ConvertToBitmap(num int) (res []uint32) {
	digits := strconv.FormatInt(int64(num), 2)
	width := len(digits)
	// Walk from the last character (least significant bit) to the first.
	for pos := 1; pos <= width; pos++ {
		if digits[width-pos] == '1' {
			res = append(res, uint32(pos))
		}
	}
	return res
}

// WxTmplAndPush carries the data of one message that is to be delivered as a
// WeChat template message and/or an app push by SentWxTmplAndAppPush.
type WxTmplAndPush struct {
	MsgType     int64
	Title       string
	Content     string
	WxPushUrl   string
	AppPushUrl  string
	ProductName string
	OrderId     string
	OrderMoney  string
	Row4        string
	SendUserId  string
}

// SentWxTmplAndAppPush delivers the message described by this to user v.
//
// A WeChat template message is sent for every message type except 1 and 10;
// for config.ConfigJson.EquityInfoMsgType the template uses equityName and
// equityAddr instead of the content and an empty address. An app push is sent
// for every message type except 10. group_id selects the app push switch from
// AppPushMsgType. Failures are logged and never returned.
func SentWxTmplAndAppPush(this WxTmplAndPush, v string, group_id int, equityName, equityAddr string) {
	nTime := time.Now().Format("2006-01-02 15:04:05")

	pushConfig, err := GetWxTmplConfig(this.MsgType)
	if err != nil {
		// NOTE(review): processing continues with whatever config was
		// returned — confirm SendMsg/GetUserPushInfo tolerate that.
		logx.Errorf("SendWxTmplMsg uId %s Error %s", v, err.Error())
	}
	p := &WxTmplPush{
		Config: pushConfig,
	}
	p.MgoId = v

	if this.MsgType == 10 {
		// NOTE(review): both sends below are skipped for type 10, so these
		// assignments currently have no effect — confirm this is intended.
		this.Title = this.ProductName
		this.Content = this.OrderId
		nTime = this.OrderMoney
	}

	// Message template: ticket type {{thing19.DATA}}, ticket title
	// {{thing6.DATA}}, project name {{thing13.DATA}}, service time
	// {{time25.DATA}}, service address {{thing26.DATA}}.
	if this.MsgType != 1 && this.MsgType != 10 {
		if this.MsgType == config.ConfigJson.EquityInfoMsgType {
			// p459: project name and service address come from the caller.
			err = p.SendMsg(this.WxPushUrl, this.Title, equityName, nTime, this.Row4, equityAddr)
		} else {
			err = p.SendMsg(this.WxPushUrl, this.Title, this.Content, nTime, this.Row4, "")
		}
		if err != nil {
			logx.Errorf("SendWxTmplMsg uId %s Error %s", v, err.Error())
		} else {
			logx.Infof("SendWxTmplMsg uId success %s ", v)
		}
	}

	if this.MsgType == 1 {
		// Type 1 replaces the template config with one that only carries the
		// app push switch of the group.
		mst := new(WxTmplConfig)
		mst.Switch = AppPushMsgType[group_id]
		p.Config = mst
	}
	uData := p.GetUserPushInfo()

	// App push.
	if this.MsgType != 10 {
		category := ""
		if this.SendUserId == "cbgl" {
			category = "服务通知_工作事项"
		}
		if err = AppPushMsg(uData, AppPushMsgType[group_id], this.AppPushUrl, this.Title, this.Content, this.MsgType, category); err != nil {
			logx.Errorf("SendAppMsg uId %s Error %s", v, err.Error())
		}
	}
}