zhangjinkun 7 gadi atpakaļ
vecāks
revīzija
2e6f123618

+ 5 - 7
common/src/github.com/gocql/gocql/.travis.yml

@@ -25,15 +25,13 @@ env:
       AUTH=false
 
 go:
-  - 1.8
-  - 1.9
+  - "1.9"
+  - "1.10"
 
 install:
-  - pip install --user cql PyYAML six
-  - git clone https://github.com/pcmanus/ccm.git
-  - pushd ccm
-  - ./setup.py install --user
-  - popd
+  - ./install_test_deps.sh $TRAVIS_REPO_SLUG
+  - cd ../..
+  - cd gocql/gocql
   - go get .
 
 script:

+ 2 - 1
common/src/github.com/gocql/gocql/AUTHORS

@@ -101,4 +101,5 @@ Sascha Steinbiss <satta@debian.org>
 Seth Rosenblum <seth.t.rosenblum@gmail.com>
 Javier Zunzunegui <javier.zunzunegui.b@gmail.com>
 Luke Hines <lukehines@protonmail.com>
-Zhixin Wen <john.wenzhixin@hotmail.com>
+Zhixin Wen <john.wenzhixin@hotmail.com>
+Chang Liu <changliu.it@gmail.com>

+ 1 - 1
common/src/github.com/gocql/gocql/cassandra_test.go

@@ -1139,7 +1139,7 @@ func TestRebindQueryInfo(t *testing.T) {
 	}
 
 	if value != "w00t" {
-		t.Fatalf("expected %v but got %v", "quux", value)
+		t.Fatalf("expected %v but got %v", "w00t", value)
 	}
 }
 

+ 19 - 17
common/src/github.com/gocql/gocql/cluster.go

@@ -44,23 +44,24 @@ type ClusterConfig struct {
 	// If it is 0 or unset (the default) then the driver will attempt to discover the
 	// highest supported protocol for the cluster. In clusters with nodes of different
 	// versions the protocol selected is not defined (ie, it can be any of the supported in the cluster)
-	ProtoVersion      int
-	Timeout           time.Duration     // connection timeout (default: 600ms)
-	ConnectTimeout    time.Duration     // initial connection timeout, used during initial dial to server (default: 600ms)
-	Port              int               // port (default: 9042)
-	Keyspace          string            // initial keyspace (optional)
-	NumConns          int               // number of connections per host (default: 2)
-	Consistency       Consistency       // default consistency level (default: Quorum)
-	Compressor        Compressor        // compression algorithm (default: nil)
-	Authenticator     Authenticator     // authenticator (default: nil)
-	RetryPolicy       RetryPolicy       // Default retry policy to use for queries (default: 0)
-	SocketKeepalive   time.Duration     // The keepalive period to use, enabled if > 0 (default: 0)
-	MaxPreparedStmts  int               // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
-	MaxRoutingKeyInfo int               // Sets the maximum cache size for query info about statements for each session (default: 1000)
-	PageSize          int               // Default page size to use for created sessions (default: 5000)
-	SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
-	SslOpts           *SslOptions
-	DefaultTimestamp  bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
+	ProtoVersion       int
+	Timeout            time.Duration      // connection timeout (default: 600ms)
+	ConnectTimeout     time.Duration      // initial connection timeout, used during initial dial to server (default: 600ms)
+	Port               int                // port (default: 9042)
+	Keyspace           string             // initial keyspace (optional)
+	NumConns           int                // number of connections per host (default: 2)
+	Consistency        Consistency        // default consistency level (default: Quorum)
+	Compressor         Compressor         // compression algorithm (default: nil)
+	Authenticator      Authenticator      // authenticator (default: nil)
+	RetryPolicy        RetryPolicy        // Default retry policy to use for queries (default: 0)
+	ReconnectionPolicy ReconnectionPolicy // Default reconnection policy to use for reconnecting before trying to mark host as down (default: see below)
+	SocketKeepalive    time.Duration      // The keepalive period to use, enabled if > 0 (default: 0)
+	MaxPreparedStmts   int                // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
+	MaxRoutingKeyInfo  int                // Sets the maximum cache size for query info about statements for each session (default: 1000)
+	PageSize           int                // Default page size to use for created sessions (default: 5000)
+	SerialConsistency  SerialConsistency  // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
+	SslOpts            *SslOptions
+	DefaultTimestamp   bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
 	// PoolConfig configures the underlying connection pool, allowing the
 	// configuration of host selection and connection selection policies.
 	PoolConfig PoolConfig
@@ -151,6 +152,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
 		DefaultTimestamp:       true,
 		MaxWaitSchemaAgreement: 60 * time.Second,
 		ReconnectInterval:      60 * time.Second,
+		ReconnectionPolicy:     &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
 	}
 	return cfg
 }

