123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- package common
- import (
- "app.yhyue.com/moapp/MessageCenter/entity"
- "app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
- "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
- "app.yhyue.com/moapp/jybase/redis"
- "context"
- "errors"
- "fmt"
- "github.com/zeromicro/go-zero/core/logx"
- "log"
- "strconv"
- "strings"
- "sync"
- "time"
- )
// MsgGroupIdMap maps a message type to the id of the message group it belongs
// to. It is declared without an initializer here, so it stays nil until other
// code assigns it; lookups on a missing key (or on the nil map) yield 0.
var MsgGroupIdMap map[int]int
- func SetMsgSummary(newMsg, groupId, msgType int64) error {
- //更新所有消息
- gId := groupId
- if groupId == 11 {
- groupId = msgType
- //根据msgType更新待办二级分类汇总
- /*err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, msgType))
- if err1 != nil {
- //插入失败
- return err1
- }*/
- }
- row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_summary where group_id = %d", msgType))
- var count uint64
- row.Scan(&count)
- var err error
- if count > 0 { //存在则更新
- err = entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, groupId))
- } else {
- log.Println("插入消息汇总表:", fmt.Sprintf("INSERT INTO message_summary values (%d,bitmapBuild([toUInt64(%d)]),%d)", msgType, newMsg, gId))
- err = entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf("INSERT INTO message_summary values (%d,bitmapBuild([toUInt64(%d)]),%d)", msgType, newMsg, gId))
- }
- if err != nil {
- return err
- }
- return nil
- }
- func NewUserSendMsg(in *message.NewUserInsertMsgReq) error {
- userIdArr := strings.Split(in.UserIds, ",")
- //positionIdArr := strings.Split(in.PositionIds, ",")
- if len(userIdArr) == 0 {
- return errors.New("无效的用户id")
- }
- //wg := &sync.WaitGroup{}
- group_id := MsgGroupIdMap[int(in.MsgType)]
- for i := 0; i < len(userIdArr); i++ {
- if userIdArr[i] == "" {
- continue
- }
- //查询
- /*var positionId int64
- if len(positionIdArr) == len(userIdArr) {
- positionId = common.Int64All(positionIdArr[i])
- }*/
- row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = '%s'", userIdArr[i]))
- var count uint64
- row.Scan(&count)
- if count > 0 { //存在则更新
- err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, in.MsgLogId, userIdArr[i]))
- if err1 != nil {
- log.Println("新用户update message_user_summary表出错,error", err1)
- return err1
- }
- } else {
- //新用户需要insert
- sql := "INSERT INTO message_user_summary values "
- sql += fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", userIdArr[i], int(in.MsgLogId))
- log.Println("sql", sql)
- err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
- if err1 != nil {
- //插入失败
- log.Println("新用户insert message_user_summary表出错,error", err1)
- return err1
- }
- }
- //微信推送模板消息、app push
- pushData := WxTmplAndPush{
- MsgType: in.MsgType,
- Title: in.Title,
- Content: in.Content,
- WxPushUrl: in.WxPushUrl,
- AppPushUrl: in.AppPushUrl,
- ProductName: in.ProductName,
- OrderId: in.OrderId,
- OrderMoney: in.OrderMoney,
- Row4: in.Row4,
- }
- SentWxTmplAndAppPush(pushData, userIdArr[i], group_id, "", "")
- /*key := fmt.Sprintf(MsgCountKey, userIdArr[i], group_id)
- redis.Del(redisModule, key)
- if in.MsgType == 11 || in.MsgType == 12 {
- key1 := fmt.Sprintf(MsgClassCountKey, userIdArr[i], in.MsgType)
- redis.Del(redisModule, key1)
- }*/
- DelRedis(userIdArr[i], in.MsgType, group_id)
- }
- //wg.Wait()
- return nil
- }
- func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
- userIdArr := strings.Split(in.UserIds, ",")
- //positionIdArr := strings.Split(in.PositionIds, ",")
- if len(userIdArr) == 0 {
- return errors.New("无效的用户id")
- }
- wg := &sync.WaitGroup{}
- group_id := MsgGroupIdMap[int(in.MsgType)]
- var ids []string
- for _, v := range userIdArr {
- ids = append(ids, v)
- if len(ids) == 1000 {
- go Update(ids, in.MsgLogId)
- ids = []string{}
- }
- }
- if len(ids) > 0 {
- go Update(ids, in.MsgLogId)
- }
- //p459 特殊处理 传过来的消息内容格式为 消息内容#jy#微信模板项目名称#jy#服务地址
- equityName, equityAddr := "", ""
- if in.MsgType == config.ConfigJson.EquityInfoMsgType {
- equityRs := strings.Split(in.Content, "#jy#")
- if len(equityRs) != 3 {
- log.Println("消息内容格式有误:", in.Content)
- return errors.New("无效的消息内容格式")
- }
- in.Content = equityRs[0]
- equityName = equityRs[1]
- equityAddr = equityRs[2]
- }
- for i := 0; i < len(userIdArr); i++ {
- if userIdArr[i] == "" {
- continue
- }
- //查询
- wg.Add(1)
- entity.SaveConcurrencyChan <- 1
- /*var positionId int64
- if len(positionIdArr) == len(userIdArr) {
- positionId = common.Int64All(positionIdArr[i])
- }*/
- go func(v string) {
- defer func() {
- <-entity.SaveConcurrencyChan
- wg.Done()
- }()
- //微信推送模板消息、app push
- pushData := WxTmplAndPush{
- MsgType: in.MsgType,
- Title: in.Title,
- Content: in.Content,
- WxPushUrl: in.WxPushUrl,
- AppPushUrl: in.AppPushUrl,
- ProductName: in.ProductName,
- OrderId: in.OrderId,
- OrderMoney: in.OrderMoney,
- Row4: in.Row4,
- }
- SentWxTmplAndAppPush(pushData, v, group_id, equityName, equityAddr)
- /*key := fmt.Sprintf(MsgCountKey, v, group_id)
- redis.Del(redisModule, key)
- if in.MsgType == 11 || in.MsgType == 12 {
- key1 := fmt.Sprintf(MsgClassCountKey, v, in.MsgType)
- redis.Del(redisModule, key1)
- }*/
- DelRedis(v, in.MsgType, group_id)
- }(userIdArr[i])
- }
- wg.Wait()
- return nil
- }
- func DelRedis(userId string, msgType int64, groupId int) {
- key := fmt.Sprintf(MsgCountKey, userId, groupId)
- redis.Del(redisModule, key)
- if msgType == 11 || msgType == 12 {
- key1 := fmt.Sprintf(MsgClassCountKey, userId, msgType)
- redis.Del(redisModule, key1)
- }
- if groupId == 5 || groupId == 11 {
- redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, userId))
- }
- redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, userId))
- redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, userId))
- }
- func Update(ids []string, msgLogId int64) {
- // 查询是否存在 都存在updae 不存在的插入
- userMap := make(map[string]bool)
- row, err := entity.ClickhouseConn.Query(context.Background(), fmt.Sprintf("SELECT userId FROM message_user_summary where userId in ('%s')", strings.Join(ids, `','`)))
- if err != nil {
- log.Println("查询消息失败")
- return
- }
- for row.Next() {
- var userId string
- _ = row.Scan(&userId)
- if userId != "" {
- userMap[userId] = true
- }
- }
- if len(userMap) == len(ids) {
- log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(ids, `','`)))
- err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(ids, `','`)))
- if err1 != nil {
- log.Printf("批量更新message_user_summary出错:%s", err1)
- return
- }
- } else {
- log.Println("匹配到数据量:", len(userMap))
- var uData, sqlArr []string
- for _, id := range ids {
- if userMap[id] {
- uData = append(uData, id)
- } else {
- sqlArr = append(sqlArr, fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", id, msgLogId))
- }
- }
- log.Println("缺失数据量:", len(sqlArr))
- if len(uData) > 0 {
- err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(uData, `','`)))
- if err1 != nil {
- log.Printf("批量更新message_user_summary出错:%s", err1)
- }
- }
- if len(sqlArr) > 0 {
- //新用户需要insert
- sql := "INSERT INTO message_user_summary values %s"
- log.Println("sql", sql)
- err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(sql, strings.Join(sqlArr, ",")))
- if err1 != nil {
- //插入失败
- log.Println("insert message_user_summary表出错,error", err1)
- }
- }
- }
- }
- // 附件下载、剑鱼币活动到期提醒存消息发送记录
- func InsertMsgSendLog(in *message.MultipleSaveMsgReq) int64 {
- groupId := MsgGroupIdMap[int(in.MsgType)]
- id := entity.Mysql.Insert("message_send_log", map[string]interface{}{
- "send_usergroup_id": "",
- "send_usergroup_name": "",
- "msg_type": in.MsgType,
- "title": in.Title,
- "content": in.Content,
- "send_mode": 2,
- "send_time": time.Now().Format("2006-01-02 15:04:05"),
- "send_status": 4,
- "update_time": time.Now().Format("2006-01-02 15:04:05"),
- "link": in.Link,
- "isdel": 1,
- "send_userid": "",
- "update_user": "",
- "Sign": 5,
- "menu_name": "message",
- "group_id": groupId,
- })
- if id > 0 {
- //更新消息汇总表
- err := SetMsgSummary(id, int64(groupId), in.MsgType)
- if err != nil {
- log.Println("更新消息汇总表出错:", err)
- return 0
- }
- return id
- }
- return 0
- }
// ConvertToBitmap returns the 1-based positions of the set bits of num,
// counted from the least significant bit and listed in ascending order
// (5 = 0b101 yields [1 3]). The result is nil when no bit is set. For a
// negative num the sign is skipped, so the bits of its magnitude are reported.
func ConvertToBitmap(num int) (res []uint32) {
	digits := strconv.FormatInt(int64(num), 2)
	width := len(digits)

	// pos counts digits from the right-hand (least significant) end.
	for pos := 1; pos <= width; pos++ {
		if digits[width-pos] != '1' {
			continue
		}
		res = append(res, uint32(pos))
	}
	return res
}
// WxTmplAndPush carries the data SentWxTmplAndAppPush needs to send one
// message as a WeChat template message and as an app push.
type WxTmplAndPush struct {
	// MsgType selects the push configuration and decides which channels run.
	MsgType int64
	// Title and Content are the texts handed to both push channels.
	Title, Content string
	// WxPushUrl is passed to the WeChat template send, AppPushUrl to the
	// app push.
	WxPushUrl, AppPushUrl string
	// ProductName, OrderId and OrderMoney take the place of the title, the
	// content and the time value when MsgType is 10.
	ProductName, OrderId, OrderMoney string
	// Row4 is forwarded unchanged to the WeChat template send.
	Row4 string
	// SendUserId identifies the sender; the value "cbgl" selects a dedicated
	// app push category.
	SendUserId string
}
- func SentWxTmplAndAppPush(this WxTmplAndPush, v string, group_id int, equityName, equityAddr string) {
- nTime := time.Now().Format("2006-01-02 15:04:05")
- //发送消息成功,推送微信、app
- //fmt.Println("this.MsgType", this.MsgType)
- pushConfig, err := GetWxTmplConfig(this.MsgType)
- if err != nil {
- logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
- }
- p := &WxTmplPush{
- Config: pushConfig,
- }
- p.MgoId = v
- if this.MsgType == 10 {
- this.Title = this.ProductName
- this.Content = this.OrderId
- nTime = this.OrderMoney
- }
- // 消息模版 工单类型 {{thing19.DATA}} 工单标题 {{thing6.DATA}} 项目名称 {{thing13.DATA}} 服务时间 {{time25.DATA}} 服务地址 {{thing26.DATA}}
- if this.MsgType != 1 && this.MsgType != 10 {
- if this.MsgType == config.ConfigJson.EquityInfoMsgType {
- // p459 服务地址特殊处理
- err = p.SendMsg(this.WxPushUrl, this.Title, equityName, nTime, this.Row4, equityAddr)
- } else {
- err = p.SendMsg(this.WxPushUrl, this.Title, this.Content, nTime, this.Row4, "")
- }
- if err != nil {
- logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s this.MsgType:%d", v, err.Error(), this.MsgType))
- } else {
- logx.Infof("SendWxTmplMsg uId success %s ", v)
- }
- }
- if this.MsgType == 1 {
- mst := new(WxTmplConfig)
- mst.Switch = AppPushMsgType[group_id]
- p.Config = mst
- }
- uData := p.GetUserPushInfo()
- //app推送
- if this.MsgType != 10 {
- category := ""
- if this.SendUserId == "cbgl" {
- category = "服务通知_工作事项"
- }
- if err = AppPushMsg(uData, AppPushMsgType[group_id], this.AppPushUrl, this.Title, this.Content, this.MsgType, category); err != nil {
- logx.Error(fmt.Sprintf("SendAppMsg uId %s Error %s", v, err.Error()))
- }
- }
- }
|