goredis.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. package redis
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "hash/crc32"
  7. "log"
  8. "math"
  9. "regexp"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/go-redis/redis/v8"
  14. )
// GoRedis is a helper object that wraps one or more go-redis clients.
type GoRedis struct {
	//Password string
	Code    string                // code (name) taken from the configuration string
	DB      int                   // default database number
	HashDb  bool                  // whether keys are spread over multiple databases/nodes
	DBS     []int                 // database list
	Ctx     context.Context       // context passed to every redis command
	CMap    map[int]*redis.Client // clients, looked up with the value returned by GetDB
	BakDb   bool                  // whether a backup node is configured
	Bak     *GoRedis              // backup node connection
}
  27. //初始化单个"[other=]127.0.0.1:2203[|#]127.0.0.1:2204[/0]=0-1=1-10=300" [代码]、地址[|备用地址[/代表默认库][#多节点地址,默认库都为0]、库、最大池、空闲时间默认300秒
  28. /*
  29. 示例
  30. 127.0.0.1:2203
  31. 127.0.0.1:2203=2=15 默认库为2,最大连接为15,最小连接是0.2*15为3
  32. other=127.0.0.1:2203 带code配置
  33. other=127.0.0.1:2203=0=10
  34. other=127.0.0.1:2203=0=10=300 空闲时间300秒
  35. other=127.0.0.1:2203=0-8=10=300 带code配置,使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接是0.2*10为2,空闲时间300秒
  36. other=127.0.0.1:2203=0-8=1-10=300 带code配置,使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
  37. other=127.0.0.1:2203|127.0.0.1:2204=0-8=10=300 带code配置,使用备用节点(仅供取数据时使用),使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
  38. other=127.0.0.1:2203|127.0.0.1:2204/0=0-8=10=300 带code配置,使用备用节点(仅供取数据时使用,备用节点默认库为0),使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
  39. other=127.0.0.1:2203#127.0.0.1:2204#127.0.0.1:2205#127.0.0.1:2206=0=10=300 带code配置,使用多节点模式(多节点所有默认库为0),最大连接为10,最小连接2,空闲时间300秒
  40. 注意:
  41. 1、当使用多节点时,用正则获取keys时,再用Get取模获取数据时,一定要保障放入和取出是同一个hashcode,否则数据永远取不到
  42. */
  43. //解析配置并初始化,opt为string或map
  44. func (r *GoRedis) Init(opt interface{}) {
  45. check := false
  46. code, addr, dbs, pool, idle := "", "", []int{0}, []int{2, 30}, 300
  47. if so, ok := opt.(string); ok {
  48. arr := strings.Split(so, "=")
  49. regAddr := regexp.MustCompile("[0-9.a-zA-Z/]+:[0-9]+.*")
  50. if len(arr) == 1 {
  51. if regAddr.MatchString(arr[0]) {
  52. check = true
  53. addr = arr[0]
  54. }
  55. } else if len(arr) > 1 {
  56. index := 0
  57. if regAddr.MatchString(arr[0]) {
  58. index = 1
  59. addr = arr[0]
  60. check = true
  61. } else if regAddr.MatchString(arr[1]) {
  62. check = true
  63. addr = arr[1]
  64. code = arr[0]
  65. }
  66. //解析库配置
  67. if len(arr) > 2-index { //dbs配置
  68. dbs1 := strings.Split(arr[2-index], "-")
  69. if len(dbs1) == 1 || len(dbs1) == 2 {
  70. check = true
  71. dbs[0], _ = strconv.Atoi(dbs1[0])
  72. if len(dbs1) == 2 {
  73. tmp, _ := strconv.Atoi(dbs1[1])
  74. dbs = append(dbs, tmp)
  75. }
  76. } else {
  77. check = false
  78. }
  79. //解析连接池配置
  80. if len(arr) > 3-index {
  81. pool1 := strings.Split(arr[3-index], "-")
  82. if len(pool1) == 1 || len(pool1) == 2 {
  83. check = true
  84. if len(pool1) == 1 {
  85. pool[1], _ = strconv.Atoi(pool1[0])
  86. pool[0] = int(math.Ceil(float64(pool[1]) * 0.2))
  87. } else {
  88. pool[0], _ = strconv.Atoi(pool1[0])
  89. pool[1], _ = strconv.Atoi(pool1[1])
  90. }
  91. } else {
  92. check = false
  93. }
  94. //解析最大空闲时间配置
  95. if len(arr) > 4-index {
  96. idle, _ = strconv.Atoi(arr[4-index])
  97. if idle == 0 {
  98. idle = 300
  99. }
  100. }
  101. }
  102. }
  103. }
  104. } else if _, ok := opt.(map[string]interface{}); ok {
  105. }
  106. if check {
  107. addrs := strings.Split(addr, "|")
  108. if len(addrs) == 2 { //备用节点的模式 2选1
  109. r.init(addrs[0], code, dbs, pool[1], pool[0], idle)
  110. r.BakDb = true
  111. //有备用节点,集群先不考虑,支持指定的库
  112. addr1 := strings.Split(addrs[1], "/")
  113. r.Bak = &GoRedis{}
  114. if len(addr1) == 2 { //没有指定库
  115. i, _ := strconv.Atoi(addr1[1])
  116. dbs = []int{i}
  117. }
  118. r.Bak.init(addr1[0], code, dbs, pool[1], pool[0], idle)
  119. } else if len(addrs) == 1 {
  120. addr1 := strings.Split(addrs[0], "#") //是多节点模式,所有库默认为0
  121. if len(addr1) == 1 {
  122. r.init(addr1[0], code, dbs, pool[1], pool[0], idle)
  123. } else {
  124. r.Code = code
  125. r.DB = 0
  126. r.Ctx = context.Background()
  127. r.CMap = map[int]*redis.Client{}
  128. r.HashDb = true
  129. r.DBS = []int{}
  130. for i := 0; i < len(addr1); i++ {
  131. r.DBS = append(r.DBS, i)
  132. r.CMap[i] = redis.NewClient(&redis.Options{
  133. Addr: addr1[i],
  134. PoolSize: pool[1],
  135. MinIdleConns: pool[0],
  136. IdleTimeout: time.Duration(idle) * time.Second,
  137. })
  138. }
  139. }
  140. }
  141. }
  142. log.Println(check, code, addr, dbs, pool, idle, r.DBS)
  143. }
  144. //初始化连接
  145. func (r *GoRedis) init(addr, code string, dbs []int, poolSize, minIdleConns, idleTimeOut int) {
  146. r.Code = code
  147. r.DB = dbs[0]
  148. r.Ctx = context.Background()
  149. r.CMap = map[int]*redis.Client{}
  150. if len(dbs) > 1 {
  151. r.HashDb = true
  152. r.DBS = []int{}
  153. for i := dbs[0]; i <= dbs[1]; i++ {
  154. r.DBS = append(r.DBS, i)
  155. }
  156. } else {
  157. r.DBS = dbs
  158. }
  159. for k, v := range r.DBS {
  160. r.CMap[k] = redis.NewClient(&redis.Options{
  161. Addr: addr,
  162. DB: v,
  163. PoolSize: poolSize,
  164. MinIdleConns: minIdleConns,
  165. IdleTimeout: time.Duration(idleTimeOut) * time.Second,
  166. })
  167. }
  168. }
  169. //int转timeDuration
  170. func D(t int) time.Duration {
  171. return time.Duration(t) * time.Second
  172. }
  173. //取db
  174. func (r *GoRedis) GetDB(key string) int {
  175. if r.HashDb {
  176. //return int(key[len(key)-1]) % len(r.DBS)
  177. return hashCode(key) % len(r.DBS)
  178. } else {
  179. return r.DB
  180. }
  181. }
  182. //根据key取hash
  183. func hashCode(key string) int {
  184. v := int(crc32.ChecksumIEEE([]byte(key)))
  185. if v < 0 {
  186. v = -v
  187. }
  188. return v
  189. }
  190. //-----具体方法
  191. //简单的Put方法
  192. func (r *GoRedis) Put(key string, val interface{}) (string, error) {
  193. stutsCmd := r.CMap[r.GetDB(key)].Set(r.Ctx, key, val, 0)
  194. str, err := stutsCmd.Result()
  195. return str, err
  196. }
  197. //存key,加过期时间
  198. func (r *GoRedis) Set(key string, val interface{}, timeout int) (string, error) {
  199. stutsCmd := r.CMap[r.GetDB(key)].Set(r.Ctx, key, val, D(timeout))
  200. //cmd := r.CMap[r.GetDB(key)].Do(r.Ctx, "setex", key, timeout, val)
  201. str, err := stutsCmd.Result()
  202. return str, err
  203. }
  204. //简单的Get方法,无结果返回空串,err: redis: nil
  205. func (r *GoRedis) Get(key string) (string, error) {
  206. stutsCmd := r.CMap[r.GetDB(key)].Get(r.Ctx, key)
  207. str, err := stutsCmd.Result()
  208. // log.Println("1", str, "-", err)
  209. if err != nil {
  210. //有备用节点
  211. if r.BakDb {
  212. str, err = r.Bak.Get(key)
  213. }
  214. }
  215. return str, err
  216. }
  217. //根据正则取key,未考虑负载、多库
  218. func (r *GoRedis) GetByPattern(key string) (res []string, err error) {
  219. serr := ""
  220. for _, v := range r.CMap {
  221. strSlice := v.Keys(r.Ctx, key)
  222. arr, err1 := strSlice.Result()
  223. if len(arr) > 0 {
  224. res = append(res, arr...)
  225. } else if err1 != nil {
  226. serr += err1.Error()
  227. }
  228. }
  229. if serr != "" {
  230. err = errors.New(serr)
  231. }
  232. return
  233. }
  234. //批量保存数据
  235. // 不支持- MSet("key1", "value1", "key2", "value2")
  236. // 不支持- MSet([]string{"key1", "value1", "key2", "value2"})
  237. // []interface{[]interface{k1,v1},[]interface{k2,v2},[]interface{k3,v3} }
  238. // - MSet(map[string]interface{}{"key1": "value1", "key2": "value2"})
  239. func (r *GoRedis) BulkPut(timeout int, obj interface{}) {
  240. if r.HashDb {
  241. if timeout < 1 {
  242. timeout = 0
  243. }
  244. timeEx := time.Duration(timeout) * time.Second
  245. switch obj.(type) {
  246. case []interface{}:
  247. if objs, ok := obj.([]interface{}); ok {
  248. for _, _tmp := range objs {
  249. tmp, ok := _tmp.([]interface{})
  250. if ok && len(tmp) == 2 {
  251. key, ok1 := tmp[0].(string)
  252. if ok1 {
  253. r.CMap[r.GetDB(key)].Set(r.Ctx, key, tmp[1], timeEx)
  254. }
  255. }
  256. }
  257. }
  258. case map[string]interface{}:
  259. if objs, ok := obj.(map[string]interface{}); ok {
  260. for k, v := range objs {
  261. r.CMap[r.GetDB(k)].Set(r.Ctx, k, v, timeEx)
  262. }
  263. }
  264. }
  265. } else {
  266. //单库直接保存
  267. if timeout < 1 {
  268. stutsCmd := r.CMap[r.DB].MSet(r.Ctx, obj)
  269. str, err := stutsCmd.Result()
  270. log.Println(str, err)
  271. } else {
  272. timeEx := time.Duration(timeout) * time.Second
  273. //设置超时
  274. lenth := 0
  275. cmds, err := r.CMap[r.DB].Pipelined(r.Ctx, func(pipe redis.Pipeliner) error {
  276. if objs, ok := obj.([]interface{}); ok {
  277. lenth = len(objs)
  278. for _, _tmp := range objs {
  279. tmp, ok := _tmp.([]interface{})
  280. if ok && len(tmp) == 2 {
  281. key, ok1 := tmp[0].(string)
  282. if ok1 {
  283. pipe.Set(r.Ctx, key, tmp[1], timeEx)
  284. }
  285. }
  286. }
  287. } else if objs, ok := obj.(map[string]interface{}); ok {
  288. lenth = len(objs)
  289. for k, v := range objs {
  290. pipe.Set(r.Ctx, k, v, timeEx)
  291. }
  292. } else {
  293. log.Println("bulkPut type error")
  294. }
  295. return nil
  296. })
  297. if err != nil {
  298. log.Println("bulkPut error", err.Error())
  299. }
  300. if len(cmds) == lenth { //数据相等
  301. for _, v := range cmds {
  302. log.Println("cmd", v.Args(), v.String())
  303. }
  304. }
  305. }
  306. }
  307. }
  308. //设置超时时间,单位秒
  309. func (r *GoRedis) SetExpire(key string, expire int) error {
  310. boolCmd := r.CMap[r.GetDB(key)].Expire(r.Ctx, key, D(expire))
  311. return boolCmd.Err()
  312. }
  313. //判断一个key是否存在
  314. func (r *GoRedis) Exists(key string) bool {
  315. intCmd := r.CMap[r.GetDB(key)].Exists(r.Ctx, key)
  316. i, err := intCmd.Result()
  317. if err != nil {
  318. log.Println("redisutil-exists", key, err)
  319. }
  320. return i == 1
  321. }
  322. //直接返回字节流
  323. func (r *GoRedis) GetBytes(key string) (ret *[]byte, err error) {
  324. cmd := r.CMap[r.GetDB(key)].Do(r.Ctx, "GET", key)
  325. res, err := cmd.Result()
  326. log.Println(res)
  327. if err != nil {
  328. log.Println("redisutil-GetBytesError", err)
  329. } else {
  330. if tmp, ok := res.([]byte); ok {
  331. ret = &tmp
  332. } else if tmp, ok := res.(string); ok {
  333. bs := []byte(tmp)
  334. ret = &bs
  335. } else {
  336. err = errors.New("redis返回数据格式不对")
  337. }
  338. }
  339. return
  340. }
  341. //支持删除多个key
  342. func (r *GoRedis) Del(key ...string) (b bool) {
  343. i := 0
  344. if r.HashDb {
  345. for _, k := range key {
  346. cmd := r.CMap[r.GetDB(k)].Del(r.Ctx, k)
  347. i1, _ := cmd.Result()
  348. i += int(i1)
  349. }
  350. } else {
  351. intCmd := r.CMap[r.DB].Del(r.Ctx, key...)
  352. i1, _ := intCmd.Result()
  353. i = int(i1)
  354. }
  355. return i == len(key)
  356. }
  357. //根据代码和前辍key删除多个
  358. func (r *GoRedis) DelByPattern(key string) {
  359. res, _ := r.GetByPattern(key)
  360. if len(res) > 0 {
  361. r.Del(res...)
  362. }
  363. }
  364. //自增计数器
  365. func (r *GoRedis) Incr(key string) (int64, error) {
  366. intCmd := r.CMap[r.GetDB(key)].Incr(r.Ctx, key)
  367. i, err := intCmd.Result()
  368. return i, err
  369. }
  370. //自减
  371. func (r *GoRedis) Decrby(key string, val int) (int64, error) {
  372. intCmd := r.CMap[r.GetDB(key)].DecrBy(r.Ctx, key, int64(val))
  373. i, err := intCmd.Result()
  374. return i, err
  375. }
  376. //批量取多个key
  377. func (r *GoRedis) Mget(key []string) []interface{} {
  378. if r.HashDb {
  379. mdb := map[int][]string{}
  380. for _, v := range key { //分组
  381. arr := mdb[r.GetDB(v)]
  382. if arr == nil {
  383. arr = []string{v}
  384. } else {
  385. arr = append(arr, v)
  386. }
  387. mdb[r.GetDB(v)] = arr
  388. }
  389. res := []interface{}{}
  390. for k, v := range mdb {
  391. sliceCmd := r.CMap[k].MGet(r.Ctx, v...)
  392. res1, err := sliceCmd.Result()
  393. if err != nil {
  394. log.Println("Mget error", err)
  395. }
  396. res = append(res, res1...)
  397. }
  398. return res
  399. } else {
  400. sliceCmd := r.CMap[r.DB].MGet(r.Ctx, key...)
  401. res, err := sliceCmd.Result()
  402. if err != nil {
  403. log.Println("Mget error", err)
  404. }
  405. return res
  406. }
  407. }
  408. //取出并删除Key
  409. func (r *GoRedis) Pop(key string) (result interface{}) {
  410. strCmd := r.CMap[r.GetDB(key)].Get(r.Ctx, key)
  411. b, err := strCmd.Bytes()
  412. if err != nil {
  413. log.Println("Poperr bytes", err)
  414. return
  415. }
  416. err1 := json.Unmarshal(b, &result)
  417. if err1 != nil {
  418. log.Println("Poperr json ", err)
  419. return
  420. } else {
  421. go r.CMap[r.GetDB(key)].Del(r.Ctx, key)
  422. }
  423. return
  424. }
  425. //list操作
  426. func (r *GoRedis) LPOP(list string) (result interface{}) {
  427. strCmd := r.CMap[r.GetDB(list)].LPop(r.Ctx, list)
  428. b, err := strCmd.Bytes()
  429. if err != nil {
  430. log.Println("LPOP bytes", err)
  431. return
  432. }
  433. err1 := json.Unmarshal(b, &result)
  434. if err1 != nil {
  435. log.Println("LPOP json ", err)
  436. return
  437. }
  438. return
  439. }
  440. //将一个或多个值插入到列表的尾部
  441. func (r *GoRedis) RPUSH(list string, val ...interface{}) bool {
  442. intCmd := r.CMap[r.GetDB(list)].RPush(r.Ctx, list, val...)
  443. i, err := intCmd.Result()
  444. if err != nil {
  445. log.Println("RPUSH bytes", err)
  446. return false
  447. }
  448. return i == 1
  449. }
  450. //获取列表长度
  451. func (r *GoRedis) LLEN(list string) int64 {
  452. intCmd := r.CMap[r.GetDB(list)].LLen(r.Ctx, list)
  453. i, err := intCmd.Result()
  454. if err != nil {
  455. log.Println("RPUSH bytes", err)
  456. }
  457. return i
  458. }