소스 검색

fix:消息更新修改

duxin 11 달 전
부모
커밋
f4b9a66d5f
1개의 변경된 파일52개의 추가작업 그리고 11개의 파일을 삭제
  1. 52 11
      rpc/internal/common/newSendMsgService.go

+ 52 - 11
rpc/internal/common/newSendMsgService.go

@@ -4,6 +4,7 @@ import (
 	"app.yhyue.com/moapp/MessageCenter/entity"
 	"app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
 	"app.yhyue.com/moapp/MessageCenter/rpc/type/message"
+	qutil "app.yhyue.com/moapp/jybase/common"
 	"app.yhyue.com/moapp/jybase/redis"
 	"context"
 	"errors"
@@ -109,15 +110,14 @@ func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
 	group_id := MsgGroupIdMap[int(in.MsgType)]
 	var ids []string
 	for _, v := range userIdArr {
-		ids = append(ids, fmt.Sprintf(`'%s'`, v))
+		ids = append(ids, v)
 		if len(ids) == 1000 {
-			idStr := strings.Join(ids, ",")
-			go Update(idStr, in.MsgLogId)
+			go Update(ids, in.MsgLogId)
 			ids = []string{}
 		}
 	}
 	if len(ids) > 0 {
-		go Update(strings.Join(ids, ","), in.MsgLogId)
+		go Update(ids, in.MsgLogId)
 	}
 	//p459 特殊处理 传过来的消息内容格式为 消息内容#jy#微信模板项目名称#jy#服务地址
 	equityName, equityAddr := "", ""
@@ -187,16 +187,57 @@ func DelRedis(userId string, msgType int64, groupId int) {
 	redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, userId))
 }
 
-func Update(str string, msgLogId int64) {
-	log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))
-	err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))
-	if err1 != nil {
-		log.Printf("批量更新message_user_summary出错:%s", err1)
-		return
+func Update(ids []string, msgLogId int64) {
+	// 查询是否存在 都存在updae 不存在的插入
+	row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT  userId  FROM message_user_summary where  userId in ('%s')", strings.Join(ids, `','`)))
+	var userMsg []map[string]interface{}
+	_ = row.Scan(&userMsg)
+	if userMsg != nil && len(userMsg) == len(ids) {
+		log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(ids, `','`)))
+		err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(ids, `','`)))
+		if err1 != nil {
+			log.Printf("批量更新message_user_summary出错:%s", err1)
+			return
+		}
+	} else {
+		log.Println("匹配到数据量:", len(userMsg))
+		var uData, sqlArr []string
+		for _, id := range ids {
+			var b bool
+			for _, m := range userMsg {
+				if id == qutil.InterfaceToStr(m["userId"]) {
+					b = true
+					break
+				}
+			}
+			if b {
+				uData = append(uData, id)
+			} else {
+				sqlArr = append(sqlArr, fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", id, msgLogId))
+			}
+		}
+		log.Println("缺失数据量:", len(sqlArr))
+		if len(uData) > 0 {
+			err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(uData, `','`)))
+			if err1 != nil {
+				log.Printf("批量更新message_user_summary出错:%s", err1)
+				return
+			}
+		}
+		if len(sqlArr) > 0 {
+			//新用户需要insert
+			sql := "INSERT INTO message_user_summary values %s"
+			log.Println("sql", sql)
+			err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(sql, strings.Join(sqlArr, ",")))
+			if err1 != nil {
+				//插入失败
+				log.Println("insert message_user_summary表出错,error", err1)
+			}
+		}
 	}
 }
 
-//附件下载、剑鱼币活动到期提醒存消息发送记录
+// 附件下载、剑鱼币活动到期提醒存消息发送记录
 func InsertMsgSendLog(in *message.MultipleSaveMsgReq) int64 {
 	groupId := MsgGroupIdMap[int(in.MsgType)]
 	id := entity.Mysql.Insert("message_send_log", map[string]interface{}{