12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240 |
- // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
- // Use of this source code is governed by a MIT-license.
- // See http://olivere.mit-license.org/license.txt for details.
- package elastic
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "log"
- "math/rand"
- "net/http"
- "net/http/httputil"
- "net/url"
- "regexp"
- "strings"
- "sync"
- "time"
- )
// Defaults used by NewClient when the caller does not override them
// through one of the Set... option functions.
const (
	// Version is the current version of Elastic.
	Version = "2.0.0"

	// DefaultURL is the default endpoint of Elasticsearch on the local machine.
	// It is used e.g. when initializing a new Client without a specific URL.
	DefaultURL = "http://127.0.0.1:9200"

	// DefaultScheme is the default protocol scheme to use when sniffing
	// the Elasticsearch cluster.
	DefaultScheme = "http"

	// DefaultHealthcheckEnabled specifies if healthchecks are enabled by default.
	DefaultHealthcheckEnabled = true

	// DefaultHealthcheckTimeoutStartup is the time the healthcheck waits
	// for a response from Elasticsearch on startup, i.e. when creating a
	// client. After the client is started, a shorter timeout is commonly used
	// (its default is specified in DefaultHealthcheckTimeout).
	DefaultHealthcheckTimeoutStartup = 5 * time.Second

	// DefaultHealthcheckTimeout specifies the time a running client waits for
	// a response from Elasticsearch. Notice that the healthcheck timeout
	// when a client is created is larger by default (see DefaultHealthcheckTimeoutStartup).
	DefaultHealthcheckTimeout = 1 * time.Second

	// DefaultHealthcheckInterval is the default interval between
	// two health checks of the nodes in the cluster.
	DefaultHealthcheckInterval = 60 * time.Second

	// DefaultSnifferEnabled specifies if the sniffer is enabled by default.
	DefaultSnifferEnabled = true

	// DefaultSnifferInterval is the interval between two sniffing procedures,
	// i.e. the lookup of all nodes in the cluster and their addition/removal
	// from the list of actual connections.
	DefaultSnifferInterval = 15 * time.Minute

	// DefaultSnifferTimeoutStartup is the default timeout for the sniffing
	// process that is initiated while creating a new client. For subsequent
	// sniffing processes, DefaultSnifferTimeout is used (by default).
	DefaultSnifferTimeoutStartup = 5 * time.Second

	// DefaultSnifferTimeout is the default timeout after which the
	// sniffing process times out. Notice that for the initial sniffing
	// process, DefaultSnifferTimeoutStartup is used.
	DefaultSnifferTimeout = 2 * time.Second

	// DefaultMaxRetries is the number of retries for a single request after
	// which Elastic will give up and return an error. It is zero by default,
	// so retry is disabled by default.
	DefaultMaxRetries = 0
)
// Sentinel errors declared by the client.
var (
	// ErrNoClient is raised when no Elasticsearch node is available.
	ErrNoClient = errors.New("no Elasticsearch node available")

	// ErrRetry is raised when a request cannot be executed after the configured
	// number of retries.
	ErrRetry = errors.New("cannot connect after several retries")
)
// ClientOptionFunc is a function that configures a Client.
// It is used in NewClient, which applies the options in the order given
// and aborts with the first non-nil error an option returns.
type ClientOptionFunc func(*Client) error
// Client is an Elasticsearch client. Create one by calling NewClient.
//
// The connection list (conns, cindex) is guarded by connsMu; the
// remaining configuration and state fields follow mu.
type Client struct {
	c *http.Client // net/http Client to use for requests

	connsMu sync.RWMutex // connsMu guards the next block
	conns   []*conn      // all connections
	cindex  int          // index into conns

	mu                        sync.RWMutex  // guards the next block
	urls                      []string      // set of URLs passed initially to the client
	running                   bool          // true if the client's background processes are running
	errorlog                  *log.Logger   // error log for critical messages
	infolog                   *log.Logger   // information log for e.g. response times
	tracelog                  *log.Logger   // trace log for debugging
	maxRetries                int           // max. number of retries
	scheme                    string        // http or https
	healthcheckEnabled        bool          // healthchecks enabled or disabled
	healthcheckTimeoutStartup time.Duration // time the healthcheck waits for a response from Elasticsearch on startup
	healthcheckTimeout        time.Duration // time the healthcheck waits for a response from Elasticsearch
	healthcheckInterval       time.Duration // interval between healthchecks
	healthcheckStop           chan bool     // notify healthchecker to stop, and notify back
	snifferEnabled            bool          // sniffer enabled or disabled
	snifferTimeoutStartup     time.Duration // time the sniffer waits for a response from nodes info API on startup
	snifferTimeout            time.Duration // time the sniffer waits for a response from nodes info API
	snifferInterval           time.Duration // interval between sniffing
	snifferStop               chan bool     // notify sniffer to stop, and notify back
	decoder                   Decoder       // used to decode data sent from Elasticsearch
}
- // NewClient creates a new client to work with Elasticsearch.
- //
- // The caller can configure the new client by passing configuration options
- // to the func.
- //
- // Example:
- //
- // client, err := elastic.NewClient(
- // elastic.SetURL("http://localhost:9200", "http://localhost:9201"),
- // elastic.SetMaxRetries(10))
- //
- // If no URL is configured, Elastic uses DefaultURL by default.
- //
- // If the sniffer is enabled (the default), the new client then sniffes
- // the cluster via the Nodes Info API
- // (see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/cluster-nodes-info.html#cluster-nodes-info).
- // It uses the URLs specified by the caller. The caller is responsible
- // to only pass a list of URLs of nodes that belong to the same cluster.
- // This sniffing process is run on startup and periodically.
- // Use SnifferInterval to set the interval between two sniffs (default is
- // 15 minutes). In other words: By default, the client will find new nodes
- // in the cluster and remove those that are no longer available every
- // 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient.
- //
- // The list of nodes found in the sniffing process will be used to make
- // connections to the REST API of Elasticsearch. These nodes are also
- // periodically checked in a shorter time frame. This process is called
- // a health check. By default, a health check is done every 60 seconds.
- // You can set a shorter or longer interval by SetHealthcheckInterval.
- // Disabling health checks is not recommended, but can be done by
- // SetHealthcheck(false).
- //
- // Connections are automatically marked as dead or healthy while
- // making requests to Elasticsearch. When a request fails, Elastic will
- // retry up to a maximum number of retries configured with SetMaxRetries.
- // Retries are disabled by default.
- //
- // If no HttpClient is configured, then http.DefaultClient is used.
- // You can use your own http.Client with some http.Transport for
- // advanced scenarios.
- //
- // An error is also returned when some configuration option is invalid or
- // the new client cannot sniff the cluster (if enabled).
- func NewClient(options ...ClientOptionFunc) (*Client, error) {
- // Set up the client
- c := &Client{
- c: http.DefaultClient,
- conns: make([]*conn, 0),
- cindex: -1,
- scheme: DefaultScheme,
- decoder: &DefaultDecoder{},
- maxRetries: DefaultMaxRetries,
- healthcheckEnabled: DefaultHealthcheckEnabled,
- healthcheckTimeoutStartup: DefaultHealthcheckTimeoutStartup,
- healthcheckTimeout: DefaultHealthcheckTimeout,
- healthcheckInterval: DefaultHealthcheckInterval,
- healthcheckStop: make(chan bool),
- snifferEnabled: DefaultSnifferEnabled,
- snifferTimeoutStartup: DefaultSnifferTimeoutStartup,
- snifferTimeout: DefaultSnifferTimeout,
- snifferInterval: DefaultSnifferInterval,
- snifferStop: make(chan bool),
- }
- // Run the options on it
- for _, option := range options {
- if err := option(c); err != nil {
- return nil, err
- }
- }
- if len(c.urls) == 0 {
- c.urls = []string{DefaultURL}
- }
- c.urls = canonicalize(c.urls...)
- if c.snifferEnabled {
- // Sniff the cluster initially
- if err := c.sniff(c.snifferTimeoutStartup); err != nil {
- return nil, err
- }
- } else {
- // Do not sniff the cluster initially. Use the provided URLs instead.
- for _, url := range c.urls {
- c.conns = append(c.conns, newConn(url, url))
- }
- }
- // Perform an initial health check and
- // ensure that we have at least one connection available
- if c.healthcheckEnabled {
- c.healthcheck(c.healthcheckTimeoutStartup, true)
- }
- if err := c.mustActiveConn(); err != nil {
- return nil, err
- }
- if c.snifferEnabled {
- go c.sniffer() // periodically update cluster information
- }
- if c.healthcheckEnabled {
- go c.healthchecker() // start goroutine periodically ping all nodes of the cluster
- }
- c.mu.Lock()
- c.running = true
- c.mu.Unlock()
- return c, nil
- }
- // SetHttpClient can be used to specify the http.Client to use when making
- // HTTP requests to Elasticsearch.
- func SetHttpClient(httpClient *http.Client) ClientOptionFunc {
- return func(c *Client) error {
- if httpClient != nil {
- c.c = httpClient
- } else {
- c.c = http.DefaultClient
- }
- return nil
- }
- }
- // SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that
- // when sniffing is enabled, these URLs are used to initially sniff the
- // cluster on startup.
- func SetURL(urls ...string) ClientOptionFunc {
- return func(c *Client) error {
- switch len(urls) {
- case 0:
- c.urls = []string{DefaultURL}
- default:
- c.urls = urls
- }
- return nil
- }
- }
- // SetScheme sets the HTTP scheme to look for when sniffing (http or https).
- // This is http by default.
- func SetScheme(scheme string) ClientOptionFunc {
- return func(c *Client) error {
- c.scheme = scheme
- return nil
- }
- }
- // SetSniff enables or disables the sniffer (enabled by default).
- func SetSniff(enabled bool) ClientOptionFunc {
- return func(c *Client) error {
- c.snifferEnabled = enabled
- return nil
- }
- }
- // SetSnifferTimeoutStartup sets the timeout for the sniffer that is used
- // when creating a new client. The default is 5 seconds. Notice that the
- // timeout being used for subsequent sniffing processes is set with
- // SetSnifferTimeout.
- func SetSnifferTimeoutStartup(timeout time.Duration) ClientOptionFunc {
- return func(c *Client) error {
- c.snifferTimeoutStartup = timeout
- return nil
- }
- }
- // SetSnifferTimeout sets the timeout for the sniffer that finds the
- // nodes in a cluster. The default is 2 seconds. Notice that the timeout
- // used when creating a new client on startup is usually greater and can
- // be set with SetSnifferTimeoutStartup.
- func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc {
- return func(c *Client) error {
- c.snifferTimeout = timeout
- return nil
- }
- }
- // SetSnifferInterval sets the interval between two sniffing processes.
- // The default interval is 15 minutes.
- func SetSnifferInterval(interval time.Duration) ClientOptionFunc {
- return func(c *Client) error {
- c.snifferInterval = interval
- return nil
- }
- }
- // SetHealthcheck enables or disables healthchecks (enabled by default).
- func SetHealthcheck(enabled bool) ClientOptionFunc {
- return func(c *Client) error {
- c.healthcheckEnabled = enabled
- return nil
- }
- }
- // SetHealthcheckTimeoutStartup sets the timeout for the initial health check.
- // The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup).
- // Notice that timeouts for subsequent health checks can be modified with
- // SetHealthcheckTimeout.
- func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {
- return func(c *Client) error {
- c.healthcheckTimeoutStartup = timeout
- return nil
- }
- }
- // SetHealthcheckTimeout sets the timeout for periodic health checks.
- // The default timeout is 1 second (see DefaultHealthcheckTimeout).
- // Notice that a different (usually larger) timeout is used for the initial
- // healthcheck, which is initiated while creating a new client.
- // The startup timeout can be modified with SetHealthcheckTimeoutStartup.
- func SetHealthcheckTimeout(timeout time.Duration) ClientOptionFunc {
- return func(c *Client) error {
- c.healthcheckTimeout = timeout
- return nil
- }
- }
- // SetHealthcheckInterval sets the interval between two health checks.
- // The default interval is 60 seconds.
- func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc {
- return func(c *Client) error {
- c.healthcheckInterval = interval
- return nil
- }
- }
- // SetMaxRetries sets the maximum number of retries before giving up when
- // performing a HTTP request to Elasticsearch.
- func SetMaxRetries(maxRetries int) func(*Client) error {
- return func(c *Client) error {
- if maxRetries < 0 {
- return errors.New("MaxRetries must be greater than or equal to 0")
- }
- c.maxRetries = maxRetries
- return nil
- }
- }
- // SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
- // DefaultDecoder is used by default.
- func SetDecoder(decoder Decoder) func(*Client) error {
- return func(c *Client) error {
- if decoder != nil {
- c.decoder = decoder
- } else {
- c.decoder = &DefaultDecoder{}
- }
- return nil
- }
- }
- // SetErrorLog sets the logger for critical messages like nodes joining
- // or leaving the cluster or failing requests. It is nil by default.
- func SetErrorLog(logger *log.Logger) func(*Client) error {
- return func(c *Client) error {
- c.errorlog = logger
- return nil
- }
- }
- // SetInfoLog sets the logger for informational messages, e.g. requests
- // and their response times. It is nil by default.
- func SetInfoLog(logger *log.Logger) func(*Client) error {
- return func(c *Client) error {
- c.infolog = logger
- return nil
- }
- }
- // SetTraceLog specifies the log.Logger to use for output of HTTP requests
- // and responses which is helpful during debugging. It is nil by default.
- func SetTraceLog(logger *log.Logger) func(*Client) error {
- return func(c *Client) error {
- c.tracelog = logger
- return nil
- }
- }
- // String returns a string representation of the client status.
- func (c *Client) String() string {
- c.connsMu.Lock()
- conns := c.conns
- c.connsMu.Unlock()
- var buf bytes.Buffer
- for i, conn := range conns {
- if i > 0 {
- buf.WriteString(", ")
- }
- buf.WriteString(conn.String())
- }
- return buf.String()
- }
- // IsRunning returns true if the background processes of the client are
- // running, false otherwise.
- func (c *Client) IsRunning() bool {
- c.mu.RLock()
- defer c.mu.RUnlock()
- return c.running
- }
- // Start starts the background processes like sniffing the cluster and
- // periodic health checks. You don't need to run Start when creating a
- // client with NewClient; the background processes are run by default.
- //
- // If the background processes are already running, this is a no-op.
- func (c *Client) Start() {
- c.mu.RLock()
- if c.running {
- c.mu.RUnlock()
- return
- }
- c.mu.RUnlock()
- if c.snifferEnabled {
- go c.sniffer()
- }
- if c.healthcheckEnabled {
- go c.healthchecker()
- }
- c.mu.Lock()
- c.running = true
- c.mu.Unlock()
- c.infof("elastic: client started")
- }
- // Stop stops the background processes that the client is running,
- // i.e. sniffing the cluster periodically and running health checks
- // on the nodes.
- //
- // If the background processes are not running, this is a no-op.
- func (c *Client) Stop() {
- c.mu.RLock()
- if !c.running {
- c.mu.RUnlock()
- return
- }
- c.mu.RUnlock()
- if c.healthcheckEnabled {
- c.healthcheckStop <- true
- <-c.healthcheckStop
- }
- if c.snifferEnabled {
- c.snifferStop <- true
- <-c.snifferStop
- }
- c.mu.Lock()
- c.running = false
- c.mu.Unlock()
- c.infof("elastic: client stopped")
- }
- // errorf logs to the error log.
- func (c *Client) errorf(format string, args ...interface{}) {
- if c.errorlog != nil {
- c.errorlog.Printf(format, args...)
- }
- }
- // infof logs informational messages.
- func (c *Client) infof(format string, args ...interface{}) {
- if c.infolog != nil {
- c.infolog.Printf(format, args...)
- }
- }
- // tracef logs to the trace log.
- func (c *Client) tracef(format string, args ...interface{}) {
- if c.tracelog != nil {
- c.tracelog.Printf(format, args...)
- }
- }
- // dumpRequest dumps the given HTTP request to the trace log.
- func (c *Client) dumpRequest(r *http.Request) {
- if c.tracelog != nil {
- out, err := httputil.DumpRequestOut(r, true)
- if err == nil {
- c.tracef("%s\n", string(out))
- }
- }
- }
- // dumpResponse dumps the given HTTP response to the trace log.
- func (c *Client) dumpResponse(resp *http.Response) {
- if c.tracelog != nil {
- out, err := httputil.DumpResponse(resp, true)
- if err == nil {
- c.tracef("%s\n", string(out))
- }
- }
- }
- // sniffer periodically runs sniff.
- func (c *Client) sniffer() {
- c.mu.RLock()
- timeout := c.snifferTimeout
- interval := c.snifferInterval
- c.mu.RUnlock()
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-c.snifferStop:
- // we are asked to stop, so we signal back that we're stopping now
- c.snifferStop <- true
- return
- case <-ticker.C:
- c.sniff(timeout)
- }
- }
- }
- // sniff uses the Node Info API to return the list of nodes in the cluster.
- // It uses the list of URLs passed on startup plus the list of URLs found
- // by the preceding sniffing process (if sniffing is enabled).
- //
- // If sniffing is disabled, this is a no-op.
- func (c *Client) sniff(timeout time.Duration) error {
- c.mu.RLock()
- if !c.snifferEnabled {
- c.mu.RUnlock()
- return nil
- }
- // Use all available URLs provided to sniff the cluster.
- urlsMap := make(map[string]bool)
- urls := make([]string, 0)
- // Add all URLs provided on startup
- for _, url := range c.urls {
- urlsMap[url] = true
- urls = append(urls, url)
- }
- c.mu.RUnlock()
- // Add all URLs found by sniffing
- c.connsMu.RLock()
- for _, conn := range c.conns {
- if !conn.IsDead() {
- url := conn.URL()
- if _, found := urlsMap[url]; !found {
- urls = append(urls, url)
- }
- }
- }
- c.connsMu.RUnlock()
- if len(urls) == 0 {
- return ErrNoClient
- }
- // Start sniffing on all found URLs
- ch := make(chan []*conn, len(urls))
- for _, url := range urls {
- go func(url string) { ch <- c.sniffNode(url) }(url)
- }
- // Wait for the results to come back, or the process times out.
- for {
- select {
- case conns := <-ch:
- if len(conns) > 0 {
- c.updateConns(conns)
- return nil
- }
- case <-time.After(timeout):
- // We get here if no cluster responds in time
- return ErrNoClient
- }
- }
- }
// reSniffHostAndPort is used to extract hostname and port from a result
// from a Nodes Info API (example: "inet[/127.0.0.1:9200]").
// Submatch 1 is the text between the slash and the colon (the host),
// submatch 2 is the run of digits before the closing bracket (the port).
var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
- // sniffNode sniffs a single node. This method is run as a goroutine
- // in sniff. If successful, it returns the list of node URLs extracted
- // from the result of calling Nodes Info API. Otherwise, an empty array
- // is returned.
- func (c *Client) sniffNode(url string) []*conn {
- nodes := make([]*conn, 0)
- // Call the Nodes Info API at /_nodes/http
- req, err := NewRequest("GET", url+"/_nodes/http")
- if err != nil {
- return nodes
- }
- res, err := c.c.Do((*http.Request)(req))
- if err != nil {
- return nodes
- }
- if res == nil {
- return nodes
- }
- if res.Body != nil {
- defer res.Body.Close()
- }
- var info NodesInfoResponse
- if err := json.NewDecoder(res.Body).Decode(&info); err == nil {
- if len(info.Nodes) > 0 {
- switch c.scheme {
- case "https":
- for nodeID, node := range info.Nodes {
- m := reSniffHostAndPort.FindStringSubmatch(node.HTTPSAddress)
- if len(m) == 3 {
- url := fmt.Sprintf("https://%s:%s", m[1], m[2])
- nodes = append(nodes, newConn(nodeID, url))
- }
- }
- default:
- for nodeID, node := range info.Nodes {
- m := reSniffHostAndPort.FindStringSubmatch(node.HTTPAddress)
- if len(m) == 3 {
- url := fmt.Sprintf("http://%s:%s", m[1], m[2])
- nodes = append(nodes, newConn(nodeID, url))
- }
- }
- }
- }
- }
- return nodes
- }
- // updateConns updates the clients' connections with new information
- // gather by a sniff operation.
- func (c *Client) updateConns(conns []*conn) {
- c.connsMu.Lock()
- newConns := make([]*conn, 0)
- // Build up new connections:
- // If we find an existing connection, use that (including no. of failures etc.).
- // If we find a new connection, add it.
- for _, conn := range conns {
- var found bool
- for _, oldConn := range c.conns {
- if oldConn.NodeID() == conn.NodeID() {
- // Take over the old connection
- newConns = append(newConns, oldConn)
- found = true
- break
- }
- }
- if !found {
- // New connection didn't exist, so add it to our list of new conns.
- c.errorf("elastic: %s joined the cluster", conn.URL())
- newConns = append(newConns, conn)
- }
- }
- c.conns = newConns
- c.cindex = -1
- c.connsMu.Unlock()
- }
- // healthchecker periodically runs healthcheck.
- func (c *Client) healthchecker() {
- c.mu.RLock()
- timeout := c.healthcheckTimeout
- interval := c.healthcheckInterval
- c.mu.RUnlock()
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-c.healthcheckStop:
- // we are asked to stop, so we signal back that we're stopping now
- c.healthcheckStop <- true
- return
- case <-ticker.C:
- c.healthcheck(timeout, false)
- }
- }
- }
- // healthcheck does a health check on all nodes in the cluster. Depending on
- // the node state, it marks connections as dead, sets them alive etc.
- // If healthchecks are disabled and force is false, this is a no-op.
- // The timeout specifies how long to wait for a response from Elasticsearch.
- func (c *Client) healthcheck(timeout time.Duration, force bool) {
- c.mu.RLock()
- if !c.healthcheckEnabled && !force {
- c.mu.RUnlock()
- return
- }
- c.mu.RUnlock()
- c.connsMu.RLock()
- conns := c.conns
- c.connsMu.RUnlock()
- timeoutInMillis := int64(timeout / time.Millisecond)
- for _, conn := range conns {
- params := make(url.Values)
- params.Set("timeout", fmt.Sprintf("%dms", timeoutInMillis))
- req, err := NewRequest("HEAD", conn.URL()+"/?"+params.Encode())
- if err == nil {
- res, err := c.c.Do((*http.Request)(req))
- if err == nil {
- if res.Body != nil {
- defer res.Body.Close()
- }
- if res.StatusCode >= 200 && res.StatusCode < 300 {
- conn.MarkAsAlive()
- } else {
- conn.MarkAsDead()
- c.errorf("elastic: %s is dead [status=%d]", conn.URL(), res.StatusCode)
- }
- } else {
- c.errorf("elastic: %s is dead", conn.URL())
- conn.MarkAsDead()
- }
- } else {
- c.errorf("elastic: %s is dead", conn.URL())
- conn.MarkAsDead()
- }
- }
- }
- // next returns the next available connection, or ErrNoClient.
- func (c *Client) next() (*conn, error) {
- // We do round-robin here.
- // TODO(oe) This should be a pluggable strategy, like the Selector in the official clients.
- c.connsMu.Lock()
- defer c.connsMu.Unlock()
- i := 0
- numConns := len(c.conns)
- for {
- i += 1
- if i > numConns {
- break // we visited all conns: they all seem to be dead
- }
- c.cindex += 1
- if c.cindex >= numConns {
- c.cindex = 0
- }
- conn := c.conns[c.cindex]
- if !conn.IsDead() {
- return conn, nil
- }
- }
- // TODO(oe) As a last resort, we could try to awake a dead connection here.
- // We tried hard, but there is no node available
- return nil, ErrNoClient
- }
- // mustActiveConn returns nil if there is an active connection,
- // otherwise ErrNoClient is returned.
- func (c *Client) mustActiveConn() error {
- c.connsMu.Lock()
- defer c.connsMu.Unlock()
- for _, c := range c.conns {
- if !c.IsDead() {
- return nil
- }
- }
- return ErrNoClient
- }
- // PerformRequest does a HTTP request to Elasticsearch.
- // It returns a response and an error on failure.
- func (c *Client) PerformRequest(method, path string, params url.Values, body interface{}) (*Response, error) {
- start := time.Now().UTC()
- c.mu.RLock()
- timeout := c.healthcheckTimeout
- retries := c.maxRetries
- c.mu.RUnlock()
- var err error
- var conn *conn
- var req *Request
- var resp *Response
- var retried bool
- // We wait between retries, using simple exponential back-off.
- // TODO: Make this configurable, including the jitter.
- retryWaitMsec := int64(100 + (rand.Intn(20) - 10))
- for {
- pathWithParams := path
- if len(params) > 0 {
- pathWithParams += "?" + params.Encode()
- }
- // Get a connection
- conn, err = c.next()
- if err == ErrNoClient {
- if !retried {
- // Force a healtcheck as all connections seem to be dead.
- c.healthcheck(timeout, false)
- }
- retries -= 1
- if retries <= 0 {
- return nil, err
- }
- retried = true
- time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
- retryWaitMsec += retryWaitMsec
- continue // try again
- }
- if err != nil {
- c.errorf("elastic: cannot get connection from pool")
- return nil, err
- }
- req, err = NewRequest(method, conn.URL()+pathWithParams)
- if err != nil {
- c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(method), conn.URL()+pathWithParams, err)
- return nil, err
- }
- // Set body
- if body != nil {
- switch b := body.(type) {
- case string:
- req.SetBodyString(b)
- break
- default:
- req.SetBodyJson(body)
- break
- }
- }
- // Tracing
- c.dumpRequest((*http.Request)(req))
- // Get response
- res, err := c.c.Do((*http.Request)(req))
- if err != nil {
- retries -= 1
- if retries <= 0 {
- c.errorf("elastic: %s is dead", conn.URL())
- conn.MarkAsDead()
- return nil, err
- }
- retried = true
- time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
- retryWaitMsec += retryWaitMsec
- continue // try again
- }
- if res.Body != nil {
- defer res.Body.Close()
- }
- // Check for errors
- if err := checkResponse(res); err != nil {
- retries -= 1
- if retries <= 0 {
- return nil, err
- }
- retried = true
- time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
- retryWaitMsec += retryWaitMsec
- continue // try again
- }
- // Tracing
- c.dumpResponse(res)
- // We successfully made a request with this connection
- conn.MarkAsHealthy()
- resp, err = c.newResponse(res)
- if err != nil {
- return nil, err
- }
- break
- }
- duration := time.Now().UTC().Sub(start)
- c.infof("%s %s [status:%d, request:%.3fs]",
- strings.ToUpper(method),
- req.URL,
- resp.StatusCode,
- float64(int64(duration/time.Millisecond))/1000)
- return resp, nil
- }
- // ElasticsearchVersion returns the version number of Elasticsearch
- // running on the given URL.
- func (c *Client) ElasticsearchVersion(url string) (string, error) {
- res, _, err := c.Ping().URL(url).Do()
- if err != nil {
- return "", err
- }
- return res.Version.Number, nil
- }
- // IndexNames returns the names of all indices in the cluster.
- func (c *Client) IndexNames() ([]string, error) {
- res, err := c.IndexGetSettings().Index("_all").Do()
- if err != nil {
- return nil, err
- }
- var names []string
- for name, _ := range res {
- names = append(names, name)
- }
- return names, nil
- }
- // Ping checks if a given node in a cluster exists and (optionally)
- // returns some basic information about the Elasticsearch server,
- // e.g. the Elasticsearch version number.
- func (c *Client) Ping() *PingService {
- return NewPingService(c)
- }
- // CreateIndex returns a service to create a new index.
- func (c *Client) CreateIndex(name string) *CreateIndexService {
- builder := NewCreateIndexService(c)
- builder.Index(name)
- return builder
- }
- // DeleteIndex returns a service to delete an index.
- func (c *Client) DeleteIndex(name string) *DeleteIndexService {
- builder := NewDeleteIndexService(c)
- builder.Index(name)
- return builder
- }
- // IndexExists allows to check if an index exists.
- func (c *Client) IndexExists(name string) *IndexExistsService {
- builder := NewIndexExistsService(c)
- builder.Index(name)
- return builder
- }
- // TypeExists allows to check if one or more types exist in one or more indices.
- func (c *Client) TypeExists() *IndicesExistsTypeService {
- return NewIndicesExistsTypeService(c)
- }
- // IndexStats provides statistics on different operations happining
- // in one or more indices.
- func (c *Client) IndexStats(indices ...string) *IndicesStatsService {
- builder := NewIndicesStatsService(c)
- builder = builder.Index(indices...)
- return builder
- }
- // OpenIndex opens an index.
- func (c *Client) OpenIndex(name string) *OpenIndexService {
- builder := NewOpenIndexService(c)
- builder.Index(name)
- return builder
- }
- // CloseIndex closes an index.
- func (c *Client) CloseIndex(name string) *CloseIndexService {
- builder := NewCloseIndexService(c)
- builder.Index(name)
- return builder
- }
- // Index a document.
- func (c *Client) Index() *IndexService {
- builder := NewIndexService(c)
- return builder
- }
- // IndexGet retrieves information about one or more indices.
- // IndexGet is only available for Elasticsearch 1.4 or later.
- func (c *Client) IndexGet() *IndicesGetService {
- builder := NewIndicesGetService(c)
- return builder
- }
- // IndexGetSettings retrieves settings about one or more indices.
- func (c *Client) IndexGetSettings() *IndicesGetSettingsService {
- builder := NewIndicesGetSettingsService(c)
- return builder
- }
- // Update a document.
- func (c *Client) Update() *UpdateService {
- builder := NewUpdateService(c)
- return builder
- }
- // Delete a document.
- func (c *Client) Delete() *DeleteService {
- builder := NewDeleteService(c)
- return builder
- }
- // DeleteByQuery deletes documents as found by a query.
- func (c *Client) DeleteByQuery() *DeleteByQueryService {
- builder := NewDeleteByQueryService(c)
- return builder
- }
- // Get a document.
- func (c *Client) Get() *GetService {
- builder := NewGetService(c)
- return builder
- }
- // MultiGet retrieves multiple documents in one roundtrip.
- func (c *Client) MultiGet() *MultiGetService {
- builder := NewMultiGetService(c)
- return builder
- }
- // Exists checks if a document exists.
- func (c *Client) Exists() *ExistsService {
- builder := NewExistsService(c)
- return builder
- }
- // Count documents.
- func (c *Client) Count(indices ...string) *CountService {
- builder := NewCountService(c)
- builder.Indices(indices...)
- return builder
- }
- // Search is the entry point for searches.
- func (c *Client) Search(indices ...string) *SearchService {
- builder := NewSearchService(c)
- builder.Indices(indices...)
- return builder
- }
- // Percolate allows to send a document and return matching queries.
- // See http://www.elastic.co/guide/en/elasticsearch/reference/current/search-percolate.html.
- func (c *Client) Percolate() *PercolateService {
- builder := NewPercolateService(c)
- return builder
- }
- // MultiSearch is the entry point for multi searches.
- func (c *Client) MultiSearch() *MultiSearchService {
- return NewMultiSearchService(c)
- }
- // Suggest returns a service to return suggestions.
- func (c *Client) Suggest(indices ...string) *SuggestService {
- builder := NewSuggestService(c)
- builder.Indices(indices...)
- return builder
- }
- // Scan through documents. Use this to iterate inside a server process
- // where the results will be processed without returning them to a client.
- func (c *Client) Scan(indices ...string) *ScanService {
- builder := NewScanService(c)
- builder.Indices(indices...)
- return builder
- }
- // Scroll through documents. Use this to efficiently scroll through results
- // while returning the results to a client. Use Scan when you don't need
- // to return requests to a client (i.e. not paginating via request/response).
- func (c *Client) Scroll(indices ...string) *ScrollService {
- builder := NewScrollService(c)
- builder.Indices(indices...)
- return builder
- }
- // ClearScroll can be used to clear search contexts manually.
- func (c *Client) ClearScroll() *ClearScrollService {
- builder := NewClearScrollService(c)
- return builder
- }
- // Optimize asks Elasticsearch to optimize one or more indices.
- func (c *Client) Optimize(indices ...string) *OptimizeService {
- builder := NewOptimizeService(c)
- builder.Indices(indices...)
- return builder
- }
- // Refresh asks Elasticsearch to refresh one or more indices.
- func (c *Client) Refresh(indices ...string) *RefreshService {
- builder := NewRefreshService(c)
- builder.Indices(indices...)
- return builder
- }
- // Flush asks Elasticsearch to free memory from the index and
- // flush data to disk.
- func (c *Client) Flush() *FlushService {
- builder := NewFlushService(c)
- return builder
- }
- // Explain computes a score explanation for a query and a specific document.
- func (c *Client) Explain(index, typ, id string) *ExplainService {
- builder := NewExplainService(c)
- builder = builder.Index(index).Type(typ).Id(id)
- return builder
- }
- // Bulk is the entry point to mass insert/update/delete documents.
- func (c *Client) Bulk() *BulkService {
- builder := NewBulkService(c)
- return builder
- }
- // Alias enables the caller to add and/or remove aliases.
- func (c *Client) Alias() *AliasService {
- builder := NewAliasService(c)
- return builder
- }
- // Aliases returns aliases by index name(s).
- func (c *Client) Aliases() *AliasesService {
- builder := NewAliasesService(c)
- return builder
- }
- // GetTemplate gets a search template.
- // Use IndexXXXTemplate funcs to manage index templates.
- func (c *Client) GetTemplate() *GetTemplateService {
- return NewGetTemplateService(c)
- }
- // PutTemplate creates or updates a search template.
- // Use IndexXXXTemplate funcs to manage index templates.
- func (c *Client) PutTemplate() *PutTemplateService {
- return NewPutTemplateService(c)
- }
- // DeleteTemplate deletes a search template.
- // Use IndexXXXTemplate funcs to manage index templates.
- func (c *Client) DeleteTemplate() *DeleteTemplateService {
- return NewDeleteTemplateService(c)
- }
- // IndexGetTemplate gets an index template.
- // Use XXXTemplate funcs to manage search templates.
- func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService {
- builder := NewIndicesGetTemplateService(c)
- builder = builder.Name(names...)
- return builder
- }
- // IndexTemplateExists gets check if an index template exists.
- // Use XXXTemplate funcs to manage search templates.
- func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService {
- builder := NewIndicesExistsTemplateService(c)
- builder = builder.Name(name)
- return builder
- }
- // IndexPutTemplate creates or updates an index template.
- // Use XXXTemplate funcs to manage search templates.
- func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService {
- builder := NewIndicesPutTemplateService(c)
- builder = builder.Name(name)
- return builder
- }
- // IndexDeleteTemplate deletes an index template.
- // Use XXXTemplate funcs to manage search templates.
- func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService {
- builder := NewIndicesDeleteTemplateService(c)
- builder = builder.Name(name)
- return builder
- }
- // GetMapping gets a mapping.
- func (c *Client) GetMapping() *GetMappingService {
- return NewGetMappingService(c)
- }
- // PutMapping registers a mapping.
- func (c *Client) PutMapping() *PutMappingService {
- return NewPutMappingService(c)
- }
- // DeleteMapping deletes a mapping.
- func (c *Client) DeleteMapping() *DeleteMappingService {
- return NewDeleteMappingService(c)
- }
- // ClusterHealth retrieves the health of the cluster.
- func (c *Client) ClusterHealth() *ClusterHealthService {
- return NewClusterHealthService(c)
- }
- // ClusterState retrieves the state of the cluster.
- func (c *Client) ClusterState() *ClusterStateService {
- return NewClusterStateService(c)
- }
- // NodesInfo retrieves one or more or all of the cluster nodes information.
- func (c *Client) NodesInfo() *NodesInfoService {
- return NewNodesInfoService(c)
- }
- // Reindex returns a service that will reindex documents from a source
- // index into a target index. See
- // http://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html
- // for more information about reindexing.
- func (c *Client) Reindex(sourceIndex, targetIndex string) *Reindexer {
- return NewReindexer(c, sourceIndex, targetIndex)
- }
|