Explorar el Código

Merge branch 'hotfix/v1.1.31.1_ws' of BaseService/jyMicroservices into hotfix/v1.1.31.1

wangshan hace 1 año
padre
commit
041382c1a1

+ 60 - 0
jyBXBase/entity/newBidding.go

@@ -0,0 +1,60 @@
+package entity
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+//并发限制
+
+type reqLimit struct {
+	doPool   chan struct{}
+	waitPool chan struct{}
+}
+
+var (
+	ReqLimitInit *reqLimit
+	ReqLimitLock = &sync.Mutex{}
+)
+
+func NewLimit(num int) {
+	doPool := make(chan struct{}, num)
+	for i := 0; i < num; i++ {
+		doPool <- struct{}{}
+	}
+	waitPool := make(chan struct{}, num)
+	for i := 0; i < num; i++ {
+		waitPool <- struct{}{}
+	}
+	ReqLimitInit = &reqLimit{
+		doPool:   doPool,
+		waitPool: waitPool,
+	}
+}
+
+// -2 等待池已满
+// -1 超时
+// 1:可以执行查询
+func (r *reqLimit) Limit(ctx context.Context) int {
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second) //30秒
+	defer cancel()
+	select {
+	case <-r.waitPool:
+		defer func() {
+			r.waitPool <- struct{}{}
+		}()
+		select {
+		case <-r.doPool:
+			return 1
+		case <-ctx.Done(): //超时
+			return -1
+		}
+	default:
+		return -2
+	}
+}
+
+func (r *reqLimit) Release() {
+	r.doPool <- struct{}{}
+}

+ 4 - 4
jyBXBase/rpc/etc/db.yaml

@@ -15,8 +15,8 @@ mysql:
         maxIdleConns: 5
 redis:
     addr:
-        - other=192.168.3.206:1712
-        - new=192.168.3.206:1712
+        - other=192.168.3.11:1712
+        - new=192.168.3.11:1712
 es:
     addr: http://192.168.3.241:9205,http://192.168.3.149:9200
     size: 30
@@ -35,10 +35,10 @@ mongo:
         userName: jyDevGroup
         password: DevGroup
     bidding:
-        address: 192.168.3.207:27001,192.168.3.206:27002
+        address: 192.168.3.206:27002
         size: 50
         dbName: qfw_data
         collection: bidding
         collectionChange: bidding_back
         userName: jyDevGroup
-        password: jy@DevGroup
+        password: jy@DevGroup

+ 3 - 4
jyBXBase/rpc/init/init.go

@@ -7,8 +7,8 @@ import (
 	"flag"
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/zeromicro/go-zero/core/conf"
+	"jyBXBase/entity"
 	"jyBXBase/rpc/internal/config"
-	"jyBXBase/rpc/util"
 )
 
 var configFile = flag.String("cf", "etc/bxbase.yaml", "the config file")
@@ -41,8 +41,7 @@ func init() {
 		RegPowerCheckCenter(C.PowerCheckCenterKey).
 		RegEntManageApplication(C.EntManageApplication)
 	Compatible = compatible.NewCompatible(&Mgo, BaseServiceMysql, MainMysql, Middleground)
-
-	num := MC.If(C.NewConcurrencyNum > 0, C.NewConcurrencyNum, 3).(int)
 	//创建执行池 等待池
-	util.NewLimit(num)
+	num := MC.If(C.NewConcurrencyNum > 0, C.NewConcurrencyNum, 3).(int)
+	entity.NewLimit(num)
 }

+ 16 - 16
jyBXBase/rpc/model/newestBidding.go

@@ -5,10 +5,10 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"jyBXBase/entity"
 	"jyBXBase/rpc/bxbase"
 	IC "jyBXBase/rpc/init"
 	"jyBXBase/rpc/internal/config"
-	"jyBXBase/rpc/util"
 	"log"
 	"sort"
 	"strings"
