wangshan hace 1 año
padre
commit
9291b9195a

+ 1 - 0
jyBXBase/rpc/etc/bxbase.yaml

@@ -33,3 +33,4 @@ NewsCache: # 下面timeout 单位是秒
   Count: # 用户有没有推送数据缓存使用的key
     Key: push_count_%s_%s
     Timeout: 7200
+NewConcurrencyNum: 3

+ 5 - 0
jyBXBase/rpc/init/init.go

@@ -1,12 +1,14 @@
 package init
 
 import (
+	MC "app.yhyue.com/moapp/jybase/common"
 	"app.yhyue.com/moapp/jypkg/compatible"
 	"app.yhyue.com/moapp/jypkg/middleground"
 	"flag"
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/zeromicro/go-zero/core/conf"
 	"jyBXBase/rpc/internal/config"
+	"jyBXBase/rpc/util"
 )
 
 var configFile = flag.String("cf", "etc/bxbase.yaml", "the config file")
@@ -40,4 +42,7 @@ func init() {
 		RegEntManageApplication(C.EntManageApplication)
 	Compatible = compatible.NewCompatible(&Mgo, BaseServiceMysql, MainMysql, Middleground)
 
+	num := MC.If(C.NewConcurrencyNum > 0, C.NewConcurrencyNum, 3).(int)
+	//创建执行池 等待池
+	util.NewLimit(num)
 }

+ 1 - 0
jyBXBase/rpc/internal/config/config.go

@@ -23,6 +23,7 @@ type Config struct {
 		Login     CacheConfig // 登录用户没有推送数据最新标讯使用的key
 		Count     CacheConfig // 查询用户有没有推送数据使用的key
 	}
