Эх сурвалжийг харах

提交负载及代理client配置

wangkaiyue 3 жил өмнө
parent
commit
4bafc378e6

+ 2 - 1
common/gatecode/errcode.go

@@ -26,7 +26,8 @@ const (
 
 
 	// GATEWAY_ERR_NIL 代理网关错误定义
 	// GATEWAY_ERR_NIL 代理网关错误定义
 	GATEWAY_ERR_NIL                ErrCode = 2000 + iota // gateway ok
 	GATEWAY_ERR_NIL                ErrCode = 2000 + iota // gateway ok
-	GATEWAY_MODULE_UNDEFINED                             // 无可用服务
+	GATEWAY_MODULE_UNDEFINED                             // 未知节点
+	GATEWAY_MODULE_SERVER_ERR                            // 节点无可用服务
 	GATEWAY_ROUTER_NOTFIND                               // 未注册路由地址
 	GATEWAY_ROUTER_NOTFIND                               // 未注册路由地址
 	GATEWAY_ROUTER_UPHOLD                                // 接口维护中
 	GATEWAY_ROUTER_UPHOLD                                // 接口维护中
 	GATEWAY_REGISTED_URL_ERR                             // 服务地址异常
 	GATEWAY_REGISTED_URL_ERR                             // 服务地址异常

+ 19 - 18
common/gatecode/errcode_string.go

@@ -25,29 +25,30 @@ func _() {
 	_ = x[GLOBAL_ERR_RESOURCE_PORWE_NOTHAS-1014]
 	_ = x[GLOBAL_ERR_RESOURCE_PORWE_NOTHAS-1014]
 	_ = x[GATEWAY_ERR_NIL-2015]
 	_ = x[GATEWAY_ERR_NIL-2015]
 	_ = x[GATEWAY_MODULE_UNDEFINED-2016]
 	_ = x[GATEWAY_MODULE_UNDEFINED-2016]
-	_ = x[GATEWAY_ROUTER_NOTFIND-2017]
-	_ = x[GATEWAY_ROUTER_UPHOLD-2018]
-	_ = x[GATEWAY_REGISTED_URL_ERR-2019]
-	_ = x[GATEWAY_PROXY_ERR-2020]
-	_ = x[GATEWAY_RPC_USERCENTER_ERR-2021]
-	_ = x[GATEWAY_RPC_RESOURCECENTER_ERR-2022]
-	_ = x[SERVER_ERR_NIL-3023]
-	_ = x[SERVER_DETAIL_TIMEOUT-3024]
-	_ = x[SERVER_DEDUCT_PARAM_LACK-3025]
-	_ = x[OTHER_ERR_NIL-4026]
-	_ = x[OTHER_ERR_UNDEFINED-4027]
+	_ = x[GATEWAY_MODULE_SERVER_ERR-2017]
+	_ = x[GATEWAY_ROUTER_NOTFIND-2018]
+	_ = x[GATEWAY_ROUTER_UPHOLD-2019]
+	_ = x[GATEWAY_REGISTED_URL_ERR-2020]
+	_ = x[GATEWAY_PROXY_ERR-2021]
+	_ = x[GATEWAY_RPC_USERCENTER_ERR-2022]
+	_ = x[GATEWAY_RPC_RESOURCECENTER_ERR-2023]
+	_ = x[SERVER_ERR_NIL-3024]
+	_ = x[SERVER_DETAIL_TIMEOUT-3025]
+	_ = x[SERVER_DEDUCT_PARAM_LACK-3026]
+	_ = x[OTHER_ERR_NIL-4027]
+	_ = x[OTHER_ERR_UNDEFINED-4028]
 }
 }
 
 
 const (
 const (
 	_ErrCode_name_0 = "global OK无用户身份无选择企业没有权限正在开发中企业账户已被冻结企业未认证企业认证待审核企业认证未通过企业认证已过期无效资源code权限校验失败权益已过期权益余额不足没有权限"
 	_ErrCode_name_0 = "global OK无用户身份无选择企业没有权限正在开发中企业账户已被冻结企业未认证企业认证待审核企业认证未通过企业认证已过期无效资源code权限校验失败权益已过期权益余额不足没有权限"
-	_ErrCode_name_1 = "gateway ok无可用服务未注册路由地址接口维护中服务地址异常服务异常用户中心服务异常资源中心服务异常"
+	_ErrCode_name_1 = "gateway ok未知节点节点无可用服务未注册路由地址接口维护中服务地址异常服务异常用户中心服务异常资源中心服务异常"
 	_ErrCode_name_2 = "server ok接口超时扣减异常"
 	_ErrCode_name_2 = "server ok接口超时扣减异常"
 	_ErrCode_name_3 = "server ok未知异常"
 	_ErrCode_name_3 = "server ok未知异常"
 )
 )
 
 
 var (
 var (
 	_ErrCode_index_0 = [...]uint8{0, 9, 24, 39, 51, 66, 90, 105, 126, 147, 168, 184, 202, 217, 235, 247}
 	_ErrCode_index_0 = [...]uint8{0, 9, 24, 39, 51, 66, 90, 105, 126, 147, 168, 184, 202, 217, 235, 247}
-	_ErrCode_index_1 = [...]uint8{0, 10, 25, 46, 61, 79, 91, 115, 139}
+	_ErrCode_index_1 = [...]uint8{0, 10, 22, 43, 64, 79, 97, 109, 133, 157}
 	_ErrCode_index_2 = [...]uint8{0, 9, 21, 33}
 	_ErrCode_index_2 = [...]uint8{0, 9, 21, 33}
 	_ErrCode_index_3 = [...]uint8{0, 9, 21}
 	_ErrCode_index_3 = [...]uint8{0, 9, 21}
 )
 )
