Przeglądaj źródła

Merge branch 'master' into feature/v1.2.15

lianbingjie 1 rok temu
rodzic
commit
2d16e652b4
1 zmienionych plików z 170 dodań i 107 usunięć
  1. 170 107
      rpc/internal/common/task.go

+ 170 - 107
rpc/internal/common/task.go

@@ -18,7 +18,7 @@ var GlobMsgMap map[int]map[string]interface{}
 
 func LoadTask() {
 	// 每隔10分钟执行一次
-	LoadMsgOnTime()
+	StatisticalUser(true)
 	c := cron.New(cron.WithSeconds())
 	c.AddFunc(config.ConfigJson.GlobMsgLoadTime, LoadMsgOnTime)
 	c.AddFunc(config.ConfigJson.FreeIntelTime, FreeIntelUserPush) //免费用户推送
@@ -42,12 +42,12 @@ func LoadMsgOnTime() {
 }
 
 func FreeIntelUserPush() {
-	data := messageData(config.ConfigJson.FreePushNumber)
-	if data == nil || len(*data) == 0 {
+	ids := FreeMessageData(config.ConfigJson.FreePushNumber)
+	if len(ids) == 0 {
 		return
 	}
-	users := IntelUser(true)
-	PushData(users, data)
+	users := StatisticalUser(true)
+	PushData(users, ids, nil)
 }
 
 func PayIntelUserPush() {
@@ -55,149 +55,203 @@ func PayIntelUserPush() {
 	if data == nil || len(*data) == 0 {
 		return
 	}
-	users := IntelUser(false)
-	PushData(users, data)
+	users := StatisticalUser(false)
+	PushData(users, nil, data)
 }
 
-func PushData(users []string, data *[]map[string]interface{}) {
+func PushData(users []string, ids []int64, data *[]map[string]interface{}) {
 	log.Printf("需推送用户:%d\n", len(users))
-	var ids []int64
-	if data != nil {
-		for _, m := range *data {
-			_id := common.InterfaceToStr(m["_id"])
-			var link []string
-			link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
-			mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
-			link = append(link, mobLink)
-			link = append(link, mobLink)
-			link = append(link, mobLink)
+	switch len(ids) {
+	case 0: //付费用户
+		if data != nil {
+			for _, m := range *data {
+				_id := common.InterfaceToStr(m["_id"])
+				var link []string
+				link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
+				mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
+				link = append(link, mobLink)
+				link = append(link, mobLink)
+				link = append(link, mobLink)
 
-			iData := map[string]interface{}{
-				"msg_type":    9,
-				"title":       "您有一条专属商机情报",
-				"content":     fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
-				"send_mode":   1,
-				"send_time":   time.Now().Format("2006-01-02 15:04:05"),
-				"send_status": 4,
-				"update_time": time.Now().Format("2006-01-02 15:04:05"),
-				"createtime":  time.Now().Format("2006-01-02 15:04:05"),
-				"link":        strings.Join(link, ","),
-				"isdel":       1,
-				"send_userid": "商机情报定时推送",
-				"sign":        0,
-				"group_id":    5,
+				iData := map[string]interface{}{
+					"msg_type":    9,
+					"title":       "您有一条专属商机情报",
+					"content":     fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
+					"send_mode":   1,
+					"send_time":   time.Now().Format("2006-01-02 15:04:05"),
+					"send_status": 4,
+					"update_time": time.Now().Format("2006-01-02 15:04:05"),
+					"createtime":  time.Now().Format("2006-01-02 15:04:05"),
+					"link":        strings.Join(link, ","),
+					"isdel":       1,
+					"send_userid": "商机情报定时推送",
+					"sign":        0,
+					"group_id":    5,
+				}
+				logId := entity.Mysql.Insert("message_send_log", iData)
+				ids = append(ids, logId)
 			}
-			id := entity.Mysql.Insert("message_send_log", iData)
-			ids = append(ids, id)
 		}
-	}
-
-	if len(ids) > 0 {
+		if len(ids) > 0 {
+			var bits []string
+			for _, i2 := range ids {
+				bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
+			}
+			biyStr := strings.Join(bits, ",")
+			err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, biyStr, 5))
+			if err != nil {
+				log.Println("message_summary err=== ", err.Error())
+				return
+			}
+			var userIds []string
+			for k, user := range users {
+				userIds = append(userIds, user)
+				if len(userIds) == 1000 || k == len(users)-1 {
+					UpdateBatch(userIds, biyStr)
+					userIds = []string{}
+				}
+			}
+		}
+	default: //免费用户
 		var bits []string
 		for _, i2 := range ids {
 			bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
 		}
-		err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, strings.Join(bits, ","), 5))
-		if err != nil {
-			log.Println("message_summary err=== ", err.Error())
+		log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where 1=1`, strings.Join(bits, ",")))
+		err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where 1=1`, strings.Join(bits, ",")))
+		if err1 != nil {
+			log.Printf("批量更新message_user_summary出错:%s", err1)
 			return
 		}
-
-		var userIds []string
-		for k, user := range users {
-			userIds = append(userIds, user)
-			if len(userIds) == 1000 || k == len(users)-1 {
-				UpdateBatch(userIds, ids)
-				userIds = []string{}
-			}
+		for _, userid := range users {
+			keyString := fmt.Sprintf(MsgCountKey, userid, 5)
+			redis.Del(redisModule, keyString)
 		}
 	}
+	log.Println("PushData完成")
 }
 
 // 用户
-func IntelUser(isFree bool) []string {
-	mUser := make(map[string]bool)
+func StatisticalUser(isFree bool) []string {
+	var ids []string
+	mUser := make(map[string]bool) //商机管理用户
 	data := entity.Mysql.SelectBySql(`SELECT a.phone as phone FROM entniche_user a INNER JOIN entniche_info b on  b.status = 1 and a.power = 1 and a.ent_id = b.id and a.phone != ''`)
-	if data != nil { //统计商机管理用户
+	if data != nil && len(*data) > 0 { //统计商机管理用户
 		for _, m := range *data {
-			mUser[common.InterfaceToStr(m["phone"])] = true
+			if common.InterfaceToStr(m["phone"]) != "" {
+				mUser[common.InterfaceToStr(m["phone"])] = true
+			}
 		}
 	}
-	var uData []string
+	log.Printf("获取商机管理用户:%d\n", len(mUser))
+	//企业大会员或超级订阅
+	entPayUser, ok := entity.MQFW.Find("ent_user", map[string]interface{}{
+		"$or": []map[string]interface{}{
+			{ //个人订阅
+				"i_vip_status": map[string]interface{}{
+					"$gt": 0,
+				},
+			},
+			{ //个人大会员
+				"i_member_status": map[string]interface{}{
+					"$gt": 0,
+				},
+			},
+		},
+	}, nil, `{"i_userid":1}`, false, -1, -1)
+	if ok && entPayUser != nil && len(*entPayUser) > 0 {
+		var userIds []string
+		for k, m := range *entPayUser {
+			userIds = append(userIds, common.InterfaceToStr(m["i_userid"]))
+			if len(userIds) == 100 || k == len(*entPayUser)-1 {
+				idSql := fmt.Sprintf(`SELECT phone FROM entniche_user WHERE id in (%s)`, strings.Join(userIds, `,`))
+				phoneArr := entity.Mysql.SelectBySql(idSql)
+				if phoneArr != nil && len(*phoneArr) > 0 {
+					for _, m2 := range *phoneArr {
+						if common.InterfaceToStr(m2["phone"]) != "" {
+							mUser[common.InterfaceToStr(m2["phone"])] = true
+						}
+					}
+				}
+				userIds = []string{}
+			}
+		}
+	}
+	log.Printf("获取商机管理+企业付费用户:%d\n", len(mUser))
 	switch isFree {
-	case false:
-		user, ok := entity.MQFW.Find("user", map[string]interface{}{
-			"i_appid": 2,
+	case false: //付费用户
+		payMap := make(map[string]bool)
+		payUsers, _ := entity.MQFW.Find("user", map[string]interface{}{
 			"$or": []map[string]interface{}{
-				{
-					"i_vip_status": map[string]interface{}{"$gt": 0},
+				{ //个人订阅
+					"i_vip_status": map[string]interface{}{
+						"$gt": 0,
+					},
 				},
-				{
-					"i_member_status": map[string]interface{}{"$gt": 0},
+				{ //个人大会员
+					"i_member_status": map[string]interface{}{
+						"$gt": 0,
+					},
 				},
 			},
-		}, "", `{"_id":1,"s_phone":1,"s_m_phone":1}`, false, -1, -1)
-		if ok && user != nil && len(*user) > 0 {
-			for _, m := range *user {
+		}, nil, `{"_id":1,"s_phone":1,"s_m_phone":1}`, false, -1, -1)
+		if payUsers != nil && len(*payUsers) > 0 {
+			for _, m := range *payUsers {
 				phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
-				uData = append(uData, common.InterfaceToStr(m["_id"]))
-				if mUser[phone] { //获取剩余商机管理用户
+				payMap[common.InterfaceToStr(m["_id"])] = true
+				if mUser[phone] { //避免重复获取
 					delete(mUser, phone)
 				}
 			}
-			if len(mUser) > 0 { //统计剩余商机管理用户
-				var (
-					phones []string
-					count  int
-				)
-
-				for phone := range mUser {
-					count++
-					phones = append(phones, phone)
-					if len(phones) == 100 || count == len(mUser) {
-						user1, ok1 := entity.MQFW.Find("user", map[string]interface{}{
-							"i_appid": 2,
-							"$or": []map[string]interface{}{
-								{
-									"s_phone": map[string]interface{}{
-										"$in": phones,
-									},
-								},
-								{
-									"s_m_phone": map[string]interface{}{
-										"$in": phones,
-									},
-								},
-							},
-						}, "", `{"_id":1}`, false, -1, -1)
-						if ok1 && user1 != nil && len(*user1) > 0 {
-							for _, m := range *user1 {
-								uData = append(uData, common.InterfaceToStr(m["_id"]))
-							}
-						}
-						phones = []string{}
+		}
+		var (
+			phones []string
+			count  int
+		)
+		maxCount := len(mUser)
+		for phone := range mUser { //获取剩余商机管理与企业付费用户
+			count++
+			phones = append(phones, phone)
+			if len(phones) == 100 || count == maxCount {
+				entPayUsers, _ := entity.MQFW.Find("user", map[string]interface{}{
+					"$or": []map[string]interface{}{
+						{"s_phone": map[string]interface{}{ //企业订阅||大会员||商机管理
+							"$in": phones,
+						}},
+						{"s_m_phone": map[string]interface{}{ //企业订阅||大会员||商机管理
+							"$in": phones,
+						}},
+					},
+				}, nil, `{"_id":1}`, false, -1, -1)
+				if entPayUsers != nil && len(*entPayUsers) > 0 {
+					for _, m := range *entPayUsers {
+						payMap[common.InterfaceToStr(m["_id"])] = true
 					}
 				}
+				phones = []string{}
 			}
 		}
-
+		for id := range payMap {
+			ids = append(ids, id)
+		}
 	case true:
 		sess := entity.MQFW.GetMgoConn()
 		defer entity.MQFW.DestoryMongoConn(sess)
 		iter := sess.DB("qfw").C("user").Find(map[string]interface{}{
 			"i_appid": 2,
-		}).Select(map[string]interface{}{"i_vip_status": 1, "i_member_status": 1, "_id": 1, "s_phone": 1, "s_m_phone": 1}).Iter()
+		}).Select(map[string]interface{}{"_id": 1, "i_vip_status": 1, "i_member_status": 1, "s_phone": 1, "s_m_phone": 1}).Iter()
 		for m := make(map[string]interface{}); iter.Next(&m); {
 			if common.IntAll(m["i_vip_status"]) <= 0 && common.IntAll(m["i_member_status"]) <= 0 {
 				phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
 				if !mUser[phone] {
-					uData = append(uData, common.InterfaceToStr(m["_id"]))
+					ids = append(ids, common.InterfaceToStr(m["_id"]))
 				}
 			}
 			m = map[string]interface{}{}
 		}
 	}
-	return uData
+	log.Printf("用户类型:%v,用户数:%d", isFree, len(ids))
+	return ids
 }
 
 // 获取推送消息
@@ -213,14 +267,23 @@ func messageData(number int) *[]map[string]interface{} {
 	return data
 }
 
-func UpdateBatch(ids []string, msgLogId []int64) {
-	str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
-	var bits []string
-	for _, i2 := range msgLogId {
-		bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
+func FreeMessageData(number int) []int64 {
+	var ids []int64
+	now := time.Now()
+	startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).String()
+	m := entity.Mysql.SelectBySql(fmt.Sprintf(`SELECT id FROM message_send_log WHERE createtime > '%s' and send_userid = '商机情报定时推送' ORDER BY createtime DESC LIMIT %d `, startTime, number))
+	if m != nil && len(*m) > 0 {
+		for _, i2 := range *m {
+			ids = append(ids, common.Int64All(i2["id"]))
+		}
 	}
-	log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
-	err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
+	return ids
+}
+
+func UpdateBatch(ids []string, bitStr string) {
+	str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
+	log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, bitStr, str))
+	err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, bitStr, str))
 	if err1 != nil {
 		log.Printf("批量更新message_user_summary出错:%s", err1)
 		return