|
@@ -0,0 +1,494 @@
|
|
|
+package goredis
|
|
|
+
|
|
|
+import "strings"
|
|
|
+import "strconv"
|
|
|
+import "errors"
|
|
|
+import "math/rand"
|
|
|
+import "os"
|
|
|
+import "fmt"
|
|
|
+
|
|
|
+const RedisClusterHashSlots = 16384
|
|
|
+const RedisClusterRequestTTL = 16
|
|
|
+const RedisClusterDefaultTimeout = 1
|
|
|
+
|
|
|
+type RedisCluster struct {
|
|
|
+ SeedHosts map[string]bool
|
|
|
+ Handles map[string]*RedisHandle
|
|
|
+ Slots map[uint16]string
|
|
|
+ RefreshTableASAP bool
|
|
|
+ SingleRedisMode bool
|
|
|
+ MaxIdle int
|
|
|
+ MaxActive int
|
|
|
+ Debug bool
|
|
|
+}
|
|
|
+
|
|
|
+func NewRedisCluster(addrs []string, max_idle, max_active int, debug bool) RedisCluster {
|
|
|
+ cluster := RedisCluster{RefreshTableASAP: false,
|
|
|
+ SingleRedisMode: false,
|
|
|
+ SeedHosts: make(map[string]bool),
|
|
|
+ Handles: make(map[string]*RedisHandle),
|
|
|
+ Slots: make(map[uint16]string),
|
|
|
+ MaxActive: max_active,
|
|
|
+ Debug: debug}
|
|
|
+
|
|
|
+ if cluster.Debug {
|
|
|
+ fmt.Println("[RedisCluster], PID", os.Getpid(), "StartingNewRedisCluster")
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, label := range addrs {
|
|
|
+ cluster.SeedHosts[label] = true
|
|
|
+ cluster.Handles[label] = NewRedisHandle(label, max_idle, max_active, debug)
|
|
|
+ }
|
|
|
+
|
|
|
+ for addr, _ := range cluster.SeedHosts {
|
|
|
+ node, ok := cluster.Handles[addr]
|
|
|
+ if !ok {
|
|
|
+ node = NewRedisHandle(addr, cluster.MaxIdle, cluster.MaxActive, cluster.Debug)
|
|
|
+ cluster.Handles[addr] = node
|
|
|
+ }
|
|
|
+
|
|
|
+ cluster_enabled := cluster.hasClusterEnabled(node)
|
|
|
+ if cluster_enabled == false {
|
|
|
+ if len(cluster.SeedHosts) == 1 {
|
|
|
+ cluster.SingleRedisMode = true
|
|
|
+ } else {
|
|
|
+ panic(errors.New("Multiple Seed Hosts Given, But Cluster Support Disabled in Redis"))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if cluster.SingleRedisMode == false {
|
|
|
+ cluster.populateSlotsCache()
|
|
|
+ }
|
|
|
+ return cluster
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) Update(max_idle, max_active int32) {
|
|
|
+ for _, rh := range self.Handles {
|
|
|
+ rh.Pool.Update(max_idle, max_active)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) SetWaitTime(t int) {
|
|
|
+ for _, rh := range self.Handles {
|
|
|
+ rh.Pool.SetWaitTime(t)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) SetLifeTime(t int) {
|
|
|
+ for _, rh := range self.Handles {
|
|
|
+ rh.Pool.SetLifeTime(t)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) SetPingTime(t int) {
|
|
|
+ for _, rh := range self.Handles {
|
|
|
+ rh.Pool.SetPingTime(t)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) TestCluster() error {
|
|
|
+ for _, rh := range self.Handles {
|
|
|
+ _, err := rh.Do("CLUSTER", "INFO")
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) GetHandle(key string) *RedisHandle {
|
|
|
+ return self.HandleForKey(key)
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) Do(cmd string, args ...interface{}) (reply interface{}, err error) {
|
|
|
+ return self.SendClusterCommand(cmd, args...)
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) hasClusterEnabled(node *RedisHandle) bool {
|
|
|
+ _, err := node.Do("CLUSTER", "INFO")
|
|
|
+ if err != nil {
|
|
|
+ if err.Error() == "ERR This instance has cluster support disabled" ||
|
|
|
+ err.Error() == "ERR unknown command 'CLUSTER'" {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+// contact the startup nodes and try to fetch the hash slots -> instances
|
|
|
+// map in order to initialize the Slots map.
|
|
|
+func (self *RedisCluster) populateSlotsCache() {
|
|
|
+ if self.SingleRedisMode == true {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster], PID", os.Getpid(), "[PopulateSlots Running]")
|
|
|
+ }
|
|
|
+ seedHosts := make(map[string]bool)
|
|
|
+ handles := make(map[string]*RedisHandle)
|
|
|
+ slotsMap := make(map[uint16]string)
|
|
|
+ for k, v := range self.SeedHosts {
|
|
|
+ seedHosts[k] = v
|
|
|
+ }
|
|
|
+ for k, v := range self.Handles {
|
|
|
+ handles[k] = v
|
|
|
+ }
|
|
|
+ for k, v := range self.Slots {
|
|
|
+ slotsMap[k] = v
|
|
|
+ }
|
|
|
+ for name, _ := range self.SeedHosts {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] [PopulateSlots] Checking: ", name)
|
|
|
+ }
|
|
|
+ var (
|
|
|
+ node *RedisHandle
|
|
|
+ ok bool
|
|
|
+ )
|
|
|
+ if node, ok = handles[name]; !ok {
|
|
|
+ node = NewRedisHandle(name, self.MaxIdle, self.MaxActive, self.Debug)
|
|
|
+ handles[name] = node
|
|
|
+ }
|
|
|
+ cluster_info, err := node.Do("CLUSTER", "NODES")
|
|
|
+ if err == nil {
|
|
|
+ lines := strings.Split(string(cluster_info.([]uint8)), "\n")
|
|
|
+ for _, line := range lines {
|
|
|
+ if line != "" {
|
|
|
+ fields := strings.Split(line, " ")
|
|
|
+ addr := fields[1]
|
|
|
+ if addr == ":0" {
|
|
|
+ addr = name
|
|
|
+ }
|
|
|
+ // add to seedlist if not in cluster
|
|
|
+ seedHosts[addr] = true
|
|
|
+
|
|
|
+ // add to handles if not in handles
|
|
|
+ if _, ok := handles[name]; !ok {
|
|
|
+ handles[name] = NewRedisHandle(addr, self.MaxIdle, self.MaxActive, self.Debug)
|
|
|
+ }
|
|
|
+
|
|
|
+ slots := fields[8:len(fields)]
|
|
|
+ for _, s_range := range slots {
|
|
|
+ slot_range := s_range
|
|
|
+ if slot_range != "[" {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Considering Slot Range", slot_range)
|
|
|
+ }
|
|
|
+ r_pieces := strings.Split(slot_range, "-")
|
|
|
+ min, _ := strconv.Atoi(r_pieces[0])
|
|
|
+ max, _ := strconv.Atoi(r_pieces[1])
|
|
|
+ for i := min; i <= max; i++ {
|
|
|
+ slotsMap[uint16(i)] = addr
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] [Initializing] DONE, ",
|
|
|
+ "Slots: ", len(slotsMap),
|
|
|
+ "Handles So Far:", len(handles),
|
|
|
+ "SeedList:", len(seedHosts))
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ self.SeedHosts = seedHosts
|
|
|
+ self.Handles = handles
|
|
|
+ self.Slots = slotsMap
|
|
|
+ self.switchToSingleModeIfNeeded()
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) switchToSingleModeIfNeeded() {
|
|
|
+ // catch case where we really intend to be on
|
|
|
+ // single redis mode, but redis was not
|
|
|
+ // started on time
|
|
|
+ if len(self.SeedHosts) == 1 &&
|
|
|
+ len(self.Slots) == 0 &&
|
|
|
+ len(self.Handles) == 1 {
|
|
|
+ for _, node := range self.Handles {
|
|
|
+ cluster_enabled := self.hasClusterEnabled(node)
|
|
|
+ if cluster_enabled == false {
|
|
|
+ self.SingleRedisMode = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) KeyForRequest(cmd string, args ...interface{}) string {
|
|
|
+ cmd = strings.ToLower(cmd)
|
|
|
+ if cmd == "info" ||
|
|
|
+ cmd == "multi" ||
|
|
|
+ cmd == "exec" ||
|
|
|
+ cmd == "slaveof" ||
|
|
|
+ cmd == "config" ||
|
|
|
+ cmd == "shutdown" {
|
|
|
+ return ""
|
|
|
+ }
|
|
|
+ if args[0] == nil {
|
|
|
+ return ""
|
|
|
+ }
|
|
|
+ strs := args[0].([]interface{})
|
|
|
+ if strs != nil && strs[0] != nil {
|
|
|
+ return strs[0].(string)
|
|
|
+ }
|
|
|
+ return ""
|
|
|
+}
|
|
|
+
|
|
|
+// Return the hash slot from the key.
|
|
|
+func (self *RedisCluster) SlotForKey(key string) uint16 {
|
|
|
+ checksum := ChecksumCRC16([]byte(key))
|
|
|
+ slot := checksum % RedisClusterHashSlots
|
|
|
+ return slot
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) RandomRedisHandle() *RedisHandle {
|
|
|
+ if len(self.Handles) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ addrs := make([]string, len(self.Handles))
|
|
|
+ i := 0
|
|
|
+ for addr, _ := range self.Handles {
|
|
|
+ addrs[i] = addr
|
|
|
+ i++
|
|
|
+ }
|
|
|
+ rand_addrs := make([]string, i)
|
|
|
+ perm := rand.Perm(i)
|
|
|
+ for j, v := range perm {
|
|
|
+ rand_addrs[v] = addrs[j]
|
|
|
+ }
|
|
|
+ handle := self.Handles[rand_addrs[0]]
|
|
|
+ self.switchToSingleModeIfNeeded()
|
|
|
+ return handle
|
|
|
+}
|
|
|
+
|
|
|
+// Given a slot return the link (Redis instance) to the mapped node.
|
|
|
+// Make sure to create a connection with the node if we don't have
|
|
|
+// one.
|
|
|
+func (self *RedisCluster) RedisHandleForSlot(slot uint16) *RedisHandle {
|
|
|
+
|
|
|
+ node, exists := self.Slots[slot]
|
|
|
+ // If we don't know what the mapping is, return a random node.
|
|
|
+ if !exists {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] No One Appears Responsible For Slot: ", slot, "our slotsize is: ", len(self.Slots))
|
|
|
+ }
|
|
|
+ return self.RandomRedisHandle()
|
|
|
+ }
|
|
|
+
|
|
|
+ r, cx_exists := self.Handles[node]
|
|
|
+ // add to cluster if not in cluster
|
|
|
+ if cx_exists {
|
|
|
+ return r
|
|
|
+ } else {
|
|
|
+ r = NewRedisHandle(node, self.MaxIdle, self.MaxActive, self.Debug)
|
|
|
+ handles := make(map[string]*RedisHandle)
|
|
|
+ for k, v := range self.Handles {
|
|
|
+ handles[k] = v
|
|
|
+ }
|
|
|
+ handles[node] = r
|
|
|
+ self.Handles = handles
|
|
|
+ }
|
|
|
+ // XXX consider returning random if failure
|
|
|
+ return r
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) disconnectAll() {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] PID:", os.Getpid(), " [Disconnect!] Had Handles:", len(self.Handles))
|
|
|
+ }
|
|
|
+ // disconnect anyone in handles
|
|
|
+ for _, handle := range self.Handles {
|
|
|
+ handle.Pool.Close()
|
|
|
+ }
|
|
|
+ self.Handles = make(map[string]*RedisHandle)
|
|
|
+
|
|
|
+ // nuke slots
|
|
|
+ self.Slots = make(map[uint16]string)
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) handleSingleMode(flush bool, cmd string, args ...interface{}) (reply interface{}, err error) {
|
|
|
+ for _, handle := range self.Handles {
|
|
|
+ return handle.Do(cmd, args...)
|
|
|
+ }
|
|
|
+ return nil, errors.New("no redis handle found for single mode")
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) SendClusterCommand(cmd string, args ...interface{}) (reply interface{}, err error) {
|
|
|
+ var flush bool = true
|
|
|
+ // forward onto first redis in the handle
|
|
|
+ // if we are set to single mode
|
|
|
+ if self.SingleRedisMode == true {
|
|
|
+ return self.handleSingleMode(flush, cmd, args...)
|
|
|
+ }
|
|
|
+
|
|
|
+ if self.RefreshTableASAP == true {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Refresh Needed")
|
|
|
+ }
|
|
|
+ self.disconnectAll()
|
|
|
+ self.populateSlotsCache()
|
|
|
+ self.RefreshTableASAP = false
|
|
|
+ // in case we realized we were now in Single Mode
|
|
|
+ if self.SingleRedisMode == true {
|
|
|
+ return self.handleSingleMode(flush, cmd, args...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ ttl := RedisClusterRequestTTL
|
|
|
+ key := self.KeyForRequest(cmd, args)
|
|
|
+ try_random_node := false
|
|
|
+ asking := false
|
|
|
+ for {
|
|
|
+ if ttl <= 0 {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ ttl -= 1
|
|
|
+ if key == "" {
|
|
|
+ panic(errors.New("no way to dispatch this type of command to redis cluster"))
|
|
|
+ }
|
|
|
+ slot := self.SlotForKey(key)
|
|
|
+
|
|
|
+ var redis *RedisHandle
|
|
|
+
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] slot: ", slot, "key", key, "ttl", ttl)
|
|
|
+ }
|
|
|
+
|
|
|
+ if try_random_node {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Trying Random Node")
|
|
|
+ }
|
|
|
+ redis = self.RandomRedisHandle()
|
|
|
+ try_random_node = false
|
|
|
+ } else {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Trying Specific Node")
|
|
|
+ }
|
|
|
+ redis = self.RedisHandleForSlot(slot)
|
|
|
+ }
|
|
|
+
|
|
|
+ if redis == nil {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] could not get redis handle, bailing this round")
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Got addr: ", redis.Addr)
|
|
|
+ }
|
|
|
+
|
|
|
+ if asking {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("ASKING")
|
|
|
+ }
|
|
|
+ // conn := redis.GetRedisConn()
|
|
|
+ redis.Do("ASKING")
|
|
|
+ // conn.Close()
|
|
|
+ asking = false
|
|
|
+ }
|
|
|
+
|
|
|
+ var err error
|
|
|
+ var resp interface{}
|
|
|
+
|
|
|
+ if flush {
|
|
|
+ resp, err = redis.Do(cmd, args...)
|
|
|
+ if err == nil {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Success")
|
|
|
+ }
|
|
|
+ return resp, nil
|
|
|
+ }
|
|
|
+ } /*else {
|
|
|
+ err = redis.Send(cmd, args...)
|
|
|
+ if err == nil {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Success")
|
|
|
+ }
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ }*/
|
|
|
+
|
|
|
+ // ok we are here so err is not nil
|
|
|
+ errv := strings.Split(err.Error(), " ")
|
|
|
+ if errv[0] == "MOVED" || errv[0] == "ASK" {
|
|
|
+ if errv[0] == "ASK" {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] ASK")
|
|
|
+ }
|
|
|
+ asking = true
|
|
|
+ } else {
|
|
|
+ // Serve replied with MOVED. It's better for us to
|
|
|
+ // ask for CLUSTER NODES the next time.
|
|
|
+ SetRefreshNeeded()
|
|
|
+ newslot, _ := strconv.Atoi(errv[1])
|
|
|
+ newaddr := errv[2]
|
|
|
+ slotsMap := make(map[uint16]string)
|
|
|
+ for k, v := range self.Slots {
|
|
|
+ slotsMap[k] = v
|
|
|
+ }
|
|
|
+ slotsMap[uint16(newslot)] = newaddr
|
|
|
+ self.Slots = slotsMap
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] MOVED newaddr: ", newaddr, "new slot: ", newslot, "my slots len: ", len(self.Slots))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Other Error: ", err.Error())
|
|
|
+ }
|
|
|
+ try_random_node = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if self.Debug {
|
|
|
+ fmt.Println("[RedisCluster] Failed Command")
|
|
|
+ }
|
|
|
+ return nil, errors.New("could not complete command")
|
|
|
+}
|
|
|
+
|
|
|
+//func (self *RedisCluster) Send(cmd string, args ...interface{}) (err error) {
|
|
|
+// _, err = self.SendClusterCommand(false, cmd, args...)
|
|
|
+// return err
|
|
|
+//}
|
|
|
+
|
|
|
+func (self *RedisCluster) SetRefreshNeeded() {
|
|
|
+ self.RefreshTableASAP = true
|
|
|
+}
|
|
|
+
|
|
|
+func (self *RedisCluster) HandleForKey(key string) *RedisHandle {
|
|
|
+ // forward onto first redis in the handle
|
|
|
+ // if we are set to single mode
|
|
|
+ if self.SingleRedisMode == true {
|
|
|
+ for _, handle := range self.Handles {
|
|
|
+ return handle
|
|
|
+ }
|
|
|
+ }
|
|
|
+ slot := self.SlotForKey(key)
|
|
|
+ handle := self.RedisHandleForSlot(slot)
|
|
|
+ return handle
|
|
|
+}
|
|
|
+
|
|
|
+type RedisClusterAccess interface {
|
|
|
+ Do(commandName string, args ...interface{}) (reply interface{}, err error)
|
|
|
+ // Send(cmd string, args ...interface{}) (err error)
|
|
|
+ SetRefreshNeeded()
|
|
|
+ HandleForKey(key string) *RedisHandle
|
|
|
+}
|
|
|
+
|
|
|
+var Instance RedisCluster
|
|
|
+
|
|
|
+func Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
|
|
+ return Instance.Do(commandName, args...)
|
|
|
+}
|
|
|
+
|
|
|
+//func Send(cmd string, args ...interface{}) (err error) {
|
|
|
+// return Instance.Send(cmd, args...)
|
|
|
+//}
|
|
|
+
|
|
|
+func SetRefreshNeeded() {
|
|
|
+ Instance.SetRefreshNeeded()
|
|
|
+}
|
|
|
+
|
|
|
+func HandleForKey(key string) *RedisHandle {
|
|
|
+ return Instance.HandleForKey(key)
|
|
|
+}
|