@@ -57,14 +58,14 @@ func (i ErrCode) String() string {
 	case 1000 <= i && i <= 1014:
 	case 1000 <= i && i <= 1014:
 		i -= 1000
 		i -= 1000
 		return _ErrCode_name_0[_ErrCode_index_0[i]:_ErrCode_index_0[i+1]]
 		return _ErrCode_name_0[_ErrCode_index_0[i]:_ErrCode_index_0[i+1]]
-	case 2015 <= i && i <= 2022:
+	case 2015 <= i && i <= 2023:
 		i -= 2015
 		i -= 2015
 		return _ErrCode_name_1[_ErrCode_index_1[i]:_ErrCode_index_1[i+1]]
 		return _ErrCode_name_1[_ErrCode_index_1[i]:_ErrCode_index_1[i+1]]
-	case 3023 <= i && i <= 3025:
-		i -= 3023
+	case 3024 <= i && i <= 3026:
+		i -= 3024
 		return _ErrCode_name_2[_ErrCode_index_2[i]:_ErrCode_index_2[i+1]]
 		return _ErrCode_name_2[_ErrCode_index_2[i]:_ErrCode_index_2[i+1]]
-	case 4026 <= i && i <= 4027:
-		i -= 4026
+	case 4027 <= i && i <= 4028:
+		i -= 4027
 		return _ErrCode_name_3[_ErrCode_index_3[i]:_ErrCode_index_3[i+1]]
 		return _ErrCode_name_3[_ErrCode_index_3[i]:_ErrCode_index_3[i+1]]
 	default:
 	default:
 		return "ErrCode(" + strconv.FormatInt(int64(i), 10) + ")"
 		return "ErrCode(" + strconv.FormatInt(int64(i), 10) + ")"

+ 25 - 45
core/proxy/broker/broker.go

@@ -2,80 +2,60 @@ package broker
 
 
 import (
 import (
 	. "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
 	. "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
+	"bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
 	"fmt"
 	"fmt"
+	"github.com/gogf/gf/v2/os/gcfg"
+	"github.com/gogf/gf/v2/os/gctx"
 	"log"
 	"log"
-	"math/rand"
+	"net/url"
 	"sync"
 	"sync"
 )
 )
 
 
 type broker struct {
 type broker struct {
 	sync.RWMutex
 	sync.RWMutex
-	addresses map[string][]string
+	addresses map[string]loadmodule.ProxyLoadModule
 }
 }
 
 
 func InitBroker() *broker {
 func InitBroker() *broker {
 	b := new(broker)
 	b := new(broker)
-	b.addresses = make(map[string][]string)
+	b.addresses = make(map[string]loadmodule.ProxyLoadModule)
 	return b
 	return b
 }
 }
 
 
 // AddNode 发现注册新节点
 // AddNode 发现注册新节点
 func (r *broker) AddNode(code, address string) error {
 func (r *broker) AddNode(code, address string) error {
-	for _, addr := range r.getServers(code) {
-		if addr == address {
-			return fmt.Errorf("%s节点%s已经存在", code, address)
-		}
-	}
-	//进行添加操作
 	r.Lock()
 	r.Lock()
 	defer r.Unlock()
 	defer r.Unlock()
+	module, ok := r.addresses[code]
+	if !ok {
+		module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int())
+		r.addresses[code] = module
+	}
 	log.Printf("节点%s服务注册%s", code, address)
 	log.Printf("节点%s服务注册%s", code, address)
-	r.addresses[code] = append(r.addresses[code], address)
-	return nil
+	return module.Add(address)
 }
 }
 
 
 // DelNode 节点下线watcher删除节点
 // DelNode 节点下线watcher删除节点
 func (r *broker) DelNode(code, address string) error {
 func (r *broker) DelNode(code, address string) error {
-	var newAddress []string
-	var curlAddr = r.getServers(code)
-	for _, addr := range curlAddr {
-		if addr == address {
-			continue
-		}
-		newAddress = append(newAddress, addr)
-	}
-	if len(curlAddr) == len(newAddress) {
-		return fmt.Errorf("%s节点未发现待删除节点%s", code, address)
-	}
 	//进行删除操作
 	//进行删除操作
 	r.Lock()
 	r.Lock()
 	defer r.Unlock()
 	defer r.Unlock()
-	r.addresses[code] = newAddress
-	log.Printf("节点%s关闭服务%s", code, address)
-	return nil
-}
-
-// getServers 根据code获取所有可用节点
-func (r *broker) getServers(code string) []string {
-	r.RLock()
-	defer r.RUnlock()
-	addresses := make([]string, 0, len(r.addresses[code]))
-	for _, address := range r.addresses[code] {
-		addresses = append(addresses, address)
+	module, ok := r.addresses[code]
+	if !ok {
+		log.Printf("节点%s服务关闭异常%s", code, address)
+		return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
 	}
 	}
-	return addresses
+	log.Printf("节点%s服务关闭%s", code, address)
+	return module.Del(address)
 }
 }
 
 
 // GetServerAddr 获取代理节点
 // GetServerAddr 获取代理节点
