package goredis import "strings" import "strconv" import "errors" import "math/rand" import "os" import "fmt" const RedisClusterHashSlots = 16384 const RedisClusterRequestTTL = 16 const RedisClusterDefaultTimeout = 1 type RedisCluster struct { SeedHosts map[string]bool Handles map[string]*RedisHandle Slots map[uint16]string RefreshTableASAP bool SingleRedisMode bool MaxIdle int MaxActive int Debug bool } func NewRedisCluster(addrs []string, max_idle, max_active int, debug bool) RedisCluster { cluster := RedisCluster{RefreshTableASAP: false, SingleRedisMode: false, SeedHosts: make(map[string]bool), Handles: make(map[string]*RedisHandle), Slots: make(map[uint16]string), MaxActive: max_active, Debug: debug} if cluster.Debug { fmt.Println("[RedisCluster], PID", os.Getpid(), "StartingNewRedisCluster") } for _, label := range addrs { cluster.SeedHosts[label] = true cluster.Handles[label] = NewRedisHandle(label, max_idle, max_active, debug) } for addr, _ := range cluster.SeedHosts { node, ok := cluster.Handles[addr] if !ok { node = NewRedisHandle(addr, cluster.MaxIdle, cluster.MaxActive, cluster.Debug) cluster.Handles[addr] = node } cluster_enabled := cluster.hasClusterEnabled(node) if cluster_enabled == false { if len(cluster.SeedHosts) == 1 { cluster.SingleRedisMode = true } else { panic(errors.New("Multiple Seed Hosts Given, But Cluster Support Disabled in Redis")) } } } if cluster.SingleRedisMode == false { cluster.populateSlotsCache() } return cluster } func (self *RedisCluster) Update(max_idle, max_active int32) { for _, rh := range self.Handles { rh.Pool.Update(max_idle, max_active) } } func (self *RedisCluster) SetWaitTime(t int) { for _, rh := range self.Handles { rh.Pool.SetWaitTime(t) } } func (self *RedisCluster) SetLifeTime(t int) { for _, rh := range self.Handles { rh.Pool.SetLifeTime(t) } } func (self *RedisCluster) SetPingTime(t int) { for _, rh := range self.Handles { rh.Pool.SetPingTime(t) } } func (self *RedisCluster) TestCluster() error { for _, rh := range self.Handles { _, err := rh.Do("CLUSTER", "INFO") if err != nil { return err } } return nil } func (self *RedisCluster) GetHandle(key string) *RedisHandle { return self.HandleForKey(key) } func (self *RedisCluster) Do(cmd string, args ...interface{}) (reply interface{}, err error) { return self.SendClusterCommand(cmd, args...) } func (self *RedisCluster) hasClusterEnabled(node *RedisHandle) bool { _, err := node.Do("CLUSTER", "INFO") if err != nil { if err.Error() == "ERR This instance has cluster support disabled" || err.Error() == "ERR unknown command 'CLUSTER'" { return false } } return true } // contact the startup nodes and try to fetch the hash slots -> instances // map in order to initialize the Slots map. func (self *RedisCluster) populateSlotsCache() { if self.SingleRedisMode == true { return } if self.Debug { fmt.Println("[RedisCluster], PID", os.Getpid(), "[PopulateSlots Running]") } seedHosts := make(map[string]bool) handles := make(map[string]*RedisHandle) slotsMap := make(map[uint16]string) for k, v := range self.SeedHosts { seedHosts[k] = v } for k, v := range self.Handles { handles[k] = v } for k, v := range self.Slots { slotsMap[k] = v } for name, _ := range self.SeedHosts { if self.Debug { fmt.Println("[RedisCluster] [PopulateSlots] Checking: ", name) } var ( node *RedisHandle ok bool ) if node, ok = handles[name]; !ok { node = NewRedisHandle(name, self.MaxIdle, self.MaxActive, self.Debug) handles[name] = node } cluster_info, err := node.Do("CLUSTER", "NODES") if err == nil { lines := strings.Split(string(cluster_info.([]uint8)), "\n") for _, line := range lines { if line != "" { fields := strings.Split(line, " ") addr := fields[1] if addr == ":0" { addr = name } // add to seedlist if not in cluster seedHosts[addr] = true // add to handles if not in handles if _, ok := handles[name]; !ok { handles[name] = NewRedisHandle(addr, self.MaxIdle, self.MaxActive, self.Debug) } slots := fields[8:len(fields)] for _, s_range := range slots { slot_range := s_range if slot_range != "[" { if self.Debug { fmt.Println("[RedisCluster] Considering Slot Range", slot_range) } r_pieces := strings.Split(slot_range, "-") min, _ := strconv.Atoi(r_pieces[0]) max, _ := strconv.Atoi(r_pieces[1]) for i := min; i <= max; i++ { slotsMap[uint16(i)] = addr } } } } } if self.Debug { fmt.Println("[RedisCluster] [Initializing] DONE, ", "Slots: ", len(slotsMap), "Handles So Far:", len(handles), "SeedList:", len(seedHosts)) } break } } self.SeedHosts = seedHosts self.Handles = handles self.Slots = slotsMap self.switchToSingleModeIfNeeded() } func (self *RedisCluster) switchToSingleModeIfNeeded() { // catch case where we really intend to be on // single redis mode, but redis was not // started on time if len(self.SeedHosts) == 1 && len(self.Slots) == 0 && len(self.Handles) == 1 { for _, node := range self.Handles { cluster_enabled := self.hasClusterEnabled(node) if cluster_enabled == false { self.SingleRedisMode = true } } } } func (self *RedisCluster) KeyForRequest(cmd string, args ...interface{}) string { cmd = strings.ToLower(cmd) if cmd == "info" || cmd == "multi" || cmd == "exec" || cmd == "slaveof" || cmd == "config" || cmd == "shutdown" { return "" } if args[0] == nil { return "" } strs := args[0].([]interface{}) if strs != nil && strs[0] != nil { return strs[0].(string) } return "" } // Return the hash slot from the key. func (self *RedisCluster) SlotForKey(key string) uint16 { checksum := ChecksumCRC16([]byte(key)) slot := checksum % RedisClusterHashSlots return slot } func (self *RedisCluster) RandomRedisHandle() *RedisHandle { if len(self.Handles) == 0 { return nil } addrs := make([]string, len(self.Handles)) i := 0 for addr, _ := range self.Handles { addrs[i] = addr i++ } rand_addrs := make([]string, i) perm := rand.Perm(i) for j, v := range perm { rand_addrs[v] = addrs[j] } handle := self.Handles[rand_addrs[0]] self.switchToSingleModeIfNeeded() return handle } // Given a slot return the link (Redis instance) to the mapped node. // Make sure to create a connection with the node if we don't have // one. func (self *RedisCluster) RedisHandleForSlot(slot uint16) *RedisHandle { node, exists := self.Slots[slot] // If we don't know what the mapping is, return a random node. if !exists { if self.Debug { fmt.Println("[RedisCluster] No One Appears Responsible For Slot: ", slot, "our slotsize is: ", len(self.Slots)) } return self.RandomRedisHandle() } r, cx_exists := self.Handles[node] // add to cluster if not in cluster if cx_exists { return r } else { r = NewRedisHandle(node, self.MaxIdle, self.MaxActive, self.Debug) handles := make(map[string]*RedisHandle) for k, v := range self.Handles { handles[k] = v } handles[node] = r self.Handles = handles } // XXX consider returning random if failure return r } func (self *RedisCluster) disconnectAll() { if self.Debug { fmt.Println("[RedisCluster] PID:", os.Getpid(), " [Disconnect!] Had Handles:", len(self.Handles)) } // disconnect anyone in handles for _, handle := range self.Handles { handle.Pool.Close() } self.Handles = make(map[string]*RedisHandle) // nuke slots self.Slots = make(map[uint16]string) } func (self *RedisCluster) handleSingleMode(flush bool, cmd string, args ...interface{}) (reply interface{}, err error) { for _, handle := range self.Handles { return handle.Do(cmd, args...) } return nil, errors.New("no redis handle found for single mode") } func (self *RedisCluster) SendClusterCommand(cmd string, args ...interface{}) (reply interface{}, err error) { var flush bool = true // forward onto first redis in the handle // if we are set to single mode if self.SingleRedisMode == true { return self.handleSingleMode(flush, cmd, args...) } if self.RefreshTableASAP == true { if self.Debug { fmt.Println("[RedisCluster] Refresh Needed") } self.disconnectAll() self.populateSlotsCache() self.RefreshTableASAP = false // in case we realized we were now in Single Mode if self.SingleRedisMode == true { return self.handleSingleMode(flush, cmd, args...) } } ttl := RedisClusterRequestTTL key := self.KeyForRequest(cmd, args) try_random_node := false asking := false for { if ttl <= 0 { break } ttl -= 1 if key == "" { panic(errors.New("no way to dispatch this type of command to redis cluster")) } slot := self.SlotForKey(key) var redis *RedisHandle if self.Debug { fmt.Println("[RedisCluster] slot: ", slot, "key", key, "ttl", ttl) } if try_random_node { if self.Debug { fmt.Println("[RedisCluster] Trying Random Node") } redis = self.RandomRedisHandle() try_random_node = false } else { if self.Debug { fmt.Println("[RedisCluster] Trying Specific Node") } redis = self.RedisHandleForSlot(slot) } if redis == nil { if self.Debug { fmt.Println("[RedisCluster] could not get redis handle, bailing this round") } break } if self.Debug { fmt.Println("[RedisCluster] Got addr: ", redis.Addr) } if asking { if self.Debug { fmt.Println("ASKING") } // conn := redis.GetRedisConn() redis.Do("ASKING") // conn.Close() asking = false } var err error var resp interface{} if flush { resp, err = redis.Do(cmd, args...) if err == nil { if self.Debug { fmt.Println("[RedisCluster] Success") } return resp, nil } } /*else { err = redis.Send(cmd, args...) if err == nil { if self.Debug { fmt.Println("[RedisCluster] Success") } return nil, nil } }*/ // ok we are here so err is not nil errv := strings.Split(err.Error(), " ") if errv[0] == "MOVED" || errv[0] == "ASK" { if errv[0] == "ASK" { if self.Debug { fmt.Println("[RedisCluster] ASK") } asking = true } else { // Serve replied with MOVED. It's better for us to // ask for CLUSTER NODES the next time. SetRefreshNeeded() newslot, _ := strconv.Atoi(errv[1]) newaddr := errv[2] slotsMap := make(map[uint16]string) for k, v := range self.Slots { slotsMap[k] = v } slotsMap[uint16(newslot)] = newaddr self.Slots = slotsMap if self.Debug { fmt.Println("[RedisCluster] MOVED newaddr: ", newaddr, "new slot: ", newslot, "my slots len: ", len(self.Slots)) } } } else { if self.Debug { fmt.Println("[RedisCluster] Other Error: ", err.Error()) } try_random_node = true } } if self.Debug { fmt.Println("[RedisCluster] Failed Command") } return nil, errors.New("could not complete command") } //func (self *RedisCluster) Send(cmd string, args ...interface{}) (err error) { // _, err = self.SendClusterCommand(false, cmd, args...) // return err //} func (self *RedisCluster) SetRefreshNeeded() { self.RefreshTableASAP = true } func (self *RedisCluster) HandleForKey(key string) *RedisHandle { // forward onto first redis in the handle // if we are set to single mode if self.SingleRedisMode == true { for _, handle := range self.Handles { return handle } } slot := self.SlotForKey(key) handle := self.RedisHandleForSlot(slot) return handle } type RedisClusterAccess interface { Do(commandName string, args ...interface{}) (reply interface{}, err error) // Send(cmd string, args ...interface{}) (err error) SetRefreshNeeded() HandleForKey(key string) *RedisHandle } var Instance RedisCluster func Do(commandName string, args ...interface{}) (reply interface{}, err error) { return Instance.Do(commandName, args...) } //func Send(cmd string, args ...interface{}) (err error) { // return Instance.Send(cmd, args...) //} func SetRefreshNeeded() { Instance.SetRefreshNeeded() } func HandleForKey(key string) *RedisHandle { return Instance.HandleForKey(key) }