+ 3 - 0
common/src/github.com/gocql/gocql/cluster_test.go

@@ -4,6 +4,7 @@ import (
 	"net"
 	"testing"
 	"time"
+	"reflect"
 )
 
 func TestNewCluster_Defaults(t *testing.T) {
@@ -19,6 +20,8 @@ func TestNewCluster_Defaults(t *testing.T) {
 	assertEqual(t, "cluster config default timestamp", true, cfg.DefaultTimestamp)
 	assertEqual(t, "cluster config max wait schema agreement", 60*time.Second, cfg.MaxWaitSchemaAgreement)
 	assertEqual(t, "cluster config reconnect interval", 60*time.Second, cfg.ReconnectInterval)
+	assertTrue(t, "cluster config reconnection policy",
+		reflect.DeepEqual(&ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, cfg.ReconnectionPolicy))
 }
 
 func TestNewCluster_WithHosts(t *testing.T) {

+ 7 - 2
common/src/github.com/gocql/gocql/connectionpool.go

@@ -497,10 +497,10 @@ func (pool *hostConnPool) connectMany(count int) error {
 func (pool *hostConnPool) connect() (err error) {
 	// TODO: provide a more robust connection retry mechanism, we should also
 	// be able to detect hosts that come up by trying to connect to downed ones.
-	const maxAttempts = 3
 	// try to connect
 	var conn *Conn
-	for i := 0; i < maxAttempts; i++ {
+	reconnectionPolicy := pool.session.cfg.ReconnectionPolicy
+	for i := 0; i < reconnectionPolicy.GetMaxRetries(); i++ {
 		conn, err = pool.session.connect(pool.host, pool)
 		if err == nil {
 			break
@@ -512,6 +512,11 @@ func (pool *hostConnPool) connect() (err error) {
 				break
 			}
 		}
+		if gocqlDebug {
+			Logger.Printf("connection failed %q: %v, reconnecting with %T\n",
+				pool.host.ConnectAddress(), err, reconnectionPolicy)
+		}
+		time.Sleep(reconnectionPolicy.GetInterval(i))
 	}
 
 	if err != nil {

+ 1 - 1
common/src/github.com/gocql/gocql/doc.go

@@ -4,6 +4,6 @@
 
 // Package gocql implements a fast and robust Cassandra driver for the
 // Go programming language.
-package gocql
+package gocql // import "github.com/gocql/gocql"
 
 // TODO(tux21b): write more docs.

+ 7 - 0
common/src/github.com/gocql/gocql/go.mod

@@ -0,0 +1,7 @@
+module "github.com/gocql/gocql"
+
+require (
+	"github.com/golang/snappy" v0.0.0-20170215233205-553a64147049
+	"github.com/hailocab/go-hostpool" v0.0.0-20160125115350-e80d13ce29ed
+	"gopkg.in/inf.v0" v1.9.1-gopkgin-v0.9.1
+)

+ 3 - 0
common/src/github.com/gocql/gocql/go.modverify

@@ -0,0 +1,3 @@
+github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk=
+github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
+gopkg.in/inf.v0 v1.9.1-gopkgin-v0.9.1 h1:v5V5uqBldcybGI9tCBuizXlXYQYLztR7zNxeL/5C3g8=

+ 16 - 0
common/src/github.com/gocql/gocql/install_test_deps.sh

@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+
+# This is not supposed to be an error-prone script; just a convenience.
+
+# Install CCM
+pip install --user cql PyYAML six
+git clone https://github.com/pcmanus/ccm.git
+pushd ccm
+./setup.py install --user
+popd
+
+if [ "$1" != "gocql/gocql" ]; then
+    USER=$(echo $1 | cut -f1 -d'/')
+    cd ../..
+    mv ${USER} gocql
+fi

+ 134 - 8
common/src/github.com/gocql/gocql/policies.go

@@ -128,9 +128,19 @@ func (c *cowHostList) remove(ip net.IP) bool {
 // exposes the correct functions for the retry policy logic to evaluate correctly.
 type RetryableQuery interface {
 	Attempts() int
+	SetConsistency(c Consistency)
 	GetConsistency() Consistency
 }
 
+type RetryType uint16
+
+const (
+	Retry         RetryType = 0x00 // retry on same connection
+	RetryNextHost RetryType = 0x01 // retry on another connection
+	Ignore        RetryType = 0x02 // ignore error and return result
+	Rethrow       RetryType = 0x03 // raise error and stop retrying
+)
+
 // RetryPolicy interface is used by gocql to determine if a query can be attempted
 // again after a retryable error has been received. The interface allows gocql
 // users to implement their own logic to determine if a query can be attempted
@@ -140,6 +150,7 @@ type RetryableQuery interface {
 // interface.
 type RetryPolicy interface {
 	Attempt(RetryableQuery) bool
+	GetRetryType(error) RetryType
 }
 
 // SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
@@ -162,6 +173,10 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
 	return q.Attempts() <= s.NumRetries
 }
 
+func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
+	return RetryNextHost
+}
+
 // ExponentialBackoffRetryPolicy sleeps between attempts
 type ExponentialBackoffRetryPolicy struct {
 	NumRetries int
@@ -176,23 +191,92 @@ func (e *ExponentialBackoffRetryPolicy) Attempt(q RetryableQuery) bool {
 	return true
 }
 
-func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration {
-	if e.Min <= 0 {
-		e.Min = 100 * time.Millisecond
+// used to calculate exponentially growing time
+func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration {
+	if min <= 0 {
+		min = 100 * time.Millisecond
 	}
-	if e.Max <= 0 {
-		e.Max = 10 * time.Second
+	if max <= 0 {
+		max = 10 * time.Second
 	}
-	minFloat := float64(e.Min)
+	minFloat := float64(min)
 	napDuration := minFloat * math.Pow(2, float64(attempts-1))
 	// add some jitter
 	napDuration += rand.Float64()*minFloat - (minFloat / 2)
-	if napDuration > float64(e.Max) {
-		return time.Duration(e.Max)
+	if napDuration > float64(max) {
+		return time.Duration(max)
 	}
 	return time.Duration(napDuration)
 }
 
+func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
+	return RetryNextHost
+}
+
+// DowngradingConsistencyRetryPolicy: Next retry will be with the next consistency level
+// provided in the slice
+//
+// On a read timeout: the operation is retried with the next provided consistency
+// level.
+//
+// On a write timeout: if the operation is an :attr:`~.UNLOGGED_BATCH`
+// and at least one replica acknowledged the write, the operation is
+// retried with the next consistency level.  Furthermore, for other
+// write types, if at least one replica acknowledged the write, the
+// timeout is ignored.
+//
+// On an unavailable exception: if at least one replica is alive, the
+// operation is retried with the next provided consistency level.
+
+type DowngradingConsistencyRetryPolicy struct {
+	ConsistencyLevelsToTry []Consistency
+}
+
+func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool {
+	currentAttempt := q.Attempts()
+
+	if currentAttempt > len(d.ConsistencyLevelsToTry) {
+		return false
+	} else if currentAttempt > 0 {
+		q.SetConsistency(d.ConsistencyLevelsToTry[currentAttempt-1])
+		if gocqlDebug {
+			Logger.Printf("%T: set consistency to %q\n",
+				d,
+				d.ConsistencyLevelsToTry[currentAttempt-1])
+		}
+	}
+	return true
+}
+
+func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType {
+	switch t := err.(type) {
+	case *RequestErrUnavailable:
+		if t.Alive > 0 {
+			return Retry
+		}
+		return Rethrow
+	case *RequestErrWriteTimeout:
+		if t.WriteType == "SIMPLE" || t.WriteType == "BATCH" || t.WriteType == "COUNTER" {
+			if t.Received > 0 {
+				return Ignore
+			}
+			return Rethrow
+		}
+		if t.WriteType == "UNLOGGED_BATCH" {
+			return Retry
+		}
+		return Rethrow
+	case *RequestErrReadTimeout:
+		return Retry
+	default:
+		return RetryNextHost
+	}
+}
+
+func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration {
+	return getExponentialTime(e.Min, e.Max, attempts)
+}
+
 type HostStateNotifier interface {
 	AddHost(host *HostInfo)
 	RemoveHost(host *HostInfo)
@@ -706,3 +790,45 @@ func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
 		return (*selectedHost)(host)
 	}
 }
+
+// ReconnectionPolicy interface is used by gocql to determine if reconnection
+// can be attempted after connection error. The interface allows gocql users
+// to implement their own logic to determine how to attempt reconnection.
+//
+type ReconnectionPolicy interface {
+	GetInterval(currentRetry int) time.Duration
+	GetMaxRetries() int
+}
+
+// ConstantReconnectionPolicy has simple logic for returning a fixed reconnection interval.
+//
+// Examples of usage:
+//
+//     cluster.ReconnectionPolicy = &gocql.ConstantReconnectionPolicy{MaxRetries: 10, Interval: 8 * time.Second}
+//
+type ConstantReconnectionPolicy struct {
+	MaxRetries int
+	Interval   time.Duration
+}
+
+func (c *ConstantReconnectionPolicy) GetInterval(currentRetry int) time.Duration {
+	return c.Interval
+}
+
+func (c *ConstantReconnectionPolicy) GetMaxRetries() int {
+	return c.MaxRetries
+}
+
+// ExponentialReconnectionPolicy returns a growing reconnection interval.
+type ExponentialReconnectionPolicy struct {
+	MaxRetries      int
+	InitialInterval time.Duration
+}
+
+func (e *ExponentialReconnectionPolicy) GetInterval(currentRetry int) time.Duration {
+	return getExponentialTime(e.InitialInterval, math.MaxInt16*time.Second, e.GetMaxRetries())
+}
+
+func (e *ExponentialReconnectionPolicy) GetMaxRetries() int {
+	return e.MaxRetries
+}

+ 59 - 0
common/src/github.com/gocql/gocql/policies_test.go

@@ -302,6 +302,65 @@ func TestExponentialBackoffPolicy(t *testing.T) {
 	}
 }
 
+func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
+
+	q := &Query{cons: LocalQuorum}
+
+	rewt0 := &RequestErrWriteTimeout{
+		Received:  0,
+		WriteType: "SIMPLE",
+	}
+
+	rewt1 := &RequestErrWriteTimeout{
+		Received:  1,
+		WriteType: "BATCH",
+	}
+
+	rewt2 := &RequestErrWriteTimeout{
+		WriteType: "UNLOGGED_BATCH",
+	}
+
+	rert := &RequestErrReadTimeout{}
+
+	reu0 := &RequestErrUnavailable{
+		Alive: 0,
+	}
+
+	reu1 := &RequestErrUnavailable{
+		Alive: 1,
+	}
+
+	// this should allow a total of 3 tries.
+	consistencyLevels := []Consistency{Three, Two, One}
+	rt := &DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: consistencyLevels}
+	cases := []struct {
+		attempts  int
+		allow     bool
+		err       error
+		retryType RetryType
+	}{
+		{0, true, rewt0, Rethrow},
+		{3, true, rewt1, Ignore},
+		{1, true, rewt2, Retry},
+		{2, true, rert, Retry},
+		{4, false, reu0, Rethrow},
+		{16, false, reu1, Retry},
+	}
+
+	for _, c := range cases {
+		q.attempts = c.attempts
+		if c.retryType != rt.GetRetryType(c.err) {
+			t.Fatalf("retry type should be %v", c.retryType)
+		}
+		if c.allow && !rt.Attempt(q) {
+			t.Fatalf("should allow retry after %d attempts", c.attempts)
+		}
+		if !c.allow && rt.Attempt(q) {
+			t.Fatalf("should not allow retry after %d attempts", c.attempts)
+		}
+	}
+}
+
 func TestHostPolicy_DCAwareRR(t *testing.T) {
 	p := DCAwareRoundRobinPolicy("local")
 

+ 26 - 2
common/src/github.com/gocql/gocql/query_executor.go

@@ -50,17 +50,41 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
 		}
 
 		iter = q.attemptQuery(qry, conn)
-
 		// Update host
 		hostResponse.Mark(iter.err)
 
+		if rt == nil {
+			break
+		}
+
+		switch rt.GetRetryType(iter.err) {
+		case Retry:
+			for rt.Attempt(qry) {
+				iter = q.attemptQuery(qry, conn)
+				hostResponse.Mark(iter.err)
+				if iter.err == nil {
+					iter.host = host
+					return iter, nil
+				}
+				if rt.GetRetryType(iter.err) != Retry {
+					break
+				}
+			}
+		case Rethrow:
+			return nil, iter.err
+		case Ignore:
+			return iter, nil
+		case RetryNextHost:
+		default:
+		}
+
 		// Exit for loop if the query was successful
 		if iter.err == nil {
 			iter.host = host
 			return iter, nil
 		}
 
-		if rt == nil || !rt.Attempt(qry) {
+		if !rt.Attempt(qry) {
 			// What do here? Should we just return an error here?
 			break
 		}

+ 15 - 1
common/src/github.com/gocql/gocql/session.go

@@ -709,6 +709,11 @@ func (q *Query) GetConsistency() Consistency {
 	return q.cons
 }
 
+// Same as Consistency but without a return value
+func (q *Query) SetConsistency(c Consistency) {
+	q.cons = c
+}
+
 // Trace enables tracing of this query. Look at the documentation of the
 // Tracer interface to learn more about tracing.
 func (q *Query) Trace(trace Tracer) *Query {
@@ -774,6 +779,9 @@ func (q *Query) execute(conn *Conn) *Iter {
 }
 
 func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter) {
+	if gocqlDebug {
+		Logger.Printf("Attempting query: %d", q.attempts)
+	}
 	q.attempts++
 	q.totalLatency += end.Sub(start).Nanoseconds()
 	// TODO: track latencies per host and things as well instead of just total
@@ -1383,7 +1391,7 @@ func (s *Session) NewBatch(typ BatchType) *Batch {
 		Type:             typ,
 		rt:               s.cfg.RetryPolicy,
 		serialCons:       s.cfg.SerialConsistency,
-		observer: s.batchObserver,
+		observer:         s.batchObserver,
 		Cons:             s.cons,
 		defaultTimestamp: s.cfg.DefaultTimestamp,
 		keyspace:         s.cfg.Keyspace,
@@ -1422,6 +1430,12 @@ func (b *Batch) GetConsistency() Consistency {
 	return b.Cons
 }
 
+// SetConsistency sets the currently configured consistency level for the batch
+// operation.
+func (b *Batch) SetConsistency(c Consistency) {
+	b.Cons = c
+}
+
 // Query adds the query to the batch operation
 func (b *Batch) Query(stmt string, args ...interface{}) {
 	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})