-func (r *broker) GetServerAddr(code string) (string, error) {
-	serverAddress := r.getServers(code)
-	lenServer := len(serverAddress)
-	switch lenServer {
-	case 0:
-		return "", NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, "未找到可用服务")
-	case 1:
-		return serverAddress[0], nil
-	default:
-		return serverAddress[rand.Intn(lenServer)], nil
+func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) {
+	r.RLock()
+	defer r.RUnlock()
+	module, ok := r.addresses[code]
+	if !ok {
+		return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务异常 module %s", code))
 	}
 	}
+	return module.Get(ip)
 }
 }

+ 13 - 16
core/proxy/loadmodule/interface.go

@@ -1,35 +1,32 @@
 package loadmodule
 package loadmodule
 
 
-import "net/url"
+import (
+	"net/url"
+)
 
 
 type ProxyLoadType int
 type ProxyLoadType int
 
 
 const (
 const (
-	RandomProxyModule           ProxyLoadType = iota //随机
-	RoundRobinProxyModule                            //轮询
-	WeightRoundRobinProxyModule                      //权重
-	//ConsistentHashProxy                            //哈希
+	RandomProxyModule     ProxyLoadType = iota //随机
+	RoundRobinProxyModule                      //轮询
+	HashProxyModule                            //哈希
 )
 )
 
 
 type ProxyLoadModule interface {
 type ProxyLoadModule interface {
-	Add(string) error
-	Get() *url.URL
-	Len() int
-
-	Update() //动态添加
+	Get(string) (*url.URL, error) //获取节点
+	Add(string) error             //添加节点
+	Del(string) error             //动态添加
+	//ALlNodes() []url.URL          //获取节点内容
 }
 }
 
 
 func LoadProxyLoadFactory(value int) ProxyLoadModule {
 func LoadProxyLoadFactory(value int) ProxyLoadModule {
-	loadType := ProxyLoadType(value)
-	switch loadType {
+	switch ProxyLoadType(value) {
 	case RandomProxyModule: //随机
 	case RandomProxyModule: //随机
 		return &RandomProxy{}
 		return &RandomProxy{}
 	case RoundRobinProxyModule: //轮询
 	case RoundRobinProxyModule: //轮询
 		return &RoundRobinProxy{}
 		return &RoundRobinProxy{}
-	case WeightRoundRobinProxyModule: //权重
-		return &WeightRoundRobinProxy{}
-	//case ConsistentHashProxy: //哈希
-	//	return NewConsistentHashBanlance(10, nil)
+	case HashProxyModule: //哈希
+		return &HashProxy{}
 	default:
 	default:
 		return &RandomProxy{}
 		return &RandomProxy{}
 	}
 	}

+ 62 - 0
core/proxy/loadmodule/module_hash.go

@@ -0,0 +1,62 @@
+package loadmodule
+
+import (
+	. "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
+	"fmt"
+	"hash/adler32"
+	"net/url"
+	"strings"
+	"time"
+)
+
+type HashProxy struct {
+	curIndex int
+	rss      []*url.URL
+}
+
+func (h *HashProxy) Add(path string) error {
+	remoteUrl, err := url.Parse(strings.TrimSpace(path))
+	if err != nil {
+		return NewErrorWithCode(GATEWAY_MODULE_SERVER_ERR, fmt.Sprintf("服务地址%s ERR:%v", path, err))
+	}
+	h.rss = append(h.rss, remoteUrl)
+	return nil
+}
+
+func (h *HashProxy) next(ip string) (url *url.URL, err error) {
+	defer func() {
+		//数组线程不安全处理
+		if rec := recover(); rec != nil {
+			err = NewErrorWithCode(GATEWAY_MODULE_SERVER_ERR, fmt.Sprintf("服务列表%v 获取服务异常%v ", h.rss, rec))
+		}
+	}()
+	if len(h.rss) == 1 {
+		url = h.rss[0]
+		return
+	}
+	url = h.rss[int64(adler32.Checksum([]byte(ip)))]
+	return
+}
+
+func (h *HashProxy) Get(ip string) (url *url.URL, err error) {
+	for i := 0; i < 3; i++ {
+		url, err = h.next(ip)
+		if err == nil {
+			return
+		}
+		time.Sleep(time.Millisecond * 500 * time.Duration(i))
+	}
+	return
+}
+
+func (h *HashProxy) Del(path string) error {
+	var newRss []*url.URL
+	for _, rss := range h.rss {
+		if rss.String() == path {
+			continue
+		}
+		newRss = append(newRss, rss)
+	}
+	h.rss = newRss
+	return nil
+}

+ 62 - 0
core/proxy/loadmodule/module_random.go

@@ -0,0 +1,62 @@
+package loadmodule
+
+import (
+	. "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"strings"
+	"time"
+)
+
+//RandomProxy 负载-随机
+type RandomProxy struct {
+	rss []*url.URL
+}
+
+func (r *RandomProxy) Get(ip string) (url *url.URL, err error) {
+	for i := 0; i < 3; i++ {
+		url, err = r.next()
+		if err == nil {
+			return
+		}
+		time.Sleep(time.Millisecond * 500 * time.Duration(i))
+	}
+	return
+}
+
+func (r *RandomProxy) next() (url *url.URL, err error) {
+	defer func() {
+		//数组线程不安全处理
+		if rec := recover(); rec != nil {
+			err = NewErrorWithCode(GATEWAY_MODULE_SERVER_ERR, fmt.Sprintf("服务列表%v 获取服务异常%v ", r.rss, rec))
+		}
+	}()
+
+	if len(r.rss) == 1 {
+		url = r.rss[0]
+		return
+	}
+	return r.rss[rand.Intn(len(r.rss))], nil
+}
+
+func (r *RandomProxy) Add(path string) error {
+	remoteUrl, err := url.Parse(strings.TrimSpace(path))
+	if err != nil {
+		return NewErrorWithCode(GATEWAY_MODULE_SERVER_ERR, fmt.Sprintf("服务地址%s ERR:%v", path, err))
+	}
+	r.rss = append(r.rss, remoteUrl)
+	return nil
+}
+
+func (r *RandomProxy) Del(path string) error {
+	var newRss []*url.URL
+	for _, rss := range r.rss {
+		if rss.String() == path {
+			continue
+		}
+		newRss = append(newRss, rss)
+	}
+	r.rss = newRss
+	return nil
+}

+ 63 - 0
core/proxy/loadmodule/module_round.go

@@ -0,0 +1,63 @@
+package loadmodule
+
+import (
+	. "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
+	"fmt"
+	"net/url"
+	"strings"
+	"time"
+)
+
+type RoundRobinProxy struct {
+	curIndex int
+	rss      []*url.URL
+}
+
+func (r *RoundRobinProxy) Add(path string) error {
+	remoteUrl, err := url.Parse(strings.TrimSpace(path))
+	if err != nil {
+		return NewErrorWithCode(GATEWAY_MODULE_SERVER_ERR, fmt.Sprintf("服务地址%s ERR:%v", path, err))
+	}
+	r.rss = append(r.rss, remoteUrl)
+	return nil
+}
+
+func (r *RoundRobinProxy) next() (url *url.URL, err error) {
+	defer func() {
+		//数组线程不安全处理
+		if rec := recover(); rec != nil {
+			err = NewErrorWithCode(GATEWAY_MODULE_SERVER_ERR, fmt.Sprintf("服务列表%v 获取服务异常%v ", r.rss, rec))
+		}
+	}()
+
+	lens := len(r.rss)
+	if r.curIndex >= lens {
+		r.curIndex = 0
+	}
+	url = r.rss[r.curIndex]
+	r.curIndex = (r.curIndex + 1) % lens
+	return
+}
+
+func (r *RoundRobinProxy) Get(ip string) (url *url.URL, err error) {
+	for i := 0; i < 3; i++ {
+		url, err = r.next()
+		if err == nil {
+			return
+		}
+		time.Sleep(time.Millisecond * 500 * time.Duration(i))
+	}
+	return
+}
+
+func (r *RoundRobinProxy) Del(path string) error {
+	var newRss []*url.URL
+	for _, rss := range r.rss {
+		if rss.String() == path {
+			continue
+		}
+		newRss = append(newRss, rss)
+	}
+	r.rss = newRss
+	return nil
+}

+ 0 - 51
core/proxy/loadmodule/random.go

@@ -1,51 +0,0 @@
-package loadmodule
-
-import (
-	"errors"
-	"math/rand"
-	"net/url"
-	"strings"
-)
-
-//负载-随机
-
-type RandomProxy struct {
-	curIndex int
-	rss      []*url.URL
-}
-
-func (r *RandomProxy) Add(param string) error {
-	urlParam := param
-	if params := strings.Split(param, "|"); len(params) == 2 {
-		urlParam = params[0]
-	}
-	if len(param) == 0 {
-		return errors.New("param len 1 at least")
-	}
-	remoteUrl, err := url.Parse(strings.TrimSpace(urlParam))
-	if err != nil {
-		return err
-	}
-	r.rss = append(r.rss, remoteUrl)
-	return nil
-}
-
-func (r *RandomProxy) Next() *url.URL {
-	if len(r.rss) == 1 {
-		return r.rss[0]
-	}
-	r.curIndex = rand.Intn(len(r.rss))
-	return r.rss[r.curIndex]
-}
-
-func (r *RandomProxy) Get() *url.URL {
-	return r.Next()
-}
-
-func (r *RandomProxy) Len() int {
-	return len(r.rss)
-}
-
-func (r *RandomProxy) Update() {
-
-}

+ 0 - 55
core/proxy/loadmodule/roundRobin.go

@@ -1,55 +0,0 @@
-package loadmodule
-
-import (
-	"errors"
-	"net/url"
-	"strings"
-)
-
-type RoundRobinProxy struct {
-	curIndex int
-	rss      []*url.URL
-}
-
-func (r *RoundRobinProxy) Add(param string) error {
-	urlParam := param
-	if params := strings.Split(param, "|"); len(params) == 2 {
-		urlParam = params[0]
-	}
-	if len(urlParam) == 0 {
-		return errors.New("param len 1 at least")
-	}
-
-	remoteUrl, err := url.Parse(strings.TrimSpace(urlParam))
-	if err != nil {
-		return err
-	}
-	r.rss = append(r.rss, remoteUrl)
-	return nil
-}
-
-func (r *RoundRobinProxy) Next() *url.URL {
-	if len(r.rss) == 1 {
-		return r.rss[0]
-	}
-
-	lens := len(r.rss)
-	if r.curIndex >= lens {
-		r.curIndex = 0
-	}
-	curAddr := r.rss[r.curIndex]
-	r.curIndex = (r.curIndex + 1) % lens
-	return curAddr
-}
-
-func (r *RoundRobinProxy) Get() *url.URL {
-	return r.Next()
-}
-
-func (r *RoundRobinProxy) Len() int {
-	return len(r.rss)
-}
-
-func (r *RoundRobinProxy) Update() {
-
-}

+ 0 - 87
core/proxy/loadmodule/weightRoundRobin.go

@@ -1,87 +0,0 @@
-package loadmodule
-
-import (
-	"bp.jydev.jianyu360.cn/BaseService/gateway/core/logs"
-	"errors"
-	"github.com/gogf/gf/v2/os/gctx"
-	"net/url"
-	"strconv"
-	"strings"
-)
-
-type WeightRoundRobinProxy struct {
-	curIndex int
-	rss      []*WeightNode
-	rsw      []int
-}
-
-type WeightNode struct {
-	addr            *url.URL
-	weight          int //权重值
-	currentWeight   int //节点当前权重
-	effectiveWeight int //有效权重
-}
-
-func (r *WeightRoundRobinProxy) Add(param string) error {
-	params := strings.Split(param, "|")
-	if len(params) != 2 {
-		return errors.New("param len need 2")
-	}
-	parInt, err := strconv.ParseInt(params[1], 10, 64)
-	if err != nil {
-		return err
-	}
-	remoteUrl, err := url.Parse(strings.TrimSpace(params[0]))
-	if err != nil {
-		return err
-	}
-
-	node := &WeightNode{addr: remoteUrl, weight: int(parInt)}
-	node.effectiveWeight = node.weight
-	r.rss = append(r.rss, node)
-	return nil
-}
-
-func (r *WeightRoundRobinProxy) Next() *url.URL {
-	if len(r.rss) == 1 {
-		return r.rss[0].addr
-	}
-	total := 0
-	var best *WeightNode
-	for i := 0; i < len(r.rss); i++ {
-		w := r.rss[i]
-		//step 1 统计所有有效权重之和
-		total += w.effectiveWeight
-
-		//step 2 变更节点临时权重为的节点临时权重+节点有效权重
-		w.currentWeight += w.effectiveWeight
-
-		//step 3 有效权重默认与权重相同,通讯异常时-1, 通讯成功+1,直到恢复到weight大小
-		if w.effectiveWeight < w.weight {
-			w.effectiveWeight++
-		}
-		//step 4 选择最大临时权重点节点
-		if best == nil || w.currentWeight > best.currentWeight {
-			best = w
-		}
-	}
-	if best == nil {
-		logs.GInfo.Info(gctx.New(), "WeightRoundRobinProxy 获取权重异常 ")
-		return r.rss[0].addr
-	}
-	//step 5 变更临时权重为 临时权重-有效权重之和
-	best.currentWeight -= total
-	return best.addr
-}
-
-func (r *WeightRoundRobinProxy) Get() *url.URL {
-	return r.Next()
-}
-
-func (r *WeightRoundRobinProxy) Len() int {
-	return len(r.rss)
-}
-
-func (r *WeightRoundRobinProxy) Update() {
-
-}

+ 0 - 69
core/proxy/proxyClient.go

@@ -1,69 +0,0 @@
-package proxy
-
-import (
-	"net"
-	"net/http"
-	"net/http/httputil"
-	"net/url"
-	"strings"
-	"time"
-)
-
-func CreateCustomProxyClient(target *url.URL, errFunc func(http.ResponseWriter, *http.Request, error)) *httputil.ReverseProxy {
-	director := func(req *http.Request) {
-		targetQuery := target.RawQuery
-		req.URL.Scheme = target.Scheme
-		req.URL.Host = target.Host
-		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
-		if targetQuery == "" || req.URL.RawQuery == "" {
-			req.URL.RawQuery = targetQuery + req.URL.RawQuery
-		} else {
-			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
-		}
-		if _, ok := req.Header["User-Agent"]; !ok {
-			// explicitly disable User-Agent so it's not set to default value
-			req.Header.Set("User-Agent", "")
-		}
-	}
-
-	//超时处理
-	transport := &http.Transport{
-		Proxy: http.ProxyFromEnvironment,
-		DialContext: (&net.Dialer{
-			Timeout:   15 * time.Second, //连接超时
-			KeepAlive: 15 * time.Second, //长连接超时时间
-			DualStack: true,
-		}).DialContext,
-		MaxIdleConns:          30,               //最大空闲连接
-		IdleConnTimeout:       90 * time.Second, //空闲超时时间
-		TLSHandshakeTimeout:   10 * time.Second, //tls握手超时时间
-		ExpectContinueTimeout: 1 * time.Second,  //100-continue 超时时间
-		MaxIdleConnsPerHost:   300,
-	}
-
-	//异常处理
-	//errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
-	//	http.Error(w, "proxy error:", 500)
-	//	logs.GInfo.Error(r.Context(), err.Error())
-	//}
-
-	reverseProxy := &httputil.ReverseProxy{
-		Director:  director,
-		Transport: transport,
-		//ModifyResponse: change,
-		ErrorHandler: errFunc}
-
-	return reverseProxy
-}
-
-func singleJoiningSlash(a, b string) string {
-	aslash := strings.HasSuffix(a, "/")
-	bslash := strings.HasPrefix(b, "/")
-	switch {
-	case aslash && bslash:
-		return a + b[1:]
-	case !aslash && !bslash:
-		return a + "/" + b
-	}
-	return a + b
-}

+ 64 - 0
core/proxy/proxyClient/proxyClient.go

@@ -0,0 +1,64 @@
+package proxyClient
+
+import (
+	"github.com/gogf/gf/v2/os/gcfg"
+	"github.com/gogf/gf/v2/os/gctx"
+	"net"
+	"net/http"
+	"net/http/httputil"
+	"net/url"
+	"strings"
+	"time"
+)
+
+var transport = &http.Transport{}
+
+func CreateCustomProxyClient(target *url.URL, errFunc func(http.ResponseWriter, *http.Request, error)) *httputil.ReverseProxy {
+	return &httputil.ReverseProxy{
+		Director: func(req *http.Request) {
+			targetQuery := target.RawQuery
+			req.URL.Scheme = target.Scheme
+			req.URL.Host = target.Host
+			req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
+			if targetQuery == "" || req.URL.RawQuery == "" {
+				req.URL.RawQuery = targetQuery + req.URL.RawQuery
+			} else {
+				req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
+			}
+			if _, ok := req.Header["User-Agent"]; !ok {
+				// explicitly disable User-Agent so it's not set to default value
+				req.Header.Set("User-Agent", "")
+			}
+		},
+		Transport: transport,
+		//ModifyResponse: change,
+		ErrorHandler: errFunc}
+}
+
+func ReLoadClient() {
+	transport = &http.Transport{
+		Proxy: http.ProxyFromEnvironment,
+		DialContext: (&net.Dialer{
+			Timeout:   time.Duration(gcfg.Instance().MustGet(gctx.New(), "proxy.timeout", 30).Int()) * time.Second,   //连接超时
+			KeepAlive: time.Duration(gcfg.Instance().MustGet(gctx.New(), "proxy.keepAlive", 60).Int()) * time.Second, //长连接超时时间
+			DualStack: true,
+		}).DialContext,
+		MaxIdleConns:          gcfg.Instance().MustGet(gctx.New(), "proxy.maxIdleConns", 120).Int(),                                     //最大空闲连接 0没有限制
+		IdleConnTimeout:       time.Duration(gcfg.Instance().MustGet(gctx.New(), "proxy.idleConnTimeout", 90).Int()) * time.Second,      //空闲超时时间
+		TLSHandshakeTimeout:   time.Duration(gcfg.Instance().MustGet(gctx.New(), "proxy.tLSHandshakeTimeout", 1).Int()) * time.Second,   //tls握手超时时间
+		ExpectContinueTimeout: time.Duration(gcfg.Instance().MustGet(gctx.New(), "proxy.expectContinueTimeout", 1).Int()) * time.Second, //100-continue 超时时间
+		MaxIdleConnsPerHost:   gcfg.Instance().MustGet(gctx.New(), "proxy.maxIdleConnsPerHost", 5).Int(),                                //客户端可以持有的最大空闲连接
+	}
+}
+
+func singleJoiningSlash(a, b string) string {
+	aslash := strings.HasSuffix(a, "/")
+	bslash := strings.HasPrefix(b, "/")
+	switch {
+	case aslash && bslash:
+		return a + b[1:]
+	case !aslash && !bslash:
+		return a + "/" + b
+	}
+	return a + b
+}

+ 7 - 12
core/proxy/proxyServer.go

@@ -6,6 +6,7 @@ import (
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/node"
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/node"
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/broker"
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/broker"
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/middleware"
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/middleware"
+	"bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/proxyClient"
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/router"
 	"bp.jydev.jianyu360.cn/BaseService/gateway/core/router"
 	"fmt"
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/frame/g"
@@ -13,7 +14,6 @@ import (
 	"github.com/gogf/gf/v2/os/gcfg"
 	"github.com/gogf/gf/v2/os/gcfg"
 	"github.com/gogf/gf/v2/os/gctx"
 	"github.com/gogf/gf/v2/os/gctx"
 	"net/http"
 	"net/http"
-	"net/url"
 )
 )
 
 
 var bManager = broker.InitBroker()
 var bManager = broker.InitBroker()
@@ -46,6 +46,8 @@ func InitGateWayServer() *ghttp.Server {
 	})
 	})
 	gateWayServer.Use(middleware.FilterHandler) //权限过滤
 	gateWayServer.Use(middleware.FilterHandler) //权限过滤
 
 
