rediscluster.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. package goredis
  2. import "strings"
  3. import "strconv"
  4. import "errors"
  5. import "math/rand"
  6. import "os"
  7. import "fmt"
  8. const RedisClusterHashSlots = 16384
  9. const RedisClusterRequestTTL = 16
  10. const RedisClusterDefaultTimeout = 1
  11. type RedisCluster struct {
  12. SeedHosts map[string]bool
  13. Handles map[string]*RedisHandle
  14. Slots map[uint16]string
  15. RefreshTableASAP bool
  16. SingleRedisMode bool
  17. MaxIdle int
  18. MaxActive int
  19. Debug bool
  20. }
  21. func NewRedisCluster(addrs []string, max_idle, max_active int, debug bool) RedisCluster {
  22. cluster := RedisCluster{RefreshTableASAP: false,
  23. SingleRedisMode: false,
  24. SeedHosts: make(map[string]bool),
  25. Handles: make(map[string]*RedisHandle),
  26. Slots: make(map[uint16]string),
  27. MaxActive: max_active,
  28. Debug: debug}
  29. if cluster.Debug {
  30. fmt.Println("[RedisCluster], PID", os.Getpid(), "StartingNewRedisCluster")
  31. }
  32. for _, label := range addrs {
  33. cluster.SeedHosts[label] = true
  34. cluster.Handles[label] = NewRedisHandle(label, max_idle, max_active, debug)
  35. }
  36. for addr, _ := range cluster.SeedHosts {
  37. node, ok := cluster.Handles[addr]
  38. if !ok {
  39. node = NewRedisHandle(addr, cluster.MaxIdle, cluster.MaxActive, cluster.Debug)
  40. cluster.Handles[addr] = node
  41. }
  42. cluster_enabled := cluster.hasClusterEnabled(node)
  43. if cluster_enabled == false {
  44. if len(cluster.SeedHosts) == 1 {
  45. cluster.SingleRedisMode = true
  46. } else {
  47. panic(errors.New("Multiple Seed Hosts Given, But Cluster Support Disabled in Redis"))
  48. }
  49. }
  50. }
  51. if cluster.SingleRedisMode == false {
  52. cluster.populateSlotsCache()
  53. }
  54. return cluster
  55. }
  56. func (self *RedisCluster) Update(max_idle, max_active int32) {
  57. for _, rh := range self.Handles {
  58. rh.Pool.Update(max_idle, max_active)
  59. }
  60. }
  61. func (self *RedisCluster) SetWaitTime(t int) {
  62. for _, rh := range self.Handles {
  63. rh.Pool.SetWaitTime(t)
  64. }
  65. }
  66. func (self *RedisCluster) SetLifeTime(t int) {
  67. for _, rh := range self.Handles {
  68. rh.Pool.SetLifeTime(t)
  69. }
  70. }
  71. func (self *RedisCluster) SetPingTime(t int) {
  72. for _, rh := range self.Handles {
  73. rh.Pool.SetPingTime(t)
  74. }
  75. }
  76. func (self *RedisCluster) TestCluster() error {
  77. for _, rh := range self.Handles {
  78. _, err := rh.Do("CLUSTER", "INFO")
  79. if err != nil {
  80. return err
  81. }
  82. }
  83. return nil
  84. }
  85. func (self *RedisCluster) GetHandle(key string) *RedisHandle {
  86. return self.HandleForKey(key)
  87. }
  88. func (self *RedisCluster) Do(cmd string, args ...interface{}) (reply interface{}, err error) {
  89. return self.SendClusterCommand(cmd, args...)
  90. }
  91. func (self *RedisCluster) hasClusterEnabled(node *RedisHandle) bool {
  92. _, err := node.Do("CLUSTER", "INFO")
  93. if err != nil {
  94. if err.Error() == "ERR This instance has cluster support disabled" ||
  95. err.Error() == "ERR unknown command 'CLUSTER'" {
  96. return false
  97. }
  98. }
  99. return true
  100. }
  101. // contact the startup nodes and try to fetch the hash slots -> instances
  102. // map in order to initialize the Slots map.
  103. func (self *RedisCluster) populateSlotsCache() {
  104. if self.SingleRedisMode == true {
  105. return
  106. }
  107. if self.Debug {
  108. fmt.Println("[RedisCluster], PID", os.Getpid(), "[PopulateSlots Running]")
  109. }
  110. seedHosts := make(map[string]bool)
  111. handles := make(map[string]*RedisHandle)
  112. slotsMap := make(map[uint16]string)
  113. for k, v := range self.SeedHosts {
  114. seedHosts[k] = v
  115. }
  116. for k, v := range self.Handles {
  117. handles[k] = v
  118. }
  119. for k, v := range self.Slots {
  120. slotsMap[k] = v
  121. }
  122. for name, _ := range self.SeedHosts {
  123. if self.Debug {
  124. fmt.Println("[RedisCluster] [PopulateSlots] Checking: ", name)
  125. }
  126. var (
  127. node *RedisHandle
  128. ok bool
  129. )
  130. if node, ok = handles[name]; !ok {
  131. node = NewRedisHandle(name, self.MaxIdle, self.MaxActive, self.Debug)
  132. handles[name] = node
  133. }
  134. cluster_info, err := node.Do("CLUSTER", "NODES")
  135. if err == nil {
  136. lines := strings.Split(string(cluster_info.([]uint8)), "\n")
  137. for _, line := range lines {
  138. if line != "" {
  139. fields := strings.Split(line, " ")
  140. addr := fields[1]
  141. if addr == ":0" {
  142. addr = name
  143. }
  144. // add to seedlist if not in cluster
  145. seedHosts[addr] = true
  146. // add to handles if not in handles
  147. if _, ok := handles[name]; !ok {
  148. handles[name] = NewRedisHandle(addr, self.MaxIdle, self.MaxActive, self.Debug)
  149. }
  150. slots := fields[8:len(fields)]
  151. for _, s_range := range slots {
  152. slot_range := s_range
  153. if slot_range != "[" {
  154. if self.Debug {
  155. fmt.Println("[RedisCluster] Considering Slot Range", slot_range)
  156. }
  157. r_pieces := strings.Split(slot_range, "-")
  158. min, _ := strconv.Atoi(r_pieces[0])
  159. max, _ := strconv.Atoi(r_pieces[1])
  160. for i := min; i <= max; i++ {
  161. slotsMap[uint16(i)] = addr
  162. }
  163. }
  164. }
  165. }
  166. }
  167. if self.Debug {
  168. fmt.Println("[RedisCluster] [Initializing] DONE, ",
  169. "Slots: ", len(slotsMap),
  170. "Handles So Far:", len(handles),
  171. "SeedList:", len(seedHosts))
  172. }
  173. break
  174. }
  175. }
  176. self.SeedHosts = seedHosts
  177. self.Handles = handles
  178. self.Slots = slotsMap
  179. self.switchToSingleModeIfNeeded()
  180. }
  181. func (self *RedisCluster) switchToSingleModeIfNeeded() {
  182. // catch case where we really intend to be on
  183. // single redis mode, but redis was not
  184. // started on time
  185. if len(self.SeedHosts) == 1 &&
  186. len(self.Slots) == 0 &&
  187. len(self.Handles) == 1 {
  188. for _, node := range self.Handles {
  189. cluster_enabled := self.hasClusterEnabled(node)
  190. if cluster_enabled == false {
  191. self.SingleRedisMode = true
  192. }
  193. }
  194. }
  195. }
  196. func (self *RedisCluster) KeyForRequest(cmd string, args ...interface{}) string {
  197. cmd = strings.ToLower(cmd)
  198. if cmd == "info" ||
  199. cmd == "multi" ||
  200. cmd == "exec" ||
  201. cmd == "slaveof" ||
  202. cmd == "config" ||
  203. cmd == "shutdown" {
  204. return ""
  205. }
  206. if args[0] == nil {
  207. return ""
  208. }
  209. strs := args[0].([]interface{})
  210. if strs != nil && strs[0] != nil {
  211. return strs[0].(string)
  212. }
  213. return ""
  214. }
  215. // Return the hash slot from the key.
  216. func (self *RedisCluster) SlotForKey(key string) uint16 {
  217. checksum := ChecksumCRC16([]byte(key))
  218. slot := checksum % RedisClusterHashSlots
  219. return slot
  220. }
  221. func (self *RedisCluster) RandomRedisHandle() *RedisHandle {
  222. if len(self.Handles) == 0 {
  223. return nil
  224. }
  225. addrs := make([]string, len(self.Handles))
  226. i := 0
  227. for addr, _ := range self.Handles {
  228. addrs[i] = addr
  229. i++
  230. }
  231. rand_addrs := make([]string, i)
  232. perm := rand.Perm(i)
  233. for j, v := range perm {
  234. rand_addrs[v] = addrs[j]
  235. }
  236. handle := self.Handles[rand_addrs[0]]
  237. self.switchToSingleModeIfNeeded()
  238. return handle
  239. }
  240. // Given a slot return the link (Redis instance) to the mapped node.
  241. // Make sure to create a connection with the node if we don't have
  242. // one.
  243. func (self *RedisCluster) RedisHandleForSlot(slot uint16) *RedisHandle {
  244. node, exists := self.Slots[slot]
  245. // If we don't know what the mapping is, return a random node.
  246. if !exists {
  247. if self.Debug {
  248. fmt.Println("[RedisCluster] No One Appears Responsible For Slot: ", slot, "our slotsize is: ", len(self.Slots))
  249. }
  250. return self.RandomRedisHandle()
  251. }
  252. r, cx_exists := self.Handles[node]
  253. // add to cluster if not in cluster
  254. if cx_exists {
  255. return r
  256. } else {
  257. r = NewRedisHandle(node, self.MaxIdle, self.MaxActive, self.Debug)
  258. handles := make(map[string]*RedisHandle)
  259. for k, v := range self.Handles {
  260. handles[k] = v
  261. }
  262. handles[node] = r
  263. self.Handles = handles
  264. }
  265. // XXX consider returning random if failure
  266. return r
  267. }
  268. func (self *RedisCluster) disconnectAll() {
  269. if self.Debug {
  270. fmt.Println("[RedisCluster] PID:", os.Getpid(), " [Disconnect!] Had Handles:", len(self.Handles))
  271. }
  272. // disconnect anyone in handles
  273. for _, handle := range self.Handles {
  274. handle.Pool.Close()
  275. }
  276. self.Handles = make(map[string]*RedisHandle)
  277. // nuke slots
  278. self.Slots = make(map[uint16]string)
  279. }
  280. func (self *RedisCluster) handleSingleMode(flush bool, cmd string, args ...interface{}) (reply interface{}, err error) {
  281. for _, handle := range self.Handles {
  282. return handle.Do(cmd, args...)
  283. }
  284. return nil, errors.New("no redis handle found for single mode")
  285. }
  286. func (self *RedisCluster) SendClusterCommand(cmd string, args ...interface{}) (reply interface{}, err error) {
  287. var flush bool = true
  288. // forward onto first redis in the handle
  289. // if we are set to single mode
  290. if self.SingleRedisMode == true {
  291. return self.handleSingleMode(flush, cmd, args...)
  292. }
  293. if self.RefreshTableASAP == true {
  294. if self.Debug {
  295. fmt.Println("[RedisCluster] Refresh Needed")
  296. }
  297. self.disconnectAll()
  298. self.populateSlotsCache()
  299. self.RefreshTableASAP = false
  300. // in case we realized we were now in Single Mode
  301. if self.SingleRedisMode == true {
  302. return self.handleSingleMode(flush, cmd, args...)
  303. }
  304. }
  305. ttl := RedisClusterRequestTTL
  306. key := self.KeyForRequest(cmd, args)
  307. try_random_node := false
  308. asking := false
  309. for {
  310. if ttl <= 0 {
  311. break
  312. }
  313. ttl -= 1
  314. if key == "" {
  315. panic(errors.New("no way to dispatch this type of command to redis cluster"))
  316. }
  317. slot := self.SlotForKey(key)
  318. var redis *RedisHandle
  319. if self.Debug {
  320. fmt.Println("[RedisCluster] slot: ", slot, "key", key, "ttl", ttl)
  321. }
  322. if try_random_node {
  323. if self.Debug {
  324. fmt.Println("[RedisCluster] Trying Random Node")
  325. }
  326. redis = self.RandomRedisHandle()
  327. try_random_node = false
  328. } else {
  329. if self.Debug {
  330. fmt.Println("[RedisCluster] Trying Specific Node")
  331. }
  332. redis = self.RedisHandleForSlot(slot)
  333. }
  334. if redis == nil {
  335. if self.Debug {
  336. fmt.Println("[RedisCluster] could not get redis handle, bailing this round")
  337. }
  338. break
  339. }
  340. if self.Debug {
  341. fmt.Println("[RedisCluster] Got addr: ", redis.Addr)
  342. }
  343. if asking {
  344. if self.Debug {
  345. fmt.Println("ASKING")
  346. }
  347. // conn := redis.GetRedisConn()
  348. redis.Do("ASKING")
  349. // conn.Close()
  350. asking = false
  351. }
  352. var err error
  353. var resp interface{}
  354. if flush {
  355. resp, err = redis.Do(cmd, args...)
  356. if err == nil {
  357. if self.Debug {
  358. fmt.Println("[RedisCluster] Success")
  359. }
  360. return resp, nil
  361. }
  362. } /*else {
  363. err = redis.Send(cmd, args...)
  364. if err == nil {
  365. if self.Debug {
  366. fmt.Println("[RedisCluster] Success")
  367. }
  368. return nil, nil
  369. }
  370. }*/
  371. // ok we are here so err is not nil
  372. errv := strings.Split(err.Error(), " ")
  373. if errv[0] == "MOVED" || errv[0] == "ASK" {
  374. if errv[0] == "ASK" {
  375. if self.Debug {
  376. fmt.Println("[RedisCluster] ASK")
  377. }
  378. asking = true
  379. } else {
  380. // Serve replied with MOVED. It's better for us to
  381. // ask for CLUSTER NODES the next time.
  382. SetRefreshNeeded()
  383. newslot, _ := strconv.Atoi(errv[1])
  384. newaddr := errv[2]
  385. slotsMap := make(map[uint16]string)
  386. for k, v := range self.Slots {
  387. slotsMap[k] = v
  388. }
  389. slotsMap[uint16(newslot)] = newaddr
  390. self.Slots = slotsMap
  391. if self.Debug {
  392. fmt.Println("[RedisCluster] MOVED newaddr: ", newaddr, "new slot: ", newslot, "my slots len: ", len(self.Slots))
  393. }
  394. }
  395. } else {
  396. if self.Debug {
  397. fmt.Println("[RedisCluster] Other Error: ", err.Error())
  398. }
  399. try_random_node = true
  400. }
  401. }
  402. if self.Debug {
  403. fmt.Println("[RedisCluster] Failed Command")
  404. }
  405. return nil, errors.New("could not complete command")
  406. }
  407. //func (self *RedisCluster) Send(cmd string, args ...interface{}) (err error) {
  408. // _, err = self.SendClusterCommand(false, cmd, args...)
  409. // return err
  410. //}
  411. func (self *RedisCluster) SetRefreshNeeded() {
  412. self.RefreshTableASAP = true
  413. }
  414. func (self *RedisCluster) HandleForKey(key string) *RedisHandle {
  415. // forward onto first redis in the handle
  416. // if we are set to single mode
  417. if self.SingleRedisMode == true {
  418. for _, handle := range self.Handles {
  419. return handle
  420. }
  421. }
  422. slot := self.SlotForKey(key)
  423. handle := self.RedisHandleForSlot(slot)
  424. return handle
  425. }
  426. type RedisClusterAccess interface {
  427. Do(commandName string, args ...interface{}) (reply interface{}, err error)
  428. // Send(cmd string, args ...interface{}) (err error)
  429. SetRefreshNeeded()
  430. HandleForKey(key string) *RedisHandle
  431. }
  432. var Instance RedisCluster
  433. func Do(commandName string, args ...interface{}) (reply interface{}, err error) {
  434. return Instance.Do(commandName, args...)
  435. }
  436. //func Send(cmd string, args ...interface{}) (err error) {
  437. // return Instance.Send(cmd, args...)
  438. //}
  439. func SetRefreshNeeded() {
  440. Instance.SetRefreshNeeded()
  441. }
  442. func HandleForKey(key string) *RedisHandle {
  443. return Instance.HandleForKey(key)
  444. }