소스 검색

fix:修改并发限制

duxin 2 년 전
부모
커밋
87552f6478
1개의 변경된 파일56개의 추가작업 그리고 54개의 파일을 삭제
  1. 56 54
      src/jfw/front/tags.go

+ 56 - 54
src/jfw/front/tags.go

@@ -1,6 +1,7 @@
 package front
 
 import (
+	"context"
 	"fmt"
 	"jy/src/jfw/config"
 	"jy/src/jfw/jyutil"
@@ -11,7 +12,6 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	qu "app.yhyue.com/moapp/jybase/common"
@@ -36,17 +36,33 @@ type Tags struct {
 
 var LetterArr = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"}
 var bidField = `"_id","site","title","publishtime","toptype","subtype","type","area","href","bidopentime","winner","buyer","bidamount","budget","s_subscopeclass","projectname","detail"`
-var (
-	BeExecuting      int
-	NeedWait         chan int
-	NotificationWait = make(chan int, 1)
-	syw, rwm         = &sync.RWMutex{}, &sync.RWMutex{}
-)
+
 var seoBidField = `"_id","title","publishtime","subtype","area","href","bidamount","budget","s_subscopeclass","projectname"`
 
+type reqLimit struct {
+	doPool   chan struct{}
+	waitPool chan struct{}
+}
+
+var reqLimitInit *reqLimit
+
 func init() {
 	xweb.AddAction(&Tags{})
-	NeedWait = make(chan int, qu.IntAll(config.Sysconfig["awaitNum"]))
+	//创建执行池 等待池
+	doPool := make(chan struct{}, qu.IntAll(config.Sysconfig["awaitNum"]))
+	do := qu.IntAll(config.Sysconfig["awaitNum"])
+	wait := qu.IntAll(config.Sysconfig["awaitNum"])
+	for i := 0; i < do; i++ {
+		doPool <- struct{}{}
+	}
+	waitPool := make(chan struct{}, wait)
+	for i := 0; i < wait; i++ {
+		waitPool <- struct{}{}
+	}
+	reqLimitInit = &reqLimit{
+		doPool:   doPool,
+		waitPool: waitPool,
+	}
 }
 
 /*
@@ -608,31 +624,30 @@ func GetBiddingPlatformType() (nameToCode map[string]int64, codeToName map[int64
 	return
 }
 
-func LimitedConcurrency() int {
-	var ExecutionState int //执行状态 0 直接执行 1 等待执行 -1 已达上线
-	executionNum := qu.IntAll(config.Sysconfig["executionNum"])
-	awaitNum := qu.IntAll(config.Sysconfig["awaitNum"])
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		if BeExecuting < executionNum { //并发数不足 直接执行
-			ExecutionState = 0
-			BeExecuting++
-			return
-		}
-		syw.Lock()
-		defer syw.Unlock()
-		if len(NeedWait) < awaitNum { //等待人数不足 加入等待队列
-			ExecutionState = 1
-			NeedWait <- 1
-			return
-		}
-		ExecutionState = -1 //并发数与等待数以达到上线 直接结束
-		return
-	}()
-	wg.Wait()
-	return ExecutionState
+// -2 等待池已满
+// -1 超时
+// 1:可以执行查询
+func (r *reqLimit) Limit(ctx context.Context) int {
+	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
+	defer cancel()
+	select {
+	case <-r.waitPool:
+		defer func() {
+			r.waitPool <- struct{}{}
+		}()
+		select {
+		case <-r.doPool:
+			return 1
+		case <-ctx.Done(): //超时
+			return -1
+		}
+	default:
+		return -2
+	}
+}
+
+func (r *reqLimit) Release() {
+	r.doPool <- struct{}{}
 }
 
 func (this *Tags) GetBidding(industry, area, city, stype, keyword string, request *http.Request, responseWriter http.ResponseWriter, session *httpsession.Session) ([]map[string]interface{}, int64, bool) {
@@ -644,30 +659,17 @@ func (this *Tags) GetBidding(industry, area, city, stype, keyword string, reques
 		return qu.ObjArrToMapArr(l), int64(count), false
 	} else {
 		if area != "" || stype != "" || industry != "" || city != "" || keyword != "" {
-			// 限制并发数
-			executionState := LimitedConcurrency()
-		env:
-			switch executionState {
-			case 0:
-				defer func() {
-					BeExecuting-- //执行完成减去在执行数
-					rwm.Lock()
-					defer rwm.Unlock()
-					if len(NeedWait) > 0 { //查看是否有等待执行 有通知可以执行
-						NotificationWait <- 1
-					}
-				}()
-			case 1:
-				select { //监听等待
-				case <-NotificationWait:
-					<-NeedWait
-					executionState = 0
-					BeExecuting++
-					goto env //可以开始执行
+			if flag := reqLimitInit.Limit(context.Background()); flag == 1 {
+				defer reqLimitInit.Release()
+			} else {
+				if flag == -2 {
+					log.Println("等待队列已满")
+				} else if flag == -1 {
+					log.Println("等待超时")
 				}
-			default:
 				return nil, 0, true
 			}
+
 			if public.Lst.IsLimited(request, responseWriter, session, false) == 1 { //没有被限制
 				defer public.Lst.Limit()
 			} else {