+	//加载代理客户端
+	proxyClient.ReLoadClient()
 	//注册代理
 	//注册代理
 	gateWayServer.BindHandler("POST:/*", proxyHandler)
 	gateWayServer.BindHandler("POST:/*", proxyHandler)
 	return gateWayServer
 	return gateWayServer
@@ -57,25 +59,18 @@ var proxyHandler = func(r *ghttp.Request) {
 		return
 		return
 	}
 	}
 	gCtx := router.GetGContext(r.GetCtx())
 	gCtx := router.GetGContext(r.GetCtx())
-
 	//获取服务地址
 	//获取服务地址
-	proxyAddr, err := bManager.GetServerAddr(gCtx.RouterRule.MiddleCode)
+	proxyAddr, err := bManager.GetServerAddr(gCtx.RouterRule.MiddleCode, r.GetClientIp())
 	if err != nil {
 	if err != nil {
 		r.SetError(err)
 		r.SetError(err)
 		return
 		return
 	}
 	}
 	//代理地址存入ctx中
 	//代理地址存入ctx中
-	gCtx.ServerAddr = proxyAddr
+	gCtx.ServerAddr = proxyAddr.String()
 	router.UpdateGContext(r, gCtx)
 	router.UpdateGContext(r, gCtx)
-	//调用请求
-	proxyUrl, err := url.Parse(proxyAddr)
-	if err != nil {
-		r.SetError(NewErrorWithCode(GATEWAY_REGISTED_URL_ERR, err.Error()))
-		// WillDo:后续异常节点冻结
-		return
-	}
 	errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) {
 	errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) {
 		r.SetError(NewErrorWithCode(GATEWAY_PROXY_ERR, fmt.Sprintf("代理异常:%s err:%v \n", gCtx.ServerAddr, err.Error())))
 		r.SetError(NewErrorWithCode(GATEWAY_PROXY_ERR, fmt.Sprintf("代理异常:%s err:%v \n", gCtx.ServerAddr, err.Error())))
 	}
 	}
