goredis.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. package redis
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "hash/crc32"
  7. "log"
  8. "math"
  9. "regexp"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/go-redis/redis/v8"
  14. )
  15. //goredis 工具类对象
  16. type GoRedis struct {
  17. //Password string
  18. Code string
  19. DB int //默认库
  20. DBS []int //数据库列表
  21. Nodes []int //节点列表
  22. HashDb int //是否是多个数据库 0单库 1单节点多库 2多节点单库 3多节点多库
  23. Ctx context.Context
  24. CMap map[int]map[int]*redis.Client
  25. BakDb bool //是否有备用节点
  26. Bak *GoRedis //备用节点连接
  27. }
  28. //初始化单个"[other=]127.0.0.1:2203[|#]127.0.0.1:2204[/0]=0-1=1-10=300" [代码]、地址[|备用地址[/代表默认库][#多节点地址,默认库都为0]、库、最大池、空闲时间默认300秒
  29. /*
  30. 示例
  31. 127.0.0.1:2203
  32. 127.0.0.1:2203=2=15 默认库为2,最大连接为15,最小连接是0.2*15为3
  33. other=127.0.0.1:2203 带code配置
  34. other=127.0.0.1:2203=0=10
  35. other=127.0.0.1:2203=0=10=300 空闲时间300秒
  36. other=127.0.0.1:2203=0-8=10=300 带code配置,使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接是0.2*10为2,空闲时间300秒
  37. other=127.0.0.1:2203=0-8=1-10=300 带code配置,使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
  38. other=127.0.0.1:2203|127.0.0.1:2204=0-8=10=300 带code配置,使用备用节点(仅供取数据时使用),使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
  39. other=127.0.0.1:2203|127.0.0.1:2204/0=0-8=10=300 带code配置,使用备用节点(仅供取数据时使用,备用节点默认库为0),使用多个库即0 1 2 3 4 5 6 7 8,最大连接为10,最小连接1,空闲时间300秒
  40. other=127.0.0.1:2203#127.0.0.1:2204#127.0.0.1:2205#127.0.0.1:2206=0=10=300 带code配置,使用多节点模式(多节点所有默认库为0),最大连接为10,最小连接2,空闲时间300秒
  41. 注意:
  42. 1、当使用多节点时,用正则获取keys时,再用Get取模获取数据时,一定要保障放入和取出是同一个hashcode,否则数据永远取不到
  43. */
  44. //解析配置并初始化,opt为string或map
  45. func (r *GoRedis) Init(opt interface{}) {
  46. check := false
  47. //代码 地址 []库范围 []池配置 空闲时间
  48. code, addr, dbs, pool, idle := "", "", []int{0}, []int{2, 30}, 300
  49. if so, ok := opt.(string); ok {
  50. arr := strings.Split(so, "=")
  51. regAddr := regexp.MustCompile("[0-9.a-zA-Z/]+:[0-9]+.*")
  52. if len(arr) == 1 { //只是一个串
  53. if regAddr.MatchString(arr[0]) {
  54. check = true
  55. addr = arr[0]
  56. }
  57. } else if len(arr) > 1 {
  58. index := 0
  59. if regAddr.MatchString(arr[0]) { //第1个是地址
  60. index = 1
  61. addr = arr[0]
  62. check = true
  63. } else if regAddr.MatchString(arr[1]) { //第二个是地址
  64. check = true
  65. addr = arr[1]
  66. code = arr[0]
  67. }
  68. //解析库配置
  69. if len(arr) > 2-index { //dbs配置
  70. dbs1 := strings.Split(arr[2-index], "-") //库范围的配置
  71. if len(dbs1) == 1 || len(dbs1) == 2 {
  72. check = true
  73. dbs[0], _ = strconv.Atoi(dbs1[0])
  74. if len(dbs1) == 2 {
  75. tmp, _ := strconv.Atoi(dbs1[1])
  76. dbs = append(dbs, tmp)
  77. }
  78. } else {
  79. check = false
  80. }
  81. //解析连接池配置
  82. if len(arr) > 3-index {
  83. pool1 := strings.Split(arr[3-index], "-")
  84. if len(pool1) == 1 || len(pool1) == 2 {
  85. check = true
  86. if len(pool1) == 1 {
  87. pool[1], _ = strconv.Atoi(pool1[0])
  88. pool[0] = int(math.Ceil(float64(pool[1]) * 0.2))
  89. } else {
  90. pool[0], _ = strconv.Atoi(pool1[0])
  91. pool[1], _ = strconv.Atoi(pool1[1])
  92. }
  93. } else {
  94. check = false
  95. }
  96. //解析最大空闲时间配置
  97. if len(arr) > 4-index {
  98. idle, _ = strconv.Atoi(arr[4-index])
  99. if idle == 0 {
  100. idle = 300
  101. }
  102. }
  103. }
  104. }
  105. }
  106. } else if _, ok := opt.(map[string]interface{}); ok {
  107. }
  108. if check {
  109. log.Println("代码 地址 []库范围 []池配置 空闲时间", code, addr, dbs, pool, idle)
  110. addrs := strings.Split(addr, "|") //备用节点模式
  111. if len(addrs) == 2 { //备用节点的模式 2选1
  112. r.init(addrs[0], code, dbs, pool[1], pool[0], idle)
  113. r.BakDb = true
  114. //有备用节点,集群先不考虑,支持指定的库
  115. addr1 := strings.Split(addrs[1], "/")
  116. r.Bak = &GoRedis{}
  117. if len(addr1) == 2 { //没有指定库 根据/区分
  118. i, _ := strconv.Atoi(addr1[1])
  119. dbs = []int{i}
  120. }
  121. r.Bak.init(addr1[0], code, dbs, pool[1], pool[0], idle)
  122. } else if len(addrs) == 1 {
  123. addr1 := strings.Split(addrs[0], "#") //是多节点模式,所有库默认为0
  124. if len(addr1) == 1 {
  125. r.init(addr1[0], code, dbs, pool[1], pool[0], idle)
  126. } else { //多节点模式
  127. r.Code = code
  128. r.DB = dbs[0]
  129. r.HashDb = 1 + len(dbs)
  130. if len(dbs) > 1 {
  131. r.DBS = []int{}
  132. for i := dbs[0]; i <= dbs[1]; i++ {
  133. r.DBS = append(r.DBS, i)
  134. }
  135. } else {
  136. r.DBS = dbs
  137. }
  138. r.Ctx = context.Background()
  139. r.CMap = map[int]map[int]*redis.Client{}
  140. r.Nodes = []int{}
  141. for i := 0; i < len(addr1); i++ {
  142. r.Nodes = append(r.Nodes, i)
  143. r.CMap[i] = map[int]*redis.Client{}
  144. for k, v := range r.DBS { //按节点的每个库初始化
  145. r.CMap[i][k] = redis.NewClient(&redis.Options{
  146. Addr: addr1[i],
  147. DB: v,
  148. PoolSize: pool[1],
  149. MinIdleConns: pool[0],
  150. IdleTimeout: time.Duration(idle) * time.Second,
  151. })
  152. }
  153. }
  154. }
  155. }
  156. }
  157. log.Println(check, code, addr, dbs, pool, idle, r.DBS)
  158. }
  159. //初始化连接
  160. func (r *GoRedis) init(addr, code string, dbs []int, poolSize, minIdleConns, idleTimeOut int) {
  161. r.Code = code
  162. r.DB = dbs[0]
  163. r.Ctx = context.Background()
  164. r.CMap = map[int]map[int]*redis.Client{}
  165. if len(dbs) > 1 {
  166. r.HashDb = len(dbs) - 1
  167. r.DBS = []int{}
  168. for i := dbs[0]; i <= dbs[1]; i++ {
  169. r.DBS = append(r.DBS, i)
  170. }
  171. } else {
  172. r.DBS = dbs
  173. }
  174. for k, v := range r.DBS { //按节点的每个库初始化
  175. r.CMap[0][k] = redis.NewClient(&redis.Options{
  176. Addr: addr,
  177. DB: v,
  178. PoolSize: poolSize,
  179. MinIdleConns: minIdleConns,
  180. IdleTimeout: time.Duration(idleTimeOut) * time.Second,
  181. })
  182. }
  183. }
  184. //int转timeDuration
  185. func D(t int) time.Duration {
  186. return time.Duration(t) * time.Second
  187. }
  188. //取db
  189. func (r *GoRedis) GetDB(key string) (int, int) {
  190. switch r.HashDb {
  191. case 0:
  192. return 0, r.DB
  193. case 1:
  194. return 0, hashCode(key) % len(r.DBS)
  195. case 2:
  196. return hashCode(key) % len(r.Nodes), r.DB
  197. case 3:
  198. hk := hashCode(key)
  199. return hk % len(r.Nodes), hk % len(r.DBS)
  200. }
  201. // if r.HashDb {
  202. // //return int(key[len(key)-1]) % len(r.DBS)
  203. // return hashCode(key) % len(r.DBS)
  204. // } else {
  205. // return r.DB
  206. // }
  207. return 0, 0
  208. }
  209. //根据key取hash
  210. func hashCode(key string) int {
  211. v := int(crc32.ChecksumIEEE([]byte(key)))
  212. if v < 0 {
  213. v = -v
  214. }
  215. return v
  216. }
  217. //-----具体方法
  218. //简单的Put方法
  219. func (r *GoRedis) Put(key string, val interface{}) (string, error) {
  220. i, k := r.GetDB(key)
  221. stutsCmd := r.CMap[i][k].Set(r.Ctx, key, val, 0)
  222. str, err := stutsCmd.Result()
  223. return str, err
  224. }
  225. //存key,加过期时间
  226. func (r *GoRedis) Set(key string, val interface{}, timeout int) (string, error) {
  227. i, k := r.GetDB(key)
  228. stutsCmd := r.CMap[i][k].Set(r.Ctx, key, val, D(timeout))
  229. //cmd := r.CMap[r.GetDB(key)].Do(r.Ctx, "setex", key, timeout, val)
  230. str, err := stutsCmd.Result()
  231. return str, err
  232. }
  233. //简单的Get方法,无结果返回空串,err: redis: nil
  234. func (r *GoRedis) Get(key string) (string, error) {
  235. i, k := r.GetDB(key)
  236. stutsCmd := r.CMap[i][k].Get(r.Ctx, key)
  237. str, err := stutsCmd.Result()
  238. // log.Println("1", str, "-", err)
  239. if err != nil {
  240. //有备用节点
  241. if r.BakDb {
  242. str, err = r.Bak.Get(key)
  243. }
  244. }
  245. return str, err
  246. }
  247. //根据正则取key,未考虑负载、多库
  248. func (r *GoRedis) GetByPattern(key string) (res []string, err error) {
  249. serr := ""
  250. for _, v1 := range r.CMap {
  251. for _, v := range v1 {
  252. strSlice := v.Keys(r.Ctx, key)
  253. arr, err1 := strSlice.Result()
  254. if len(arr) > 0 {
  255. res = append(res, arr...)
  256. } else if err1 != nil {
  257. serr += err1.Error()
  258. }
  259. }
  260. }
  261. if serr != "" {
  262. err = errors.New(serr)
  263. }
  264. return
  265. }
  266. //批量保存数据
  267. // 不支持- MSet("key1", "value1", "key2", "value2")
  268. // 不支持- MSet([]string{"key1", "value1", "key2", "value2"})
  269. // []interface{[]interface{k1,v1},[]interface{k2,v2},[]interface{k3,v3} }
  270. // - MSet(map[string]interface{}{"key1": "value1", "key2": "value2"})
  271. func (r *GoRedis) BulkPut(timeout int, obj interface{}) {
  272. if r.HashDb > 0 {
  273. if timeout < 1 {
  274. timeout = 0
  275. }
  276. timeEx := time.Duration(timeout) * time.Second
  277. switch obj.(type) {
  278. case []interface{}:
  279. if objs, ok := obj.([]interface{}); ok {
  280. for _, _tmp := range objs {
  281. tmp, ok := _tmp.([]interface{})
  282. if ok && len(tmp) == 2 {
  283. key, ok1 := tmp[0].(string)
  284. if ok1 {
  285. _i, _k := r.GetDB(key)
  286. r.CMap[_i][_k].Set(r.Ctx, key, tmp[1], timeEx)
  287. }
  288. }
  289. }
  290. }
  291. case map[string]interface{}:
  292. if objs, ok := obj.(map[string]interface{}); ok {
  293. for k, v := range objs {
  294. _i, _k := r.GetDB(k)
  295. r.CMap[_i][_k].Set(r.Ctx, k, v, timeEx)
  296. }
  297. }
  298. }
  299. } else {
  300. //单库直接保存
  301. if timeout < 1 {
  302. stutsCmd := r.CMap[0][r.DB].MSet(r.Ctx, obj)
  303. str, err := stutsCmd.Result()
  304. log.Println(str, err)
  305. } else {
  306. timeEx := time.Duration(timeout) * time.Second
  307. //设置超时
  308. lenth := 0
  309. cmds, err := r.CMap[0][r.DB].Pipelined(r.Ctx, func(pipe redis.Pipeliner) error {
  310. if objs, ok := obj.([]interface{}); ok {
  311. lenth = len(objs)
  312. for _, _tmp := range objs {
  313. tmp, ok := _tmp.([]interface{})
  314. if ok && len(tmp) == 2 {
  315. key, ok1 := tmp[0].(string)
  316. if ok1 {
  317. pipe.Set(r.Ctx, key, tmp[1], timeEx)
  318. }
  319. }
  320. }
  321. } else if objs, ok := obj.(map[string]interface{}); ok {
  322. lenth = len(objs)
  323. for k, v := range objs {
  324. pipe.Set(r.Ctx, k, v, timeEx)
  325. }
  326. } else {
  327. log.Println("bulkPut type error")
  328. }
  329. return nil
  330. })
  331. if err != nil {
  332. log.Println("bulkPut error", err.Error())
  333. }
  334. if len(cmds) == lenth { //数据相等
  335. for _, v := range cmds {
  336. log.Println("cmd", v.Args(), v.String())
  337. }
  338. }
  339. }
  340. }
  341. }
  342. //设置超时时间,单位秒
  343. func (r *GoRedis) SetExpire(key string, expire int) error {
  344. i, k := r.GetDB(key)
  345. boolCmd := r.CMap[i][k].Expire(r.Ctx, key, D(expire))
  346. return boolCmd.Err()
  347. }
  348. //判断一个key是否存在
  349. func (r *GoRedis) Exists(key string) bool {
  350. _i, _k := r.GetDB(key)
  351. intCmd := r.CMap[_i][_k].Exists(r.Ctx, key)
  352. i, err := intCmd.Result()
  353. if err != nil {
  354. log.Println("redisutil-exists", key, err)
  355. }
  356. return i == 1
  357. }
  358. //直接返回字节流
  359. func (r *GoRedis) GetBytes(key string) (ret *[]byte, err error) {
  360. i, k := r.GetDB(key)
  361. cmd := r.CMap[i][k].Do(r.Ctx, "GET", key)
  362. res, err := cmd.Result()
  363. log.Println(res)
  364. if err != nil {
  365. log.Println("redisutil-GetBytesError", err)
  366. } else {
  367. if tmp, ok := res.([]byte); ok {
  368. ret = &tmp
  369. } else if tmp, ok := res.(string); ok {
  370. bs := []byte(tmp)
  371. ret = &bs
  372. } else {
  373. err = errors.New("redis返回数据格式不对")
  374. }
  375. }
  376. return
  377. }
  378. //支持删除多个key
  379. func (r *GoRedis) Del(key ...string) (b bool) {
  380. i := 0
  381. if r.HashDb > 0 {
  382. for _, k := range key {
  383. _i, _k := r.GetDB(k)
  384. cmd := r.CMap[_i][_k].Del(r.Ctx, k)
  385. i1, _ := cmd.Result()
  386. i += int(i1)
  387. }
  388. } else {
  389. intCmd := r.CMap[0][r.DB].Del(r.Ctx, key...)
  390. i1, _ := intCmd.Result()
  391. i = int(i1)
  392. }
  393. return i == len(key)
  394. }
  395. //根据代码和前辍key删除多个
  396. func (r *GoRedis) DelByPattern(key string) {
  397. res, _ := r.GetByPattern(key)
  398. if len(res) > 0 {
  399. r.Del(res...)
  400. }
  401. }
  402. //自增计数器
  403. func (r *GoRedis) Incr(key string) (int64, error) {
  404. _i, _k := r.GetDB(key)
  405. intCmd := r.CMap[_i][_k].Incr(r.Ctx, key)
  406. i, err := intCmd.Result()
  407. return i, err
  408. }
  409. //自减
  410. func (r *GoRedis) Decrby(key string, val int) (int64, error) {
  411. _i, _k := r.GetDB(key)
  412. intCmd := r.CMap[_i][_k].DecrBy(r.Ctx, key, int64(val))
  413. i, err := intCmd.Result()
  414. return i, err
  415. }
  416. //批量取多个key
  417. func (r *GoRedis) Mget(key []string) []interface{} {
  418. if r.HashDb > 0 {
  419. mdb := map[int]map[int][]string{}
  420. for _, v := range key { //分组
  421. _i, _k := r.GetDB(v)
  422. mdb[_i] = map[int][]string{}
  423. arr := mdb[_i][_k]
  424. if arr == nil {
  425. arr = []string{v}
  426. } else {
  427. arr = append(arr, v)
  428. }
  429. mdb[_i][_k] = arr
  430. }
  431. res := []interface{}{}
  432. for k, v := range mdb {
  433. for k1, v1 := range v {
  434. sliceCmd := r.CMap[k][k1].MGet(r.Ctx, v1...)
  435. res1, err := sliceCmd.Result()
  436. if err != nil {
  437. log.Println("Mget error", err)
  438. }
  439. res = append(res, res1...)
  440. }
  441. }
  442. return res
  443. } else {
  444. sliceCmd := r.CMap[0][r.DB].MGet(r.Ctx, key...)
  445. res, err := sliceCmd.Result()
  446. if err != nil {
  447. log.Println("Mget error", err)
  448. }
  449. return res
  450. }
  451. }
  452. //取出并删除Key
  453. func (r *GoRedis) Pop(key string) (result interface{}) {
  454. _i, _k := r.GetDB(key)
  455. strCmd := r.CMap[_i][_k].Get(r.Ctx, key)
  456. b, err := strCmd.Bytes()
  457. if err != nil {
  458. log.Println("Poperr bytes", err)
  459. return
  460. }
  461. err1 := json.Unmarshal(b, &result)
  462. if err1 != nil {
  463. log.Println("Poperr json ", err)
  464. return
  465. } else {
  466. go r.CMap[_i][_k].Del(r.Ctx, key)
  467. }
  468. return
  469. }
  470. //list操作
  471. func (r *GoRedis) LPOP(list string) (result interface{}) {
  472. _i, _k := r.GetDB(list)
  473. strCmd := r.CMap[_i][_k].LPop(r.Ctx, list)
  474. b, err := strCmd.Bytes()
  475. if err != nil {
  476. log.Println("LPOP bytes", err)
  477. return
  478. }
  479. err1 := json.Unmarshal(b, &result)
  480. if err1 != nil {
  481. log.Println("LPOP json ", err)
  482. return
  483. }
  484. return
  485. }
  486. //将一个或多个值插入到列表的尾部
  487. func (r *GoRedis) RPUSH(list string, val ...interface{}) bool {
  488. _i, _k := r.GetDB(list)
  489. intCmd := r.CMap[_i][_k].RPush(r.Ctx, list, val...)
  490. i, err := intCmd.Result()
  491. if err != nil {
  492. log.Println("RPUSH bytes", err)
  493. return false
  494. }
  495. return i == 1
  496. }
  497. //获取列表长度
  498. func (r *GoRedis) LLEN(list string) int64 {
  499. _i, _k := r.GetDB(list)
  500. intCmd := r.CMap[_i][_k].LLen(r.Ctx, list)
  501. i, err := intCmd.Result()
  502. if err != nil {
  503. log.Println("RPUSH bytes", err)
  504. }
  505. return i
  506. }