@@ -145,10 +145,10 @@ func (n *NewestInfo) GetPushHistory() (res []*bxbase.NewestList) {
 		}
 		if len(es_ids) > 0 {
 			t = time.Now()
-			list := elastic.Get(search_index, search_type, fmt.Sprintf(query, strings.Join(es_ids, `","`), len(es_ids)))
+			esList := elastic.Get(search_index, search_type, fmt.Sprintf(query, strings.Join(es_ids, `","`), len(es_ids)))
 			log.Println("elastic 招标信息耗时:", time.Since(t))
-			if list != nil {
-				for _, v := range *list {
+			if esList != nil && len(*esList) > 0 {
+				for _, v := range *esList {
 					_id := MC.ObjToString(v["_id"])
 					bn := infos[_id]
 					bn.Title = MC.ObjToString(v["title"])
@@ -172,10 +172,10 @@ func (n *NewestInfo) GetPushHistory() (res []*bxbase.NewestList) {
 		}
 		if len(mgo_ids) > 0 {
 			t = time.Now()
-			list, ok := IC.MgoBidding.Find("bidding", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_ids}}, nil, mongodb_fields, false, -1, -1)
+			mgoList, ok := IC.MgoBidding.Find("bidding", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_ids}}, nil, mongodb_fields, false, -1, -1)
 			log.Println("mongodb bidding 招标信息耗时:", time.Since(t))
-			if ok && *list != nil {
-				for _, v := range *list {
+			if ok && *mgoList != nil && len(*mgoList) > 0 {
+				for _, v := range *mgoList {
 					_id := mongodb.BsonIdToSId(v["_id"])
 					bn := infos[_id]
 					bn.Title = MC.ObjToString(v["title"])
@@ -199,10 +199,10 @@ func (n *NewestInfo) GetPushHistory() (res []*bxbase.NewestList) {
 		}
 		if len(mgo_back_ids) > 0 {
 			t = time.Now()
-			list, ok := IC.MgoBidding.Find("bidding_back", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_back_ids}}, nil, mongodb_fields, false, -1, -1)
+			mgoBKList, ok := IC.MgoBidding.Find("bidding_back", map[string]interface{}{"_id": map[string]interface{}{"$in": mgo_back_ids}}, nil, mongodb_fields, false, -1, -1)
 			log.Println("mongodb bidding_back 招标信息耗时:", time.Since(t))
-			if ok && *list != nil {
-				for _, v := range *list {
+			if ok && *mgoBKList != nil && len(*mgoBKList) > 0 {
+				for _, v := range *mgoBKList {
 					_id := mongodb.BsonIdToSId(v["_id"])
 					bn := infos[_id]
 					bn.Title = MC.ObjToString(v["title"])
@@ -367,14 +367,14 @@ func DataSortInRedis(r *bxbase.NewsetBiddingResp, status int, positionId int64)
 // 延长缓存
 // 获取最新数据 --  延长缓存时间7天---待调整: 1、判断是否需要更新;2、判断订阅信息是否存在,如果存在是否是最新推送,3、订阅信息查推送缓存(只有个人版有缓存:pushcache_2_a )
 func ExtendNewListCache(n *NewSet, in *bxbase.NewestBiddingReq, list []*bxbase.NewestList) {
-	if flag := util.ReqLimitInit.Limit(context.Background()); flag != 1 {
+	if flag := entity.ReqLimitInit.Limit(context.Background()); flag != 1 {
 		if flag == -2 {
 			log.Println("等待队列已满")
 		} else if flag == -1 {
 			log.Println("等待超时")
 		}
 	} else {
-		defer util.ReqLimitInit.Release()
+		defer entity.ReqLimitInit.Release()
 		if n.RedisKeyModel.Key != "" {
 			now := time.Now()
 			if n.RedisKeyModel.CacheUpdateKey != "" {
@@ -388,13 +388,13 @@ func ExtendNewListCache(n *NewSet, in *bxbase.NewestBiddingReq, list []*bxbase.N
 					isDoing    bool
 				)
 				//防止穿透
-				util.ReqLimitLock.Lock()
+				entity.ReqLimitLock.Lock()
 				isDoing, _ = redis.Exists("new", fmt.Sprintf("p1_indexMessage_new_recovery_%d", in.PositionId))
-				util.ReqLimitLock.Unlock()
+				entity.ReqLimitLock.Unlock()
 				if !isDoing {
-					util.ReqLimitLock.Lock()
+					entity.ReqLimitLock.Lock()
 					redis.Put("new", fmt.Sprintf("p1_indexMessage_new_recovery_%d", in.PositionId), "1", 15*time.Now().Minute()) //十五分钟
-					util.ReqLimitLock.Unlock()
+					entity.ReqLimitLock.Unlock()
 					switch n.RedisStatus {
 					case StatusLoginUser:
 						//十五分钟内 更新过一次 不再更新

+ 0 - 55
jyBXBase/rpc/util/common.go

@@ -1,14 +1,12 @@
 package util
 
 import (
-	"context"
 	"fmt"
 	"jyBXBase/rpc/bxbase"
 	IC "jyBXBase/rpc/init"
 	"log"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"app.yhyue.com/moapp/jybase/common"
@@ -594,56 +592,3 @@ func CollListSql(c *bxbase.ListReq, isPay bool, userid string) string {
 	sql += fmt.Sprintf(` order by createdate desc limit %v`, limit)
 	return sql
 }
-
-//并发限制
-
-type reqLimit struct {
-	doPool   chan struct{}
-	waitPool chan struct{}
-}
-
-var (
-	ReqLimitInit *reqLimit
-	ReqLimitLock *sync.Mutex
-)
-
-func NewLimit(num int) {
-	doPool := make(chan struct{}, num)
-	for i := 0; i < num; i++ {
-		doPool <- struct{}{}
-	}
-	waitPool := make(chan struct{}, num)
-	for i := 0; i < num; i++ {
-		waitPool <- struct{}{}
-	}
-	ReqLimitInit = &reqLimit{
-		doPool:   doPool,
-		waitPool: waitPool,
-	}
-}
-
-// -2 等待池已满
-// -1 超时
-// 1:可以执行查询
-func (r *reqLimit) Limit(ctx context.Context) int {
-	ctx, cancel := context.WithTimeout(ctx, 30*time.Second) //30秒
-	defer cancel()
-	select {
-	case <-r.waitPool:
-		defer func() {
-			r.waitPool <- struct{}{}
-		}()
-		select {
-		case <-r.doPool:
-			return 1
-		case <-ctx.Done(): //超时
-			return -1
-		}
-	default:
-		return -2
-	}
-}
-
-func (r *reqLimit) Release() {
-	r.doPool <- struct{}{}
-}