renzheng 3 tahun lalu
induk
melakukan
59a8b6930b
2 mengubah file dengan 405 tambahan dan 85 penghapusan
  1. 375 82
      redis/goredis.go
  2. 30 3
      redis/redis_test.go

+ 375 - 82
redis/goredis.go

@@ -2,6 +2,8 @@ package redis
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
 	"hash/crc32"
 	"log"
 	"math"
@@ -14,23 +16,35 @@ import (
 )
 
 type GoRedis struct {
-	Addr         string
-	Password     string
-	Code         string
-	DB           int
-	DBS          []int //数据库列表
-	HashDb       bool  //是否是多个数据库
-	PoolSize     int
-	MinIdleConns int
-	IdleTimeout  int
-	Ctx          context.Context
-	C            *redis.Client
-	BakDb        bool     //是否有备用节点
-	Bak          *GoRedis //备用节点连接
-}
-
-//初始化单个"[other=]127.0.0.1:2203|127.0.0.1:2204=0-1=1-10=300" [代码]、地址[|备用地址,&集群地址]、库、最大池、空闲时间默认300秒
+	//Password string
+	Code   string
+	DB     int
+	HashDb bool  //是否是多个数据库
+	DBS    []int //数据库列表
+	Ctx    context.Context
+	CMap   map[int]*redis.Client
+	BakDb  bool     //是否有备用节点
+	Bak    *GoRedis //备用节点连接
+}
+
+//初始化单个"[other=]127.0.0.1:2203[|#]127.0.0.1:2204[/0]=0-1=1-10=300" [代码]、地址[|备用地址[/代表默认库][#多节点地址,默认库都为0]、库、最大池、空闲时间默认300秒
+/*
+示例
+127.0.0.1:2203
+127.0.0.1:2203=2=15  默认库为2,最大连接为15,最小连接是0.2*15为3
+other=127.0.0.1:2203 带code配置
+other=127.0.0.1:2203=0=10
+other=127.0.0.1:2203=0=10=300 空闲时间300秒
+other=127.0.0.1:2203=0-8=10=300 带code配置,使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接是0.2*10为2,空闲时间300秒
+other=127.0.0.1:2203=0-8=1-10=300 带code配置,使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
+other=127.0.0.1:2203|127.0.0.1:2204=0-8=10=300 带code配置,使用备用节点(仅供取数据时使用),使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
+other=127.0.0.1:2203|127.0.0.1:2204/0=0-8=10=300 带code配置,使用备用节点(仅供取数据时使用,备用节点默认库为0),使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
+other=127.0.0.1:2203#127.0.0.1:2204#127.0.0.1:2205#127.0.0.1:2206=0=10=300 带code配置,使用多节点模式(多节点所有默认库为0),最大连接为10,最小连接2,空闲时间300秒
+注意:
+1、当使用多节点时,用正则获取keys时,再用Get取模获取数据时,一定要保障放入和取出是同一个hashcode,否则数据永远取不到
+*/
 