-	CreateCustomProxyClient(proxyUrl, errHandel).ServeHTTP(r.Response.ResponseWriter, r.Request)
+
+	proxyClient.CreateCustomProxyClient(proxyAddr, errHandel).ServeHTTP(r.Response.ResponseWriter, r.Request)
 }
 }

+ 13 - 7
etc/config.yaml

@@ -18,7 +18,6 @@ server:
 etcd:
 etcd:
   # 基础服务
   # 基础服务
   baseserver:
   baseserver:
-    appid: 10000
     # 用户中台配置
     # 用户中台配置
     userCenter:
     userCenter:
       key: usercenter.rpc
       key: usercenter.rpc
@@ -50,7 +49,7 @@ system:
   # 监听服务注册etcd配置
   # 监听服务注册etcd配置
   etcdListen:
   etcdListen:
     - 192.168.3.240:2379
     - 192.168.3.240:2379
-
+  loadProxyModule: 1 #0随机 1轮询 2哈希。默认随机
   response:
   response:
     head-clear: # 响应头删除
     head-clear: # 响应头删除
       - Trace-Id
       - Trace-Id
@@ -60,10 +59,10 @@ system:
   # 系统日志
   # 系统日志
   log:
   log:
     path: ./logs                                # 系统日志默认文件默认报错路径。默认为./logs
     path: ./logs                                # 系统日志默认文件默认报错路径。默认为./logs
