|
@@ -4,7 +4,6 @@ import (
|
|
|
"app.yhyue.com/moapp/MessageCenter/entity"
|
|
|
"app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
|
|
|
"app.yhyue.com/moapp/MessageCenter/rpc/type/message"
|
|
|
- "app.yhyue.com/moapp/jybase/common"
|
|
|
"app.yhyue.com/moapp/jybase/redis"
|
|
|
"context"
|
|
|
"errors"
|
|
@@ -37,76 +36,66 @@ func SetMsgSummary(newMsg, groupId, msgType int64) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func NewUserSendMsg(in *message.NewUserInsertMsgReq) string {
|
|
|
+func NewUserSendMsg(in *message.NewUserInsertMsgReq) error {
|
|
|
userIdArr := strings.Split(in.UserIds, ",")
|
|
|
- positionIdArr := strings.Split(in.PositionIds, ",")
|
|
|
+ //positionIdArr := strings.Split(in.PositionIds, ",")
|
|
|
if len(userIdArr) == 0 {
|
|
|
- return "无效的用户id"
|
|
|
+ return errors.New("无效的用户id")
|
|
|
}
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
+ //wg := &sync.WaitGroup{}
|
|
|
group_id := MsgGroupIdMap[int(in.MsgType)]
|
|
|
for i := 0; i < len(userIdArr); i++ {
|
|
|
if userIdArr[i] == "" {
|
|
|
continue
|
|
|
}
|
|
|
//查询
|
|
|
- wg.Add(1)
|
|
|
- entity.SaveConcurrencyChan <- 1
|
|
|
- var positionId int64
|
|
|
+ /*var positionId int64
|
|
|
if len(positionIdArr) == len(userIdArr) {
|
|
|
positionId = common.Int64All(positionIdArr[i])
|
|
|
- }
|
|
|
-
|
|
|
- go func(v string, positionId int64) {
|
|
|
- defer func() {
|
|
|
- <-entity.SaveConcurrencyChan
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
-
|
|
|
- row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = '%s'", v))
|
|
|
- var count uint64
|
|
|
- row.Scan(&count)
|
|
|
- if count > 0 { //存在则更新
|
|
|
- err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, in.MsgLogId, v))
|
|
|
- if err1 != nil {
|
|
|
- log.Println("新用户update message_user_summary表出错,error", err1)
|
|
|
- return
|
|
|
- }
|
|
|
- } else {
|
|
|
- //新用户需要insert
|
|
|
- sql := "INSERT INTO message_user_summary values "
|
|
|
- sql += fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", v, int(in.MsgLogId))
|
|
|
- log.Println("sql", sql)
|
|
|
- err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
|
|
|
- if err1 != nil {
|
|
|
- //插入失败
|
|
|
- log.Println("新用户insert message_user_summary表出错,error", err1)
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- //微信推送模板消息、app push
|
|
|
- pushData := WxTmplAndPush{
|
|
|
- MsgType: in.MsgType,
|
|
|
- Title: in.Title,
|
|
|
- Content: in.Content,
|
|
|
- WxPushUrl: in.WxPushUrl,
|
|
|
- AppPushUrl: in.AppPushUrl,
|
|
|
- ProductName: in.ProductName,
|
|
|
- OrderId: in.OrderId,
|
|
|
- OrderMoney: in.OrderMoney,
|
|
|
- Row4: in.Row4,
|
|
|
+ }*/
|
|
|
+ row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = '%s'", userIdArr[i]))
|
|
|
+ var count uint64
|
|
|
+ row.Scan(&count)
|
|
|
+ if count > 0 { //存在则更新
|
|
|
+ err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, in.MsgLogId, userIdArr[i]))
|
|
|
+ if err1 != nil {
|
|
|
+ log.Println("新用户update message_user_summary表出错,error", err1)
|
|
|
+ return err1
|
|
|
}
|
|
|
- SentWxTmplAndAppPush(pushData, v, group_id, "", "")
|
|
|
- key := fmt.Sprintf(MsgCountKey, v, group_id)
|
|
|
- redis.Del(redisModule, key)
|
|
|
- if in.MsgType == 11 || in.MsgType == 12 {
|
|
|
- key1 := fmt.Sprintf(MsgClassCountKey, v, in.MsgType)
|
|
|
- redis.Del(redisModule, key1)
|
|
|
+ } else {
|
|
|
+ //新用户需要insert
|
|
|
+ sql := "INSERT INTO message_user_summary values "
|
|
|
+ sql += fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", userIdArr[i], int(in.MsgLogId))
|
|
|
+ log.Println("sql", sql)
|
|
|
+ err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
|
|
|
+ if err1 != nil {
|
|
|
+ //插入失败
|
|
|
+ log.Println("新用户insert message_user_summary表出错,error", err1)
|
|
|
+ return err1
|
|
|
}
|
|
|
- }(userIdArr[i], positionId)
|
|
|
+ }
|
|
|
+ //微信推送模板消息、app push
|
|
|
+ pushData := WxTmplAndPush{
|
|
|
+ MsgType: in.MsgType,
|
|
|
+ Title: in.Title,
|
|
|
+ Content: in.Content,
|
|
|
+ WxPushUrl: in.WxPushUrl,
|
|
|
+ AppPushUrl: in.AppPushUrl,
|
|
|
+ ProductName: in.ProductName,
|
|
|
+ OrderId: in.OrderId,
|
|
|
+ OrderMoney: in.OrderMoney,
|
|
|
+ Row4: in.Row4,
|
|
|
+ }
|
|
|
+ SentWxTmplAndAppPush(pushData, userIdArr[i], group_id, "", "")
|
|
|
+ key := fmt.Sprintf(MsgCountKey, userIdArr[i], group_id)
|
|
|
+ redis.Del(redisModule, key)
|
|
|
+ if in.MsgType == 11 || in.MsgType == 12 {
|
|
|
+ key1 := fmt.Sprintf(MsgClassCountKey, userIdArr[i], in.MsgType)
|
|
|
+ redis.Del(redisModule, key1)
|
|
|
+ }
|
|
|
}
|
|
|
- wg.Wait()
|
|
|
- return ""
|
|
|
+ //wg.Wait()
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
|