renzheng vor 2 Jahren
Ursprung
Commit
65b79c9d4f
1 geänderte Dateien mit 111 neuen und 70 gelöschten Zeilen
  1. 111 70
      redis/goredis.go

+ 111 - 70
redis/goredis.go

@@ -19,11 +19,12 @@ import (
 type GoRedis struct {
 	//Password string
 	Code   string
-	DB     int
-	HashDb bool  //是否是多个数据库
+	DB     int   //默认库
 	DBS    []int //数据库列表
+	Nodes  []int //节点列表
+	HashDb int   //是否是多个数据库 0单库 1单节点多库 2多节点单库 3多节点多库
 	Ctx    context.Context
-	CMap   map[int]*redis.Client
+	CMap   map[int]map[int]*redis.Client
 	BakDb  bool     //是否有备用节点
 	Bak    *GoRedis //备用节点连接
 }
@@ -48,29 +49,30 @@ other=127.0.0.1:2203#127.0.0.1:2204#127.0.0.1:2205#127.0.0.1:2206=0=10=300 带co
 //解析配置并初始化,opt为string或map
 func (r *GoRedis) Init(opt interface{}) {
 	check := false
+	//代码  地址 []库范围 []池配置 空闲时间
 	code, addr, dbs, pool, idle := "", "", []int{0}, []int{2, 30}, 300
 	if so, ok := opt.(string); ok {
 		arr := strings.Split(so, "=")
 		regAddr := regexp.MustCompile("[0-9.a-zA-Z/]+:[0-9]+.*")
-		if len(arr) == 1 {
+		if len(arr) == 1 { //只是一个串
 			if regAddr.MatchString(arr[0]) {
 				check = true
 				addr = arr[0]
 			}
 		} else if len(arr) > 1 {
 			index := 0
-			if regAddr.MatchString(arr[0]) {
+			if regAddr.MatchString(arr[0]) { //第1个是地址
 				index = 1
 				addr = arr[0]
 				check = true
-			} else if regAddr.MatchString(arr[1]) {
+			} else if regAddr.MatchString(arr[1]) { //第二个是地址
 				check = true
 				addr = arr[1]
 				code = arr[0]
 			}
 			//解析库配置
 			if len(arr) > 2-index { //dbs配置
-				dbs1 := strings.Split(arr[2-index], "-")
+				dbs1 := strings.Split(arr[2-index], "-") //库范围的配置
 				if len(dbs1) == 1 || len(dbs1) == 2 {
 					check = true
 					dbs[0], _ = strconv.Atoi(dbs1[0])
@@ -110,14 +112,15 @@ func (r *GoRedis) Init(opt interface{}) {
 
 	}
 	if check {
-		addrs := strings.Split(addr, "|")
-		if len(addrs) == 2 { //备用节点的模式 2选1
+		log.Println("代码  地址 []库范围 []池配置 空闲时间", code, addr, dbs, pool, idle)
+		addrs := strings.Split(addr, "|") //备用节点模式
+		if len(addrs) == 2 {              //备用节点的模式 2选1
 			r.init(addrs[0], code, dbs, pool[1], pool[0], idle)
 			r.BakDb = true
 			//有备用节点,集群先不考虑,支持指定的库
 			addr1 := strings.Split(addrs[1], "/")
 			r.Bak = &GoRedis{}
-			if len(addr1) == 2 { //没有指定库
+			if len(addr1) == 2 { //没有指定库 根据/区分
 				i, _ := strconv.Atoi(addr1[1])
 				dbs = []int{i}
 			}
@@ -126,21 +129,26 @@ func (r *GoRedis) Init(opt interface{}) {
 			addr1 := strings.Split(addrs[0], "#") //是多节点模式,所有库默认为0
 			if len(addr1) == 1 {
 				r.init(addr1[0], code, dbs, pool[1], pool[0], idle)
-			} else {
+			} else { //多节点模式
 				r.Code = code
-				r.DB = 0
+				r.DB = dbs[0]
+				r.DBS = dbs
 				r.Ctx = context.Background()
-				r.CMap = map[int]*redis.Client{}
-				r.HashDb = true
-				r.DBS = []int{}
+				r.CMap = map[int]map[int]*redis.Client{}
+				r.HashDb = 1 + len(dbs)
+				r.Nodes = []int{}
 				for i := 0; i < len(addr1); i++ {
-					r.DBS = append(r.DBS, i)
-					r.CMap[i] = redis.NewClient(&redis.Options{
-						Addr:         addr1[i],
-						PoolSize:     pool[1],
-						MinIdleConns: pool[0],
-						IdleTimeout:  time.Duration(idle) * time.Second,
-					})
+					r.Nodes = append(r.Nodes, i)
+					r.CMap[i] = map[int]*redis.Client{}
+					for k, v := range r.DBS { //按节点的每个库初始化
+						r.CMap[i][k] = redis.NewClient(&redis.Options{
+							Addr:         addr1[i],
+							DB:           v,
+							PoolSize:     pool[1],
+							MinIdleConns: pool[0],
+							IdleTimeout:  time.Duration(idle) * time.Second,
+						})
+					}
 				}
 			}
 		}
@@ -153,9 +161,9 @@ func (r *GoRedis) init(addr, code string, dbs []int, poolSize, minIdleConns, idl
 	r.Code = code
 	r.DB = dbs[0]
 	r.Ctx = context.Background()
-	r.CMap = map[int]*redis.Client{}
+	r.CMap = map[int]map[int]*redis.Client{}
 	if len(dbs) > 1 {
-		r.HashDb = true
+		r.HashDb = len(dbs) - 1
 		r.DBS = []int{}
 		for i := dbs[0]; i <= dbs[1]; i++ {
 			r.DBS = append(r.DBS, i)
@@ -163,8 +171,8 @@ func (r *GoRedis) init(addr, code string, dbs []int, poolSize, minIdleConns, idl
 	} else {
 		r.DBS = dbs
 	}
-	for k, v := range r.DBS {
-		r.CMap[k] = redis.NewClient(&redis.Options{
+	for k, v := range r.DBS { //按节点的每个库初始化
+		r.CMap[0][k] = redis.NewClient(&redis.Options{
 			Addr:         addr,
 			DB:           v,
 			PoolSize:     poolSize,
@@ -180,13 +188,25 @@ func D(t int) time.Duration {
 }
 
 //取db
-func (r *GoRedis) GetDB(key string) int {
-	if r.HashDb {
-		//return int(key[len(key)-1]) % len(r.DBS)
-		return hashCode(key) % len(r.DBS)
-	} else {
-		return r.DB
+func (r *GoRedis) GetDB(key string) (int, int) {
+	switch r.HashDb {
+	case 0:
+		return 0, r.DB
+	case 1:
+		return 0, hashCode(key) % len(r.DBS)
+	case 2:
+		return hashCode(key) % len(r.Nodes), r.DB
+	case 3:
+		hk := hashCode(key)
+		return hk % len(r.Nodes), hk % len(r.DBS)
 	}
+	// if r.HashDb {
+	// 	//return int(key[len(key)-1]) % len(r.DBS)
+	// 	return hashCode(key) % len(r.DBS)
+	// } else {
+	// 	return r.DB
+	// }
+	return 0, 0
 }
 
 //根据key取hash
@@ -201,14 +221,16 @@ func hashCode(key string) int {
 //-----具体方法
 //简单的Put方法
 func (r *GoRedis) Put(key string, val interface{}) (string, error) {
-	stutsCmd := r.CMap[r.GetDB(key)].Set(r.Ctx, key, val, 0)
+	i, k := r.GetDB(key)
+	stutsCmd := r.CMap[i][k].Set(r.Ctx, key, val, 0)
 	str, err := stutsCmd.Result()
 	return str, err
 }
 
 //存key,加过期时间
 func (r *GoRedis) Set(key string, val interface{}, timeout int) (string, error) {
-	stutsCmd := r.CMap[r.GetDB(key)].Set(r.Ctx, key, val, D(timeout))
+	i, k := r.GetDB(key)
+	stutsCmd := r.CMap[i][k].Set(r.Ctx, key, val, D(timeout))
 	//cmd := r.CMap[r.GetDB(key)].Do(r.Ctx, "setex", key, timeout, val)
 	str, err := stutsCmd.Result()
 	return str, err
@@ -216,7 +238,8 @@ func (r *GoRedis) Set(key string, val interface{}, timeout int) (string, error)
 
 //简单的Get方法,无结果返回空串,err: redis: nil
 func (r *GoRedis) Get(key string) (string, error) {
-	stutsCmd := r.CMap[r.GetDB(key)].Get(r.Ctx, key)
+	i, k := r.GetDB(key)
+	stutsCmd := r.CMap[i][k].Get(r.Ctx, key)
 	str, err := stutsCmd.Result()
 	// log.Println("1", str, "-", err)
 	if err != nil {
@@ -231,13 +254,15 @@ func (r *GoRedis) Get(key string) (string, error) {
 //根据正则取key,未考虑负载、多库
 func (r *GoRedis) GetByPattern(key string) (res []string, err error) {
 	serr := ""
-	for _, v := range r.CMap {
-		strSlice := v.Keys(r.Ctx, key)
-		arr, err1 := strSlice.Result()
-		if len(arr) > 0 {
-			res = append(res, arr...)
-		} else if err1 != nil {
-			serr += err1.Error()
+	for _, v1 := range r.CMap {
+		for _, v := range v1 {
+			strSlice := v.Keys(r.Ctx, key)
+			arr, err1 := strSlice.Result()
+			if len(arr) > 0 {
+				res = append(res, arr...)
+			} else if err1 != nil {
+				serr += err1.Error()
+			}
 		}
 	}
 	if serr != "" {
@@ -252,7 +277,7 @@ func (r *GoRedis) GetByPattern(key string) (res []string, err error) {
 //  []interface{[]interface{k1,v1},[]interface{k2,v2},[]interface{k3,v3} }
 //   - MSet(map[string]interface{}{"key1": "value1", "key2": "value2"})
 func (r *GoRedis) BulkPut(timeout int, obj interface{}) {
-	if r.HashDb {
+	if r.HashDb > 0 {
 		if timeout < 1 {
 			timeout = 0
 		}
@@ -265,7 +290,8 @@ func (r *GoRedis) BulkPut(timeout int, obj interface{}) {
 					if ok && len(tmp) == 2 {
 						key, ok1 := tmp[0].(string)
 						if ok1 {
-							r.CMap[r.GetDB(key)].Set(r.Ctx, key, tmp[1], timeEx)
+							_i, _k := r.GetDB(key)
+							r.CMap[_i][_k].Set(r.Ctx, key, tmp[1], timeEx)
 						}
 					}
 				}
@@ -273,21 +299,22 @@ func (r *GoRedis) BulkPut(timeout int, obj interface{}) {
 		case map[string]interface{}:
 			if objs, ok := obj.(map[string]interface{}); ok {
 				for k, v := range objs {
-					r.CMap[r.GetDB(k)].Set(r.Ctx, k, v, timeEx)
+					_i, _k := r.GetDB(k)
+					r.CMap[_i][_k].Set(r.Ctx, k, v, timeEx)
 				}
 			}
 		}
 	} else {
 		//单库直接保存
 		if timeout < 1 {
-			stutsCmd := r.CMap[r.DB].MSet(r.Ctx, obj)
+			stutsCmd := r.CMap[0][r.DB].MSet(r.Ctx, obj)
 			str, err := stutsCmd.Result()
 			log.Println(str, err)
 		} else {
 			timeEx := time.Duration(timeout) * time.Second
 			//设置超时
 			lenth := 0
-			cmds, err := r.CMap[r.DB].Pipelined(r.Ctx, func(pipe redis.Pipeliner) error {
+			cmds, err := r.CMap[0][r.DB].Pipelined(r.Ctx, func(pipe redis.Pipeliner) error {
 				if objs, ok := obj.([]interface{}); ok {
 					lenth = len(objs)
 					for _, _tmp := range objs {
@@ -324,13 +351,15 @@ func (r *GoRedis) BulkPut(timeout int, obj interface{}) {
 
 //设置超时时间,单位秒
 func (r *GoRedis) SetExpire(key string, expire int) error {
-	boolCmd := r.CMap[r.GetDB(key)].Expire(r.Ctx, key, D(expire))
+	i, k := r.GetDB(key)
+	boolCmd := r.CMap[i][k].Expire(r.Ctx, key, D(expire))
 	return boolCmd.Err()
 }
 
 //判断一个key是否存在
 func (r *GoRedis) Exists(key string) bool {
-	intCmd := r.CMap[r.GetDB(key)].Exists(r.Ctx, key)
+	_i, _k := r.GetDB(key)
+	intCmd := r.CMap[_i][_k].Exists(r.Ctx, key)
 	i, err := intCmd.Result()
 	if err != nil {
 		log.Println("redisutil-exists", key, err)
@@ -340,7 +369,8 @@ func (r *GoRedis) Exists(key string) bool {
 
 //直接返回字节流
 func (r *GoRedis) GetBytes(key string) (ret *[]byte, err error) {
-	cmd := r.CMap[r.GetDB(key)].Do(r.Ctx, "GET", key)
+	i, k := r.GetDB(key)
+	cmd := r.CMap[i][k].Do(r.Ctx, "GET", key)
 	res, err := cmd.Result()
 	log.Println(res)
 	if err != nil {
@@ -361,14 +391,15 @@ func (r *GoRedis) GetBytes(key string) (ret *[]byte, err error) {
 //支持删除多个key
 func (r *GoRedis) Del(key ...string) (b bool) {
 	i := 0
-	if r.HashDb {
+	if r.HashDb > 0 {
 		for _, k := range key {
-			cmd := r.CMap[r.GetDB(k)].Del(r.Ctx, k)
+			_i, _k := r.GetDB(k)
+			cmd := r.CMap[_i][_k].Del(r.Ctx, k)
 			i1, _ := cmd.Result()
 			i += int(i1)
 		}
 	} else {
-		intCmd := r.CMap[r.DB].Del(r.Ctx, key...)
+		intCmd := r.CMap[0][r.DB].Del(r.Ctx, key...)
 		i1, _ := intCmd.Result()
 		i = int(i1)
 	}
@@ -385,43 +416,49 @@ func (r *GoRedis) DelByPattern(key string) {
 
 //自增计数器
 func (r *GoRedis) Incr(key string) (int64, error) {
-	intCmd := r.CMap[r.GetDB(key)].Incr(r.Ctx, key)
+	_i, _k := r.GetDB(key)
+	intCmd := r.CMap[_i][_k].Incr(r.Ctx, key)
 	i, err := intCmd.Result()
 	return i, err
 }
 
 //自减
 func (r *GoRedis) Decrby(key string, val int) (int64, error) {
-	intCmd := r.CMap[r.GetDB(key)].DecrBy(r.Ctx, key, int64(val))
+	_i, _k := r.GetDB(key)
+	intCmd := r.CMap[_i][_k].DecrBy(r.Ctx, key, int64(val))
 	i, err := intCmd.Result()
 	return i, err
 }
 
 //批量取多个key
 func (r *GoRedis) Mget(key []string) []interface{} {
-	if r.HashDb {
-		mdb := map[int][]string{}
+	if r.HashDb > 0 {
+		mdb := map[int]map[int][]string{}
 		for _, v := range key { //分组
-			arr := mdb[r.GetDB(v)]
+			_i, _k := r.GetDB(v)
+			mdb[_i] = map[int][]string{}
+			arr := mdb[_i][_k]
 			if arr == nil {
 				arr = []string{v}
 			} else {
 				arr = append(arr, v)
 			}
-			mdb[r.GetDB(v)] = arr
+			mdb[_i][_k] = arr
 		}
 		res := []interface{}{}
 		for k, v := range mdb {
-			sliceCmd := r.CMap[k].MGet(r.Ctx, v...)
-			res1, err := sliceCmd.Result()
-			if err != nil {
-				log.Println("Mget error", err)
+			for k1, v1 := range v {
+				sliceCmd := r.CMap[k][k1].MGet(r.Ctx, v1...)
+				res1, err := sliceCmd.Result()
+				if err != nil {
+					log.Println("Mget error", err)
+				}
+				res = append(res, res1...)
 			}
-			res = append(res, res1...)
 		}
 		return res
 	} else {
-		sliceCmd := r.CMap[r.DB].MGet(r.Ctx, key...)
+		sliceCmd := r.CMap[0][r.DB].MGet(r.Ctx, key...)
 		res, err := sliceCmd.Result()
 		if err != nil {
 			log.Println("Mget error", err)
@@ -432,7 +469,8 @@ func (r *GoRedis) Mget(key []string) []interface{} {
 
 //取出并删除Key
 func (r *GoRedis) Pop(key string) (result interface{}) {
-	strCmd := r.CMap[r.GetDB(key)].Get(r.Ctx, key)
+	_i, _k := r.GetDB(key)
+	strCmd := r.CMap[_i][_k].Get(r.Ctx, key)
 	b, err := strCmd.Bytes()
 	if err != nil {
 		log.Println("Poperr bytes", err)
@@ -443,7 +481,7 @@ func (r *GoRedis) Pop(key string) (result interface{}) {
 		log.Println("Poperr json ", err)
 		return
 	} else {
-		go r.CMap[r.GetDB(key)].Del(r.Ctx, key)
+		go r.CMap[_i][_k].Del(r.Ctx, key)
 	}
 
 	return
@@ -451,7 +489,8 @@ func (r *GoRedis) Pop(key string) (result interface{}) {
 
 //list操作
 func (r *GoRedis) LPOP(list string) (result interface{}) {
-	strCmd := r.CMap[r.GetDB(list)].LPop(r.Ctx, list)
+	_i, _k := r.GetDB(list)
+	strCmd := r.CMap[_i][_k].LPop(r.Ctx, list)
 	b, err := strCmd.Bytes()
 	if err != nil {
 		log.Println("LPOP bytes", err)
@@ -467,7 +506,8 @@ func (r *GoRedis) LPOP(list string) (result interface{}) {
 
 //将一个或多个值插入到列表的尾部
 func (r *GoRedis) RPUSH(list string, val ...interface{}) bool {
-	intCmd := r.CMap[r.GetDB(list)].RPush(r.Ctx, list, val...)
+	_i, _k := r.GetDB(list)
+	intCmd := r.CMap[_i][_k].RPush(r.Ctx, list, val...)
 	i, err := intCmd.Result()
 	if err != nil {
 		log.Println("RPUSH bytes", err)
@@ -478,7 +518,8 @@ func (r *GoRedis) RPUSH(list string, val ...interface{}) bool {
 
 //获取列表长度
 func (r *GoRedis) LLEN(list string) int64 {
-	intCmd := r.CMap[r.GetDB(list)].LLen(r.Ctx, list)
+	_i, _k := r.GetDB(list)
+	intCmd := r.CMap[_i][_k].LLen(r.Ctx, list)
 	i, err := intCmd.Result()
 	if err != nil {
 		log.Println("RPUSH bytes", err)