-    debug: true                                 # 是否打印调试信息。默认false
-    stdout: true                                # 是否输出到控制台。默认false
+    debug: false                                # 是否打印调试信息。默认false
+    stdout: false                               # 是否输出到控制台。默认false
     systemLogPattern: system-{Ymd}.log          # 日志文件格式。默认为"system-{Ymd}.log"
     systemLogPattern: system-{Ymd}.log          # 日志文件格式。默认为"system-{Ymd}.log"
-    serverErrorStack: true                      # 当Server捕获到异常时是否记录堆栈信息到日志中。默认为true
+    serverErrorStack: false                     # 当Server捕获到异常时是否记录堆栈信息到日志中。默认为true
     serverErrorLogEnabled: true                 # 是否记录异常日志信息到日志中。默认为true
     serverErrorLogEnabled: true                 # 是否记录异常日志信息到日志中。默认为true
     serverErrorLogPattern: error-{Ymd}.log      # 异常错误日志文件格式。默认为"error-{Ymd}.log"
     serverErrorLogPattern: error-{Ymd}.log      # 异常错误日志文件格式。默认为"error-{Ymd}.log"
     serverAccessLogEnabled: true                # 是否记录访问日志。默认为false
     serverAccessLogEnabled: true                # 是否记录访问日志。默认为false