+//解析配置并初始化
 func (r *GoRedis) Init(opt interface{}) {
 	check := false
 	code, addr, dbs, pool, idle := "", "", []int{0}, []int{2, 30}, 300
@@ -96,85 +110,83 @@ func (r *GoRedis) Init(opt interface{}) {
 	}
 	if check {
 		addrs := strings.Split(addr, "|")
-		r.Addr = addrs[0]
-		r.Code = code
-		r.DB = dbs[0]
-		//DBS          []int
-		r.PoolSize = pool[1]
-		r.MinIdleConns = pool[0]
-		r.IdleTimeout = idle
-		r.Ctx = context.Background()
-		r.C = redis.NewClient(&redis.Options{
-			Addr:         r.Addr,
-			DB:           r.DB,
-			PoolSize:     r.PoolSize,
-			MinIdleConns: r.MinIdleConns,
-			IdleTimeout:  time.Duration(r.IdleTimeout) * time.Second,
-		})
-		if len(dbs) > 1 {
-			r.HashDb = true
-			r.DBS = []int{}
-			for i := dbs[0]; i <= dbs[1]; i++ {
-				r.DBS = append(r.DBS, i)
-			}
-		}
-		if len(addrs) == 2 {
+		if len(addrs) == 2 { //备用节点的模式 2选1
+			r.init(addrs[0], code, dbs, pool[1], pool[0], idle)
 			r.BakDb = true
-			//有备用节点,集群先不考虑
+			//有备用节点,集群先不考虑,支持指定的库
+			addr1 := strings.Split(addrs[1], "/")
 			r.Bak = &GoRedis{}
-			r.Bak.Addr = addrs[1]
-			r.Bak.Code = code
-			r.Bak.DB = dbs[0]
-			r.Bak.DBS = r.DBS
-			r.Bak.HashDb = r.HashDb
-			r.Bak.PoolSize = pool[1]
-			r.Bak.MinIdleConns = pool[0]
-			r.Bak.IdleTimeout = idle
-			r.Bak.Ctx = context.Background()
-			r.Bak.C = redis.NewClient(&redis.Options{
-				Addr:         r.Bak.Addr,
-				DB:           r.Bak.DB,
-				PoolSize:     r.Bak.PoolSize,
-				MinIdleConns: r.Bak.MinIdleConns,
-				IdleTimeout:  time.Duration(r.Bak.IdleTimeout) * time.Second,
-			})
+			if len(addr1) == 2 { //没有指定库
+				i, _ := strconv.Atoi(addr1[1])
+				dbs = []int{i}
+			}
+			r.Bak.init(addr1[0], code, dbs, pool[1], pool[0], idle)
+		} else if len(addrs) == 1 {
+			addr1 := strings.Split(addrs[0], "#") //是多节点模式,所有库默认为0
+			if len(addr1) == 1 {
+				r.init(addr1[0], code, dbs, pool[1], pool[0], idle)
+			} else {
+				r.Code = code
+				r.DB = 0
+				r.Ctx = context.Background()
+				r.CMap = map[int]*redis.Client{}
+				r.HashDb = true
+				r.DBS = []int{}
+				for i := 0; i < len(addr1); i++ {
+					r.DBS = append(r.DBS, i)
+					r.CMap[i] = redis.NewClient(&redis.Options{
+						Addr:         addr1[i],
+						PoolSize:     pool[1],
+						MinIdleConns: pool[0],
+						IdleTimeout:  time.Duration(idle) * time.Second,
+					})
+				}
+			}
 		}
 	}
-
 	log.Println(check, code, addr, dbs, pool, idle, r.DBS)
 }
 
-func NewGoRedis(Addr, Password, Code string, DB, PoolSize, MinIdleConns, IdleTimeout int) *GoRedis {
-	r := &GoRedis{}
-	r.C = redis.NewClient(&redis.Options{
-		Addr:         Addr,
-		Password:     Password,
-		DB:           DB,
-		PoolSize:     PoolSize,
-		MinIdleConns: MinIdleConns,
-		IdleTimeout:  time.Duration(IdleTimeout) * time.Second,
-	})
-	return r
-}
-
-func (r *GoRedis) Put(key string, val interface{}) {
-	var stutsCmd *redis.StatusCmd
-	if r.HashDb { //有多个库
-		r.selectDB(key)
-		stutsCmd = r.C.Set(r.Ctx, key, val, 0)
+//初始化连接
+func (r *GoRedis) init(addr, code string, dbs []int, poolSize, minIdleConns, idleTimeOut int) {
+	r.Code = code
+	r.DB = dbs[0]
+	r.Ctx = context.Background()
+	r.CMap = map[int]*redis.Client{}
+	if len(dbs) > 1 {
+		r.HashDb = true
+		r.DBS = []int{}
+		for i := dbs[0]; i <= dbs[1]; i++ {
+			r.DBS = append(r.DBS, i)
+		}
 	} else {
-		stutsCmd = r.C.Set(r.Ctx, key, val, 0)
+		r.DBS = dbs
+	}
+	for k, v := range r.DBS {
+		r.CMap[k] = redis.NewClient(&redis.Options{
+			Addr:         addr,
+			DB:           v,
+			PoolSize:     poolSize,
+			MinIdleConns: minIdleConns,
+			IdleTimeout:  time.Duration(idleTimeOut) * time.Second,
+		})
 	}
-	str, err := stutsCmd.Result()
-	log.Println(str, err)
 }
 
-//此处切换失败!!!,只能切换取到的连接,不会切换Client,待验证
-func (r *GoRedis) selectDB(key string) {
-	db := r.DBS[hashCode(key)%len(r.DBS)] //取存哪个db,可以提取出来一个方法
-	r.C.Conn(r.Ctx).Select(r.Ctx, db)     //切换数据库
+func D(t int) time.Duration {
+	return time.Duration(t) * time.Second
 }
 
+//取db
+func (r *GoRedis) GetDB(key string) int {
+	if r.HashDb {
+		return hashCode(key) % len(r.DBS)
+	} else {
+		return r.DB
+	}
+}
+
+//根据key取hash
 func hashCode(key string) int {
 	v := int(crc32.ChecksumIEEE([]byte(key)))
 	if v < 0 {
@@ -182,3 +194,284 @@ func hashCode(key string) int {
 	}
 	return v
 }
+
+//-----具体方法
+//简单的Put方法
+func (r *GoRedis) Put(key string, val interface{}) (string, error) {
+	stutsCmd := r.CMap[r.GetDB(key)].Set(r.Ctx, key, val, 0)
+	str, err := stutsCmd.Result()
+	return str, err
+}
+func (r *GoRedis) Set(key string, val interface{}, timeout int) (string, error) {
+	stutsCmd := r.CMap[r.GetDB(key)].Set(r.Ctx, key, val, D(timeout))
+	//cmd := r.CMap[r.GetDB(key)].Do(r.Ctx, "setex", key, timeout, val)
+	str, err := stutsCmd.Result()
+	return str, err
+}
+
+//简单的Get方法,无结果返回空串,err: redis: nil
+func (r *GoRedis) Get(key string) (string, error) {
+	stutsCmd := r.CMap[r.GetDB(key)].Get(r.Ctx, key)
+	str, err := stutsCmd.Result()
+	// log.Println("1", str, "-", err)
+	if err != nil {
+		//有备用节点
+		if r.BakDb {
+			str, err = r.Bak.Get(key)
+		}
+	}
+	return str, err
+}
+
+//根据正则取key,未考虑负载、多库
+func (r *GoRedis) GetByPattern(key string) (res []string, err error) {
+	serr := ""
+	for _, v := range r.CMap {
+		strSlice := v.Keys(r.Ctx, key)
+		arr, err1 := strSlice.Result()
+		if len(arr) > 0 {
+			res = append(res, arr...)
+		} else if err1 != nil {
+			serr += err1.Error()
+		}
+	}
+	if serr != "" {
+		err = errors.New(serr)
+	}
+	return
+}
+
+//批量保存数据
+//   不支持- MSet("key1", "value1", "key2", "value2")
+//   不支持- MSet([]string{"key1", "value1", "key2", "value2"})
+//  []interface{[]interface{k1,v1},[]interface{k2,v2},[]interface{k3,v3} }
+//   - MSet(map[string]interface{}{"key1": "value1", "key2": "value2"})
+func (r *GoRedis) BulkPut(timeout int, obj interface{}) {
+	if r.HashDb {
+		if timeout < 1 {
+			timeout = 0
+		}
+		timeEx := time.Duration(timeout) * time.Second
+		switch obj.(type) {
+		case []interface{}:
+			if objs, ok := obj.([]interface{}); ok {
+				for _, _tmp := range objs {
+					tmp, ok := _tmp.([]interface{})
+					if ok && len(tmp) == 2 {
+						key, ok1 := tmp[0].(string)
+						if ok1 {
+							r.CMap[r.GetDB(key)].Set(r.Ctx, key, tmp[1], timeEx)
+						}
+					}
+				}
+			}
+		case map[string]interface{}:
+			if objs, ok := obj.(map[string]interface{}); ok {
+				for k, v := range objs {
+					r.CMap[r.GetDB(k)].Set(r.Ctx, k, v, timeEx)
+				}
+			}
+		}
+	} else {
+		//单库直接保存
+		if timeout < 1 {
+			stutsCmd := r.CMap[r.DB].MSet(r.Ctx, obj)
+			str, err := stutsCmd.Result()
+			log.Println(str, err)
+		} else {
+			timeEx := time.Duration(timeout) * time.Second
+			//设置超时
+			lenth := 0
+			cmds, err := r.CMap[r.DB].Pipelined(r.Ctx, func(pipe redis.Pipeliner) error {
+				if objs, ok := obj.([]interface{}); ok {
+					lenth = len(objs)
+					for _, _tmp := range objs {
+						tmp, ok := _tmp.([]interface{})
+						if ok && len(tmp) == 2 {
+							key, ok1 := tmp[0].(string)
+							if ok1 {
+								pipe.Set(r.Ctx, key, tmp[1], timeEx)
+							}
+						}
+					}
+
+				} else if objs, ok := obj.(map[string]interface{}); ok {
+					lenth = len(objs)
+					for k, v := range objs {
+						pipe.Set(r.Ctx, k, v, timeEx)
+					}
+				} else {
+					log.Println("bulkPut type error")
+				}
+				return nil
+			})
+			if err != nil {
+				log.Println("bulkPut error", err.Error())
+			}
+			if len(cmds) == lenth { //数据相等
+				for _, v := range cmds {
+					log.Println("cmd", v.Args(), v.String())
+				}
+			}
+		}
+	}
+}
+
+//设置超时时间,单位秒
+func (r *GoRedis) SetExpire(key string, expire int) error {
+	boolCmd := r.CMap[r.GetDB(key)].Expire(r.Ctx, key, D(expire))
+	return boolCmd.Err()
+}
+
+//判断一个key是否存在
+func (r *GoRedis) Exists(key string) bool {
+	intCmd := r.CMap[r.GetDB(key)].Exists(r.Ctx, key)
+	i, err := intCmd.Result()
+	if err != nil {
+		log.Println("redisutil-exists", key, err)
+	}
+	return i == 1
+}
+
+//直接返回字节流
+func (r *GoRedis) GetBytes(key string) (ret *[]byte, err error) {
+	cmd := r.CMap[r.GetDB(key)].Do(r.Ctx, "GET", key)
+	res, err := cmd.Result()
+	log.Println(res)
+	if err != nil {
+		log.Println("redisutil-GetBytesError", err)
+	} else {
+		if tmp, ok := res.([]byte); ok {
+			ret = &tmp
+		} else if tmp, ok := res.(string); ok {
+			bs := []byte(tmp)
+			ret = &bs
+		} else {
+			err = errors.New("redis返回数据格式不对")
+		}
+	}
+	return
+}
+
+//支持删除多个key
+func (r *GoRedis) Del(key ...string) (b bool) {
+	i := 0
+	if r.HashDb {
+		for _, k := range key {
+			cmd := r.CMap[r.GetDB(k)].Del(r.Ctx, k)
+			i1, _ := cmd.Result()
+			i += int(i1)
+		}
+	} else {
+		intCmd := r.CMap[r.DB].Del(r.Ctx, key...)
+		i1, _ := intCmd.Result()
+		i = int(i1)
+	}
+	return i == len(key)
+}
+
+//根据代码和前辍key删除多个
+func (r *GoRedis) DelByPattern(key string) {
+	res, _ := r.GetByPattern(key)
+	if len(res) > 0 {
+		r.Del(res...)
+	}
+}
+
+//自增计数器
+func (r *GoRedis) Incr(key string) (int64, error) {
+	intCmd := r.CMap[r.GetDB(key)].Incr(r.Ctx, key)
+	i, err := intCmd.Result()
+	return i, err
+}
+
+//自减
+func (r *GoRedis) Decrby(key string, val int) (int64, error) {
+	intCmd := r.CMap[r.GetDB(key)].DecrBy(r.Ctx, key, int64(val))
+	i, err := intCmd.Result()
+	return i, err
+}
+
+//批量取多个key
+func (r *GoRedis) Mget(key []string) []interface{} {
+	if r.HashDb {
+		mdb := map[int][]string{}
+		for _, v := range key { //分组
+			arr := mdb[r.GetDB(v)]
+			if arr == nil {
+				arr = []string{v}
+			} else {
+				arr = append(arr, v)
+			}
+			mdb[r.GetDB(v)] = arr
+		}
+		res := []interface{}{}
+		for k, v := range mdb {
+			sliceCmd := r.CMap[k].MGet(r.Ctx, v...)
+			res1, err := sliceCmd.Result()
+			if err != nil {
+				log.Println("Mget error", err)
+			}
+			res = append(res, res1...)
+		}
+		return res
+	} else {
+		sliceCmd := r.CMap[r.DB].MGet(r.Ctx, key...)
+		res, err := sliceCmd.Result()
+		if err != nil {
+			log.Println("Mget error", err)
+		}
+		return res
+	}
+}
+
+//取出并删除Key
+func (r *GoRedis) Pop(key string) (result interface{}) {
+	strCmd := r.CMap[r.GetDB(key)].GetDel(r.Ctx, key)
+	b, err := strCmd.Bytes()
+	if err != nil {
+		log.Println("Poperr bytes", err)
+		return
+	}
+	err1 := json.Unmarshal(b, &result)
+	if err1 != nil {
+		log.Println("Poperr json ", err)
+		return
+	}
+	return
+}
+
+//list操作
+func (r *GoRedis) LPOP(list string) (result interface{}) {
+	strCmd := r.CMap[r.GetDB(list)].LPop(r.Ctx, list)
+	b, err := strCmd.Bytes()
+	if err != nil {
+		log.Println("LPOP bytes", err)
+		return
+	}
+	err1 := json.Unmarshal(b, &result)
+	if err1 != nil {
+		log.Println("LPOP json ", err)
+		return
+	}
+	return
+}
+
+func (r *GoRedis) RPUSH(list string, val interface{}) bool {
+	intCmd := r.CMap[r.GetDB(list)].RPush(r.Ctx, list, val)
+	i, err := intCmd.Result()
+	if err != nil {
+		log.Println("RPUSH bytes", err)
+		return false
+	}
+	return i == 1
+}
+
+func (r *GoRedis) LLEN(list string) int64 {
+	intCmd := r.CMap[r.GetDB(list)].LLen(r.Ctx, list)
+	i, err := intCmd.Result()
+	if err != nil {
+		log.Println("RPUSH bytes", err)
+	}
+	return i
+}

+ 30 - 3
redis/redis_test.go

@@ -1,7 +1,9 @@
 package redis
 
 import (
+	"encoding/json"
 	"log"
+	"strings"
 	"testing"
 )
 
@@ -12,7 +14,32 @@ func TestInit(t *testing.T) {
 	// r1.Init("other=127.0.0.1:2203|127.0.0.1:2204=0-14=1-10=400")
 	// r1.Init("other=127.0.0.1:2203=0-1=1-10=400")
 	// r1.Init("other=127.0.0.1:2203=1=1")
-	r1.Init("192.168.3.207:6379=1-4=2=600")
-	r1.Put(`ddd1`, "fff")
-	log.Println(hashCode("aaaaOppkdlkj9i9)Jkjfdksfhjhajkhdsa8d98uiuhjkdsnjknbaaa4554645654645ajjjhh12uih"))
+	r1.Init("192.168.3.207:6379=0=2=600")
+	res, err := r1.GetByPattern(`dd*`)
+	log.Println(res, err)
+	r1.BulkPut(120, map[string]interface{}{"a1": 1, "a2": "222", "a3": 456, "a4": "fdsfdsfds"})
+	r1.BulkPut(220, []interface{}{[]interface{}{"b1", 1}, []interface{}{"b2", "222"}, []interface{}{"b3", 5666}})
+	b, _ := json.Marshal(map[string]interface{}{"key": "值val1"})
+	log.Println(b)
+	b1 := make([]byte, 1000)
+	strings.NewReader("aabbccdd").Read(b1)
+	r1.Set("aabb", b1, 400)
+	v, e := r1.GetBytes("aabb")
+	log.Println("ffff", v, e)
+
+	r1.Set("aaa", "ddddddd", 500)
+	r1.SetExpire("aaa", 900)
+	log.Println("exists", r1.Exists("aaa"))
+	r1.Set("bbb", "ddddddd", 500)
+	r1.Del("bbb")
+	r1.Set("bbcc", "ddddddd", 500)
+	r1.Set("bbcc1", "ddddddd", 500)
+	r1.DelByPattern("bbcc*")
+	r1.Incr("aa1")
+	r1.Incr("aa1")
+	r1.Incr("aa1")
+	r1.Incr("aa1")
+	r1.Decrby("aa1", 2)
+
+	log.Println(hashCode("aaaaOppkdlk"))
 }