+	NewConcurrencyNum int //最新标讯并发数
 }
 type CacheConfig struct {
 	Key                string `json:"Key,optional"`                // 缓存key

+ 61 - 40
jyBXBase/rpc/model/newestBidding.go

@@ -2,11 +2,13 @@ package model
 
 import (
 	"app.yhyue.com/moapp/jybase/redis"
+	"context"
 	"encoding/json"
 	"fmt"
 	"jyBXBase/rpc/bxbase"
 	IC "jyBXBase/rpc/init"
 	"jyBXBase/rpc/internal/config"
+	"jyBXBase/rpc/util"
 	"log"
 	"sort"
 	"strings"
@@ -365,49 +367,68 @@ func DataSortInRedis(r *bxbase.NewsetBiddingResp, status int, positionId int64)
 // 延长缓存
 // 获取最新数据 --  延长缓存时间7天---待调整: 1、判断是否需要更新;2、判断订阅信息是否存在,如果存在是否是最新推送,3、订阅信息查推送缓存(只有个人版有缓存:pushcache_2_a )
 func ExtendNewListCache(n *NewSet, in *bxbase.NewestBiddingReq, list []*bxbase.NewestList) {
-	if n.RedisKeyModel.Key != "" {
-		now := time.Now()
-		if n.RedisKeyModel.CacheUpdateKey != "" {
-			var (
-				res = &bxbase.NewsetBiddingResp{
-					Data: &bxbase.NewsetBidding{
-						List: []*bxbase.NewestList{},
-					},
-				}
-				updateTime = redis.GetInt("new", n.RedisKeyModel.CacheUpdateKey)
-			)
-			switch n.RedisStatus {
-			case StatusLoginUser:
-				//十五分钟内 更新过一次 不再更新
-				if int(now.Unix())-updateTime > n.RedisKeyModel.CacheUpdateTimeout {
-					// 登录用户
-					roleNewestInfo, _ := GetRoleNewestInfoService(in.AppId, in.MgoUserId, in.NewUserId, in.AccountId, in.EntId, in.EntUserId, in.PositionType, in.PositionId)
-					//当前用户有最新推送信息
-					lastPublishTime := list[len(list)-1].PublishTime
-					if roleNewestInfo.IsHasNewPushData(lastPublishTime) {
-						//  查推送
-						subscribeTime := time.Now()
-						res.Data.List = roleNewestInfo.GetPushHistory()
-						log.Println(in.PositionId, "获取订阅数据 存缓存 耗时:", time.Since(subscribeTime).Seconds())
-					} else {
-						res.Data.List = list
+	if flag := util.ReqLimitInit.Limit(context.Background()); flag != 1 {
+		if flag == -2 {
+			log.Println("等待队列已满")
+		} else if flag == -1 {
+			log.Println("等待超时")
+		}
+	} else {
+		defer util.ReqLimitInit.Release()
+		if n.RedisKeyModel.Key != "" {
+			now := time.Now()
+			if n.RedisKeyModel.CacheUpdateKey != "" {
+				var (
+					res = &bxbase.NewsetBiddingResp{
+						Data: &bxbase.NewsetBidding{
+							List: []*bxbase.NewestList{},
+						},
+					}
+					updateTime = redis.GetInt("new", n.RedisKeyModel.CacheUpdateKey)
+					isDoing    bool
+				)
+				//防止穿透
+				util.ReqLimitLock.Lock()
+				isDoing, _ = redis.Exists("new", fmt.Sprintf("p1_indexMessage_new_recovery_%d", in.PositionId))
+				util.ReqLimitLock.Unlock()
+				if !isDoing {
+					util.ReqLimitLock.Lock()
+					redis.Put("new", fmt.Sprintf("p1_indexMessage_new_recovery_%d", in.PositionId), "1", 15*time.Now().Minute()) //十五分钟
+					util.ReqLimitLock.Unlock()
+					switch n.RedisStatus {
+					case StatusLoginUser:
+						//十五分钟内 更新过一次 不再更新
+						if int(now.Unix())-updateTime > n.RedisKeyModel.CacheUpdateTimeout {
+							// 登录用户
+							roleNewestInfo, _ := GetRoleNewestInfoService(in.AppId, in.MgoUserId, in.NewUserId, in.AccountId, in.EntId, in.EntUserId, in.PositionType, in.PositionId)
+							//当前用户有最新推送信息
+							lastPublishTime := list[len(list)-1].PublishTime
+							if roleNewestInfo.IsHasNewPushData(lastPublishTime) {
+								//  查推送
+								subscribeTime := time.Now()
+								res.Data.List = roleNewestInfo.GetPushHistory()
+								log.Println(in.PositionId, "获取订阅数据 存缓存 耗时:", time.Since(subscribeTime).Seconds())
+							} else {
+								res.Data.List = list
+							}
+						}
+					default:
+						//十五分钟内 更新过一次 不再更新
+						if int(now.Unix())-updateTime > n.RedisKeyModel.CacheUpdateTimeout {
+							res.Data.List = NewestES(n.Query)
+						}
+					}
+					if len(res.Data.List) > 0 {
+						//更新update time
+						redis.Put("new", n.RedisKeyModel.CacheUpdateKey, now.Unix(), n.RedisKeyModel.CacheUpdateTimeout)
+						go DataSortInRedis(res, n.RedisStatus, in.PositionId)
 					}
 				}
-			default:
-				//十五分钟内 更新过一次 不再更新
-				if int(now.Unix())-updateTime > n.RedisKeyModel.CacheUpdateTimeout {
-					res.Data.List = NewestES(n.Query)
-				}
-			}
-			if len(res.Data.List) > 0 {
-				//更新update time
-				redis.Put("new", n.RedisKeyModel.CacheUpdateKey, now.Unix(), n.RedisKeyModel.CacheUpdateTimeout)
-				go DataSortInRedis(res, n.RedisStatus, in.PositionId)
 			}
+			////剩余时间 在 IC.C.NewsTimeOut 之内
+			//if ttl := redis.GetTTL("new", n.RedisKeyModel.Key); ttl-IC.C.NewsTimeOut < 0 {
+			//
+			//}
 		}
-		////剩余时间 在 IC.C.NewsTimeOut 之内
-		//if ttl := redis.GetTTL("new", n.RedisKeyModel.Key); ttl-IC.C.NewsTimeOut < 0 {
-		//
-		//}
 	}
 }

+ 64 - 7
jyBXBase/rpc/util/common.go

@@ -1,12 +1,14 @@
 package util
 
 import (
+	"context"
 	"fmt"
 	"jyBXBase/rpc/bxbase"
 	IC "jyBXBase/rpc/init"
 	"log"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"app.yhyue.com/moapp/jybase/common"
@@ -28,7 +30,7 @@ const (
 	TopTypesLeadingProject    = "拟建,采购意向"
 )
 
-//是否是付费用户 -bool: true:是 fasle:不是
+// 是否是付费用户 -bool: true:是 fasle:不是
 func Power(userid string) (bool, map[string]interface{}) {
 	isVip, isMember, isEnt := false, false, false
 	vipstatus := 0
@@ -77,7 +79,7 @@ func Power(userid string) (bool, map[string]interface{}) {
 	}
 }
 
-//招标信息是否被收藏
+// 招标信息是否被收藏
 func IsCollByBids(bids, userid string) []string {
 	res := []string{}
 	collBidMap := map[string]bool{}
@@ -128,8 +130,10 @@ func IsCollByBids(bids, userid string) []string {
 }
 
 /*
-    isColl int:	收藏条数的状态 (是否超过100条 超过:2 没超过:1)
+	isColl int:	收藏条数的状态 (是否超过100条 超过:2 没超过:1)
+
 return
+
 	[]string:收藏的id
 */
 func GetCollRedis(userid string, isColl int) []string {
@@ -150,7 +154,7 @@ func GetCollRedis(userid string, isColl int) []string {
 	return redisArr
 }
 
-//招标信息 bid&userid 唯一
+// 招标信息 bid&userid 唯一
 type BidInfo struct {
 	Bid        string `json:"bid"`        //招标信息id 加密后
 	Buyerclass string `json:"buyerclass"` //采购单位类型
@@ -367,7 +371,7 @@ type InfoList struct {
 	FileExists      bool        `json:"fileExists"`
 }
 
-//根据id取内容
+// 根据id取内容
 func GetInfoById(Mgo_bidding mongodb.MongodbSim, bidding, bidding_back string, idlist []map[string]interface{}) []*InfoList {
 	array := make([]*InfoList, len(idlist))
 	if len(idlist) == 0 {
@@ -455,7 +459,7 @@ func GetInfoById(Mgo_bidding mongodb.MongodbSim, bidding, bidding_back string, i
 	return array
 }
 
-//列表单条信息格式化
+// 列表单条信息格式化
 func InfoFormat(p string, info *map[string]interface{}) *InfoList {
 	area := common.ObjToString((*info)["area"])
 	if area == "A" {
@@ -505,7 +509,7 @@ func InfoFormat(p string, info *map[string]interface{}) *InfoList {
 	}
 }
 
-//搜藏列表
+// 搜藏列表
 func CollListSql(c *bxbase.ListReq, isPay bool, userid string) string {
 	sql := fmt.Sprintf(`select bid from %s where userid ='%s'`, "bdcollection", userid)
 	limit := 10
@@ -590,3 +594,56 @@ func CollListSql(c *bxbase.ListReq, isPay bool, userid string) string {
 	sql += fmt.Sprintf(` order by createdate desc limit %v`, limit)
 	return sql
 }
+
+//并发限制
+
+type reqLimit struct {
+	doPool   chan struct{}
+	waitPool chan struct{}
+}
+
+var (
+	ReqLimitInit *reqLimit
+	ReqLimitLock *sync.Mutex
+)
+
+func NewLimit(num int) {
+	doPool := make(chan struct{}, num)
+	for i := 0; i < num; i++ {
+		doPool <- struct{}{}
+	}
+	waitPool := make(chan struct{}, num)
+	for i := 0; i < num; i++ {
+		waitPool <- struct{}{}
+	}
+	ReqLimitInit = &reqLimit{
+		doPool:   doPool,
+		waitPool: waitPool,
+	}
+}
+
+// -2 等待池已满
+// -1 超时
+// 1:可以执行查询
+func (r *reqLimit) Limit(ctx context.Context) int {
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second) //30秒
+	defer cancel()
+	select {
+	case <-r.waitPool:
+		defer func() {
+			r.waitPool <- struct{}{}
+		}()
+		select {
+		case <-r.doPool:
+			return 1
+		case <-ctx.Done(): //超时
+			return -1
+		}
+	default:
+		return -2
+	}
+}
+
+func (r *reqLimit) Release() {
+	r.doPool <- struct{}{}
+}