@@ -72,7 +71,7 @@ system:
 
 
   # 系统告警
   # 系统告警
   alarm:
   alarm:
-    isOpen: true                        # 异常通知开关。默认关闭
+    isOpen: false                       # 异常通知开关。默认关闭
     address: 192.168.3.207:4150         # nsq消息通知地址。默认关闭
     address: 192.168.3.207:4150         # nsq消息通知地址。默认关闭
     toppic: jyalert                     # 消息管道
     toppic: jyalert                     # 消息管道
     isJsonEncode: false                 # 是否加密
     isJsonEncode: false                 # 是否加密
@@ -80,7 +79,14 @@ system:
     title: 你有新的告警消息处理            # 消息标题
     title: 你有新的告警消息处理            # 消息标题
     text: 前置代理告警请查看               # 消息正文
     text: 前置代理告警请查看               # 消息正文
 
 
-
+proxy:
+  timeout: 30                 # 连接超时。默认30秒
+  keepAlive: 60               # 长链接超时。默认60秒
+  maxIdleConns: 120           # 最大空闲连接 0没有限制。默认120
+  idleConnTimeout: 90         # 空闲超时时间。默认90秒
+  tLSHandshakeTimeout: 1      # tls握手超时时间。默认1秒
+  expectContinueTimeout: 1    # 100-continue 超时时间。默认1秒
+  maxIdleConnsPerHost: 5      # 客户端可以持有的最大空闲连接。默认5