瀏覽代碼

fix:商机管理消息推送修改

duxin 1 年之前
父節點
當前提交
e363092745
共有 1 個文件被更改,包括 74 次插入55 次删除
  1. 74 55
      rpc/internal/common/task.go

+ 74 - 55
rpc/internal/common/task.go

@@ -18,7 +18,6 @@ var GlobMsgMap map[int]map[string]interface{}
 
 func LoadTask() {
 	// 每隔10分钟执行一次
-	LoadMsgOnTime()
 	c := cron.New(cron.WithSeconds())
 	c.AddFunc(config.ConfigJson.GlobMsgLoadTime, LoadMsgOnTime)
 	c.AddFunc(config.ConfigJson.FreeIntelTime, FreeIntelUserPush) //免费用户推送
@@ -42,12 +41,12 @@ func LoadMsgOnTime() {
 }
 
 func FreeIntelUserPush() {
-	data := messageData(config.ConfigJson.FreePushNumber)
-	if data == nil || len(*data) == 0 {
+	id := FreeMessageData(config.ConfigJson.FreePushNumber)
+	if id == 0 {
 		return
 	}
 	users := StatisticalUser(true)
-	PushData(users, data)
+	PushData(users, id, nil)
 }
 
 func PayIntelUserPush() {
@@ -56,62 +55,76 @@ func PayIntelUserPush() {
 		return
 	}
 	users := StatisticalUser(false)
-	PushData(users, data)
+	PushData(users, 0, data)
 }
 
-func PushData(users []string, data *[]map[string]interface{}) {
+func PushData(users []string, id int64, data *[]map[string]interface{}) {
 	log.Printf("需推送用户:%d\n", len(users))
 	var ids []int64
-	if data != nil {
-		for _, m := range *data {
-			_id := common.InterfaceToStr(m["_id"])
-			var link []string
-			link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
-			mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
-			link = append(link, mobLink)
-			link = append(link, mobLink)
-			link = append(link, mobLink)
+	switch id {
+	case 0: //付费用户
+		if data != nil {
+			for _, m := range *data {
+				_id := common.InterfaceToStr(m["_id"])
+				var link []string
+				link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
+				mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
+				link = append(link, mobLink)
+				link = append(link, mobLink)
+				link = append(link, mobLink)
 
-			iData := map[string]interface{}{
-				"msg_type":    9,
-				"title":       "您有一条专属商机情报",
-				"content":     fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
-				"send_mode":   1,
-				"send_time":   time.Now().Format("2006-01-02 15:04:05"),
-				"send_status": 4,
-				"update_time": time.Now().Format("2006-01-02 15:04:05"),
-				"createtime":  time.Now().Format("2006-01-02 15:04:05"),
-				"link":        strings.Join(link, ","),
-				"isdel":       1,
-				"send_userid": "商机情报定时推送",
-				"sign":        0,
-				"group_id":    5,
+				iData := map[string]interface{}{
+					"msg_type":    9,
+					"title":       "您有一条专属商机情报",
+					"content":     fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
+					"send_mode":   1,
+					"send_time":   time.Now().Format("2006-01-02 15:04:05"),
+					"send_status": 4,
+					"update_time": time.Now().Format("2006-01-02 15:04:05"),
+					"createtime":  time.Now().Format("2006-01-02 15:04:05"),
+					"link":        strings.Join(link, ","),
+					"isdel":       1,
+					"send_userid": "商机情报定时推送",
+					"sign":        0,
+					"group_id":    5,
+				}
+				logId := entity.Mysql.Insert("message_send_log", iData)
+				ids = append(ids, logId)
 			}
-			id := entity.Mysql.Insert("message_send_log", iData)
-			ids = append(ids, id)
 		}
-	}
-
-	if len(ids) > 0 {
-		var bits []string
-		for _, i2 := range ids {
-			bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
+		if len(ids) > 0 {
+			var bits []string
+			for _, i2 := range ids {
+				bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
+			}
+			biyStr := strings.Join(bits, ",")
+			err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, biyStr, 5))
+			if err != nil {
+				log.Println("message_summary err=== ", err.Error())
+				return
+			}
+			var userIds []string
+			for k, user := range users {
+				userIds = append(userIds, user)
+				if len(userIds) == 1000 || k == len(users)-1 {
+					UpdateBatch(userIds, biyStr)
+					userIds = []string{}
+				}
+			}
 		}
-		err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, strings.Join(bits, ","), 5))
-		if err != nil {
-			log.Println("message_summary err=== ", err.Error())
+	default: //免费用户
+		log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where 1=1`, id))
+		err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where 1=1`, id))
+		if err1 != nil {
+			log.Printf("批量更新message_user_summary出错:%s", err1)
 			return
 		}
-
-		var userIds []string
-		for k, user := range users {
-			userIds = append(userIds, user)
-			if len(userIds) == 1000 || k == len(users)-1 {
-				UpdateBatch(userIds, ids)
-				userIds = []string{}
-			}
+		for _, userid := range users {
+			keyString := fmt.Sprintf(MsgCountKey, userid, 5)
+			redis.Del(redisModule, keyString)
 		}
 	}
+	log.Println("PushData完成")
 }
 
 // 用户
@@ -250,14 +263,20 @@ func messageData(number int) *[]map[string]interface{} {
 	return data
 }
 
-func UpdateBatch(ids []string, msgLogId []int64) {
-	str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
-	var bits []string
-	for _, i2 := range msgLogId {
-		bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
+func FreeMessageData(number int) int64 {
+	now := time.Now()
+	startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).String()
+	m := entity.Mysql.SelectBySql(fmt.Sprintf(`SELECT id FROM message_send_log WHERE createtime > '%s' and send_userid = '商机情报定时推送' ORDER BY createtime DESC LIMIT %d `, startTime, number))
+	if m != nil && len(*m) > 0 {
+		return common.Int64All((*m)[0]["id"])
 	}
-	log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
-	err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
+	return 0
+}
+
+func UpdateBatch(ids []string, biyStr string) {
+	str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
+	log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, biyStr, str))
+	err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, biyStr, str))
 	if err1 != nil {
 		log.Printf("批量更新message_user_summary出错:%s", err1)
 		return