client.go 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240
  1. // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "bytes"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "log"
  11. "math/rand"
  12. "net/http"
  13. "net/http/httputil"
  14. "net/url"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. const (
  21. // Version is the current version of Elastic.
  22. Version = "2.0.0"
  23. // DefaultUrl is the default endpoint of Elasticsearch on the local machine.
  24. // It is used e.g. when initializing a new Client without a specific URL.
  25. DefaultURL = "http://127.0.0.1:9200"
  26. // DefaultScheme is the default protocol scheme to use when sniffing
  27. // the Elasticsearch cluster.
  28. DefaultScheme = "http"
  29. // DefaultHealthcheckEnabled specifies if healthchecks are enabled by default.
  30. DefaultHealthcheckEnabled = true
  31. // DefaultHealthcheckTimeoutStartup is the time the healthcheck waits
  32. // for a response from Elasticsearch on startup, i.e. when creating a
  33. // client. After the client is started, a shorter timeout is commonly used
  34. // (its default is specified in DefaultHealthcheckTimeout).
  35. DefaultHealthcheckTimeoutStartup = 5 * time.Second
  36. // DefaultHealthcheckTimeout specifies the time a running client waits for
  37. // a response from Elasticsearch. Notice that the healthcheck timeout
  38. // when a client is created is larger by default (see DefaultHealthcheckTimeoutStartup).
  39. DefaultHealthcheckTimeout = 1 * time.Second
  40. // DefaultHealthcheckInterval is the default interval between
  41. // two health checks of the nodes in the cluster.
  42. DefaultHealthcheckInterval = 60 * time.Second
  43. // DefaultSnifferEnabled specifies if the sniffer is enabled by default.
  44. DefaultSnifferEnabled = true
  45. // DefaultSnifferInterval is the interval between two sniffing procedures,
  46. // i.e. the lookup of all nodes in the cluster and their addition/removal
  47. // from the list of actual connections.
  48. DefaultSnifferInterval = 15 * time.Minute
  49. // DefaultSnifferTimeoutStartup is the default timeout for the sniffing
  50. // process that is initiated while creating a new client. For subsequent
  51. // sniffing processes, DefaultSnifferTimeout is used (by default).
  52. DefaultSnifferTimeoutStartup = 5 * time.Second
  53. // DefaultSnifferTimeout is the default timeout after which the
  54. // sniffing process times out. Notice that for the initial sniffing
  55. // process, DefaultSnifferTimeoutStartup is used.
  56. DefaultSnifferTimeout = 2 * time.Second
  57. // DefaultMaxRetries is the number of retries for a single request after
  58. // Elastic will give up and return an error. It is zero by default, so
  59. // retry is disabled by default.
  60. DefaultMaxRetries = 0
  61. )
  62. var (
  63. // ErrNoClient is raised when no Elasticsearch node is available.
  64. ErrNoClient = errors.New("no Elasticsearch node available")
  65. // ErrRetry is raised when a request cannot be executed after the configured
  66. // number of retries.
  67. ErrRetry = errors.New("cannot connect after several retries")
  68. )
  69. // ClientOptionFunc is a function that configures a Client.
  70. // It is used in NewClient.
  71. type ClientOptionFunc func(*Client) error
  72. // Client is an Elasticsearch client. Create one by calling NewClient.
  73. type Client struct {
  74. c *http.Client // net/http Client to use for requests
  75. connsMu sync.RWMutex // connsMu guards the next block
  76. conns []*conn // all connections
  77. cindex int // index into conns
  78. mu sync.RWMutex // guards the next block
  79. urls []string // set of URLs passed initially to the client
  80. running bool // true if the client's background processes are running
  81. errorlog *log.Logger // error log for critical messages
  82. infolog *log.Logger // information log for e.g. response times
  83. tracelog *log.Logger // trace log for debugging
  84. maxRetries int // max. number of retries
  85. scheme string // http or https
  86. healthcheckEnabled bool // healthchecks enabled or disabled
  87. healthcheckTimeoutStartup time.Duration // time the healthcheck waits for a response from Elasticsearch on startup
  88. healthcheckTimeout time.Duration // time the healthcheck waits for a response from Elasticsearch
  89. healthcheckInterval time.Duration // interval between healthchecks
  90. healthcheckStop chan bool // notify healthchecker to stop, and notify back
  91. snifferEnabled bool // sniffer enabled or disabled
  92. snifferTimeoutStartup time.Duration // time the sniffer waits for a response from nodes info API on startup
  93. snifferTimeout time.Duration // time the sniffer waits for a response from nodes info API
  94. snifferInterval time.Duration // interval between sniffing
  95. snifferStop chan bool // notify sniffer to stop, and notify back
  96. decoder Decoder // used to decode data sent from Elasticsearch
  97. }
  98. // NewClient creates a new client to work with Elasticsearch.
  99. //
  100. // The caller can configure the new client by passing configuration options
  101. // to the func.
  102. //
  103. // Example:
  104. //
  105. // client, err := elastic.NewClient(
  106. // elastic.SetURL("http://localhost:9200", "http://localhost:9201"),
  107. // elastic.SetMaxRetries(10))
  108. //
  109. // If no URL is configured, Elastic uses DefaultURL by default.
  110. //
  111. // If the sniffer is enabled (the default), the new client then sniffes
  112. // the cluster via the Nodes Info API
  113. // (see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/cluster-nodes-info.html#cluster-nodes-info).
  114. // It uses the URLs specified by the caller. The caller is responsible
  115. // to only pass a list of URLs of nodes that belong to the same cluster.
  116. // This sniffing process is run on startup and periodically.
  117. // Use SnifferInterval to set the interval between two sniffs (default is
  118. // 15 minutes). In other words: By default, the client will find new nodes
  119. // in the cluster and remove those that are no longer available every
  120. // 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient.
  121. //
  122. // The list of nodes found in the sniffing process will be used to make
  123. // connections to the REST API of Elasticsearch. These nodes are also
  124. // periodically checked in a shorter time frame. This process is called
  125. // a health check. By default, a health check is done every 60 seconds.
  126. // You can set a shorter or longer interval by SetHealthcheckInterval.
  127. // Disabling health checks is not recommended, but can be done by
  128. // SetHealthcheck(false).
  129. //
  130. // Connections are automatically marked as dead or healthy while
  131. // making requests to Elasticsearch. When a request fails, Elastic will
  132. // retry up to a maximum number of retries configured with SetMaxRetries.
  133. // Retries are disabled by default.
  134. //
  135. // If no HttpClient is configured, then http.DefaultClient is used.
  136. // You can use your own http.Client with some http.Transport for
  137. // advanced scenarios.
  138. //
  139. // An error is also returned when some configuration option is invalid or
  140. // the new client cannot sniff the cluster (if enabled).
  141. func NewClient(options ...ClientOptionFunc) (*Client, error) {
  142. // Set up the client
  143. c := &Client{
  144. c: http.DefaultClient,
  145. conns: make([]*conn, 0),
  146. cindex: -1,
  147. scheme: DefaultScheme,
  148. decoder: &DefaultDecoder{},
  149. maxRetries: DefaultMaxRetries,
  150. healthcheckEnabled: DefaultHealthcheckEnabled,
  151. healthcheckTimeoutStartup: DefaultHealthcheckTimeoutStartup,
  152. healthcheckTimeout: DefaultHealthcheckTimeout,
  153. healthcheckInterval: DefaultHealthcheckInterval,
  154. healthcheckStop: make(chan bool),
  155. snifferEnabled: DefaultSnifferEnabled,
  156. snifferTimeoutStartup: DefaultSnifferTimeoutStartup,
  157. snifferTimeout: DefaultSnifferTimeout,
  158. snifferInterval: DefaultSnifferInterval,
  159. snifferStop: make(chan bool),
  160. }
  161. // Run the options on it
  162. for _, option := range options {
  163. if err := option(c); err != nil {
  164. return nil, err
  165. }
  166. }
  167. if len(c.urls) == 0 {
  168. c.urls = []string{DefaultURL}
  169. }
  170. c.urls = canonicalize(c.urls...)
  171. if c.snifferEnabled {
  172. // Sniff the cluster initially
  173. if err := c.sniff(c.snifferTimeoutStartup); err != nil {
  174. return nil, err
  175. }
  176. } else {
  177. // Do not sniff the cluster initially. Use the provided URLs instead.
  178. for _, url := range c.urls {
  179. c.conns = append(c.conns, newConn(url, url))
  180. }
  181. }
  182. // Perform an initial health check and
  183. // ensure that we have at least one connection available
  184. if c.healthcheckEnabled {
  185. c.healthcheck(c.healthcheckTimeoutStartup, true)
  186. }
  187. if err := c.mustActiveConn(); err != nil {
  188. return nil, err
  189. }
  190. if c.snifferEnabled {
  191. go c.sniffer() // periodically update cluster information
  192. }
  193. if c.healthcheckEnabled {
  194. go c.healthchecker() // start goroutine periodically ping all nodes of the cluster
  195. }
  196. c.mu.Lock()
  197. c.running = true
  198. c.mu.Unlock()
  199. return c, nil
  200. }
  201. // SetHttpClient can be used to specify the http.Client to use when making
  202. // HTTP requests to Elasticsearch.
  203. func SetHttpClient(httpClient *http.Client) ClientOptionFunc {
  204. return func(c *Client) error {
  205. if httpClient != nil {
  206. c.c = httpClient
  207. } else {
  208. c.c = http.DefaultClient
  209. }
  210. return nil
  211. }
  212. }
  213. // SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that
  214. // when sniffing is enabled, these URLs are used to initially sniff the
  215. // cluster on startup.
  216. func SetURL(urls ...string) ClientOptionFunc {
  217. return func(c *Client) error {
  218. switch len(urls) {
  219. case 0:
  220. c.urls = []string{DefaultURL}
  221. default:
  222. c.urls = urls
  223. }
  224. return nil
  225. }
  226. }
  227. // SetScheme sets the HTTP scheme to look for when sniffing (http or https).
  228. // This is http by default.
  229. func SetScheme(scheme string) ClientOptionFunc {
  230. return func(c *Client) error {
  231. c.scheme = scheme
  232. return nil
  233. }
  234. }
  235. // SetSniff enables or disables the sniffer (enabled by default).
  236. func SetSniff(enabled bool) ClientOptionFunc {
  237. return func(c *Client) error {
  238. c.snifferEnabled = enabled
  239. return nil
  240. }
  241. }
  242. // SetSnifferTimeoutStartup sets the timeout for the sniffer that is used
  243. // when creating a new client. The default is 5 seconds. Notice that the
  244. // timeout being used for subsequent sniffing processes is set with
  245. // SetSnifferTimeout.
  246. func SetSnifferTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  247. return func(c *Client) error {
  248. c.snifferTimeoutStartup = timeout
  249. return nil
  250. }
  251. }
  252. // SetSnifferTimeout sets the timeout for the sniffer that finds the
  253. // nodes in a cluster. The default is 2 seconds. Notice that the timeout
  254. // used when creating a new client on startup is usually greater and can
  255. // be set with SetSnifferTimeoutStartup.
  256. func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc {
  257. return func(c *Client) error {
  258. c.snifferTimeout = timeout
  259. return nil
  260. }
  261. }
  262. // SetSnifferInterval sets the interval between two sniffing processes.
  263. // The default interval is 15 minutes.
  264. func SetSnifferInterval(interval time.Duration) ClientOptionFunc {
  265. return func(c *Client) error {
  266. c.snifferInterval = interval
  267. return nil
  268. }
  269. }
  270. // SetHealthcheck enables or disables healthchecks (enabled by default).
  271. func SetHealthcheck(enabled bool) ClientOptionFunc {
  272. return func(c *Client) error {
  273. c.healthcheckEnabled = enabled
  274. return nil
  275. }
  276. }
  277. // SetHealthcheckTimeoutStartup sets the timeout for the initial health check.
  278. // The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup).
  279. // Notice that timeouts for subsequent health checks can be modified with
  280. // SetHealthcheckTimeout.
  281. func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  282. return func(c *Client) error {
  283. c.healthcheckTimeoutStartup = timeout
  284. return nil
  285. }
  286. }
  287. // SetHealthcheckTimeout sets the timeout for periodic health checks.
  288. // The default timeout is 1 second (see DefaultHealthcheckTimeout).
  289. // Notice that a different (usually larger) timeout is used for the initial
  290. // healthcheck, which is initiated while creating a new client.
  291. // The startup timeout can be modified with SetHealthcheckTimeoutStartup.
  292. func SetHealthcheckTimeout(timeout time.Duration) ClientOptionFunc {
  293. return func(c *Client) error {
  294. c.healthcheckTimeout = timeout
  295. return nil
  296. }
  297. }
  298. // SetHealthcheckInterval sets the interval between two health checks.
  299. // The default interval is 60 seconds.
  300. func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc {
  301. return func(c *Client) error {
  302. c.healthcheckInterval = interval
  303. return nil
  304. }
  305. }
  306. // SetMaxRetries sets the maximum number of retries before giving up when
  307. // performing a HTTP request to Elasticsearch.
  308. func SetMaxRetries(maxRetries int) func(*Client) error {
  309. return func(c *Client) error {
  310. if maxRetries < 0 {
  311. return errors.New("MaxRetries must be greater than or equal to 0")
  312. }
  313. c.maxRetries = maxRetries
  314. return nil
  315. }
  316. }
  317. // SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
  318. // DefaultDecoder is used by default.
  319. func SetDecoder(decoder Decoder) func(*Client) error {
  320. return func(c *Client) error {
  321. if decoder != nil {
  322. c.decoder = decoder
  323. } else {
  324. c.decoder = &DefaultDecoder{}
  325. }
  326. return nil
  327. }
  328. }
  329. // SetErrorLog sets the logger for critical messages like nodes joining
  330. // or leaving the cluster or failing requests. It is nil by default.
  331. func SetErrorLog(logger *log.Logger) func(*Client) error {
  332. return func(c *Client) error {
  333. c.errorlog = logger
  334. return nil
  335. }
  336. }
  337. // SetInfoLog sets the logger for informational messages, e.g. requests
  338. // and their response times. It is nil by default.
  339. func SetInfoLog(logger *log.Logger) func(*Client) error {
  340. return func(c *Client) error {
  341. c.infolog = logger
  342. return nil
  343. }
  344. }
  345. // SetTraceLog specifies the log.Logger to use for output of HTTP requests
  346. // and responses which is helpful during debugging. It is nil by default.
  347. func SetTraceLog(logger *log.Logger) func(*Client) error {
  348. return func(c *Client) error {
  349. c.tracelog = logger
  350. return nil
  351. }
  352. }
  353. // String returns a string representation of the client status.
  354. func (c *Client) String() string {
  355. c.connsMu.Lock()
  356. conns := c.conns
  357. c.connsMu.Unlock()
  358. var buf bytes.Buffer
  359. for i, conn := range conns {
  360. if i > 0 {
  361. buf.WriteString(", ")
  362. }
  363. buf.WriteString(conn.String())
  364. }
  365. return buf.String()
  366. }
  367. // IsRunning returns true if the background processes of the client are
  368. // running, false otherwise.
  369. func (c *Client) IsRunning() bool {
  370. c.mu.RLock()
  371. defer c.mu.RUnlock()
  372. return c.running
  373. }
  374. // Start starts the background processes like sniffing the cluster and
  375. // periodic health checks. You don't need to run Start when creating a
  376. // client with NewClient; the background processes are run by default.
  377. //
  378. // If the background processes are already running, this is a no-op.
  379. func (c *Client) Start() {
  380. c.mu.RLock()
  381. if c.running {
  382. c.mu.RUnlock()
  383. return
  384. }
  385. c.mu.RUnlock()
  386. if c.snifferEnabled {
  387. go c.sniffer()
  388. }
  389. if c.healthcheckEnabled {
  390. go c.healthchecker()
  391. }
  392. c.mu.Lock()
  393. c.running = true
  394. c.mu.Unlock()
  395. c.infof("elastic: client started")
  396. }
  397. // Stop stops the background processes that the client is running,
  398. // i.e. sniffing the cluster periodically and running health checks
  399. // on the nodes.
  400. //
  401. // If the background processes are not running, this is a no-op.
  402. func (c *Client) Stop() {
  403. c.mu.RLock()
  404. if !c.running {
  405. c.mu.RUnlock()
  406. return
  407. }
  408. c.mu.RUnlock()
  409. if c.healthcheckEnabled {
  410. c.healthcheckStop <- true
  411. <-c.healthcheckStop
  412. }
  413. if c.snifferEnabled {
  414. c.snifferStop <- true
  415. <-c.snifferStop
  416. }
  417. c.mu.Lock()
  418. c.running = false
  419. c.mu.Unlock()
  420. c.infof("elastic: client stopped")
  421. }
  422. // errorf logs to the error log.
  423. func (c *Client) errorf(format string, args ...interface{}) {
  424. if c.errorlog != nil {
  425. c.errorlog.Printf(format, args...)
  426. }
  427. }
  428. // infof logs informational messages.
  429. func (c *Client) infof(format string, args ...interface{}) {
  430. if c.infolog != nil {
  431. c.infolog.Printf(format, args...)
  432. }
  433. }
  434. // tracef logs to the trace log.
  435. func (c *Client) tracef(format string, args ...interface{}) {
  436. if c.tracelog != nil {
  437. c.tracelog.Printf(format, args...)
  438. }
  439. }
  440. // dumpRequest dumps the given HTTP request to the trace log.
  441. func (c *Client) dumpRequest(r *http.Request) {
  442. if c.tracelog != nil {
  443. out, err := httputil.DumpRequestOut(r, true)
  444. if err == nil {
  445. c.tracef("%s\n", string(out))
  446. }
  447. }
  448. }
  449. // dumpResponse dumps the given HTTP response to the trace log.
  450. func (c *Client) dumpResponse(resp *http.Response) {
  451. if c.tracelog != nil {
  452. out, err := httputil.DumpResponse(resp, true)
  453. if err == nil {
  454. c.tracef("%s\n", string(out))
  455. }
  456. }
  457. }
  458. // sniffer periodically runs sniff.
  459. func (c *Client) sniffer() {
  460. c.mu.RLock()
  461. timeout := c.snifferTimeout
  462. interval := c.snifferInterval
  463. c.mu.RUnlock()
  464. ticker := time.NewTicker(interval)
  465. defer ticker.Stop()
  466. for {
  467. select {
  468. case <-c.snifferStop:
  469. // we are asked to stop, so we signal back that we're stopping now
  470. c.snifferStop <- true
  471. return
  472. case <-ticker.C:
  473. c.sniff(timeout)
  474. }
  475. }
  476. }
  477. // sniff uses the Node Info API to return the list of nodes in the cluster.
  478. // It uses the list of URLs passed on startup plus the list of URLs found
  479. // by the preceding sniffing process (if sniffing is enabled).
  480. //
  481. // If sniffing is disabled, this is a no-op.
  482. func (c *Client) sniff(timeout time.Duration) error {
  483. c.mu.RLock()
  484. if !c.snifferEnabled {
  485. c.mu.RUnlock()
  486. return nil
  487. }
  488. // Use all available URLs provided to sniff the cluster.
  489. urlsMap := make(map[string]bool)
  490. urls := make([]string, 0)
  491. // Add all URLs provided on startup
  492. for _, url := range c.urls {
  493. urlsMap[url] = true
  494. urls = append(urls, url)
  495. }
  496. c.mu.RUnlock()
  497. // Add all URLs found by sniffing
  498. c.connsMu.RLock()
  499. for _, conn := range c.conns {
  500. if !conn.IsDead() {
  501. url := conn.URL()
  502. if _, found := urlsMap[url]; !found {
  503. urls = append(urls, url)
  504. }
  505. }
  506. }
  507. c.connsMu.RUnlock()
  508. if len(urls) == 0 {
  509. return ErrNoClient
  510. }
  511. // Start sniffing on all found URLs
  512. ch := make(chan []*conn, len(urls))
  513. for _, url := range urls {
  514. go func(url string) { ch <- c.sniffNode(url) }(url)
  515. }
  516. // Wait for the results to come back, or the process times out.
  517. for {
  518. select {
  519. case conns := <-ch:
  520. if len(conns) > 0 {
  521. c.updateConns(conns)
  522. return nil
  523. }
  524. case <-time.After(timeout):
  525. // We get here if no cluster responds in time
  526. return ErrNoClient
  527. }
  528. }
  529. }
  530. // reSniffHostAndPort is used to extract hostname and port from a result
  531. // from a Nodes Info API (example: "inet[/127.0.0.1:9200]").
  532. var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
  533. // sniffNode sniffs a single node. This method is run as a goroutine
  534. // in sniff. If successful, it returns the list of node URLs extracted
  535. // from the result of calling Nodes Info API. Otherwise, an empty array
  536. // is returned.
  537. func (c *Client) sniffNode(url string) []*conn {
  538. nodes := make([]*conn, 0)
  539. // Call the Nodes Info API at /_nodes/http
  540. req, err := NewRequest("GET", url+"/_nodes/http")
  541. if err != nil {
  542. return nodes
  543. }
  544. res, err := c.c.Do((*http.Request)(req))
  545. if err != nil {
  546. return nodes
  547. }
  548. if res == nil {
  549. return nodes
  550. }
  551. if res.Body != nil {
  552. defer res.Body.Close()
  553. }
  554. var info NodesInfoResponse
  555. if err := json.NewDecoder(res.Body).Decode(&info); err == nil {
  556. if len(info.Nodes) > 0 {
  557. switch c.scheme {
  558. case "https":
  559. for nodeID, node := range info.Nodes {
  560. m := reSniffHostAndPort.FindStringSubmatch(node.HTTPSAddress)
  561. if len(m) == 3 {
  562. url := fmt.Sprintf("https://%s:%s", m[1], m[2])
  563. nodes = append(nodes, newConn(nodeID, url))
  564. }
  565. }
  566. default:
  567. for nodeID, node := range info.Nodes {
  568. m := reSniffHostAndPort.FindStringSubmatch(node.HTTPAddress)
  569. if len(m) == 3 {
  570. url := fmt.Sprintf("http://%s:%s", m[1], m[2])
  571. nodes = append(nodes, newConn(nodeID, url))
  572. }
  573. }
  574. }
  575. }
  576. }
  577. return nodes
  578. }
  579. // updateConns updates the clients' connections with new information
  580. // gather by a sniff operation.
  581. func (c *Client) updateConns(conns []*conn) {
  582. c.connsMu.Lock()
  583. newConns := make([]*conn, 0)
  584. // Build up new connections:
  585. // If we find an existing connection, use that (including no. of failures etc.).
  586. // If we find a new connection, add it.
  587. for _, conn := range conns {
  588. var found bool
  589. for _, oldConn := range c.conns {
  590. if oldConn.NodeID() == conn.NodeID() {
  591. // Take over the old connection
  592. newConns = append(newConns, oldConn)
  593. found = true
  594. break
  595. }
  596. }
  597. if !found {
  598. // New connection didn't exist, so add it to our list of new conns.
  599. c.errorf("elastic: %s joined the cluster", conn.URL())
  600. newConns = append(newConns, conn)
  601. }
  602. }
  603. c.conns = newConns
  604. c.cindex = -1
  605. c.connsMu.Unlock()
  606. }
  607. // healthchecker periodically runs healthcheck.
  608. func (c *Client) healthchecker() {
  609. c.mu.RLock()
  610. timeout := c.healthcheckTimeout
  611. interval := c.healthcheckInterval
  612. c.mu.RUnlock()
  613. ticker := time.NewTicker(interval)
  614. defer ticker.Stop()
  615. for {
  616. select {
  617. case <-c.healthcheckStop:
  618. // we are asked to stop, so we signal back that we're stopping now
  619. c.healthcheckStop <- true
  620. return
  621. case <-ticker.C:
  622. c.healthcheck(timeout, false)
  623. }
  624. }
  625. }
  626. // healthcheck does a health check on all nodes in the cluster. Depending on
  627. // the node state, it marks connections as dead, sets them alive etc.
  628. // If healthchecks are disabled and force is false, this is a no-op.
  629. // The timeout specifies how long to wait for a response from Elasticsearch.
  630. func (c *Client) healthcheck(timeout time.Duration, force bool) {
  631. c.mu.RLock()
  632. if !c.healthcheckEnabled && !force {
  633. c.mu.RUnlock()
  634. return
  635. }
  636. c.mu.RUnlock()
  637. c.connsMu.RLock()
  638. conns := c.conns
  639. c.connsMu.RUnlock()
  640. timeoutInMillis := int64(timeout / time.Millisecond)
  641. for _, conn := range conns {
  642. params := make(url.Values)
  643. params.Set("timeout", fmt.Sprintf("%dms", timeoutInMillis))
  644. req, err := NewRequest("HEAD", conn.URL()+"/?"+params.Encode())
  645. if err == nil {
  646. res, err := c.c.Do((*http.Request)(req))
  647. if err == nil {
  648. if res.Body != nil {
  649. defer res.Body.Close()
  650. }
  651. if res.StatusCode >= 200 && res.StatusCode < 300 {
  652. conn.MarkAsAlive()
  653. } else {
  654. conn.MarkAsDead()
  655. c.errorf("elastic: %s is dead [status=%d]", conn.URL(), res.StatusCode)
  656. }
  657. } else {
  658. c.errorf("elastic: %s is dead", conn.URL())
  659. conn.MarkAsDead()
  660. }
  661. } else {
  662. c.errorf("elastic: %s is dead", conn.URL())
  663. conn.MarkAsDead()
  664. }
  665. }
  666. }
  667. // next returns the next available connection, or ErrNoClient.
  668. func (c *Client) next() (*conn, error) {
  669. // We do round-robin here.
  670. // TODO(oe) This should be a pluggable strategy, like the Selector in the official clients.
  671. c.connsMu.Lock()
  672. defer c.connsMu.Unlock()
  673. i := 0
  674. numConns := len(c.conns)
  675. for {
  676. i += 1
  677. if i > numConns {
  678. break // we visited all conns: they all seem to be dead
  679. }
  680. c.cindex += 1
  681. if c.cindex >= numConns {
  682. c.cindex = 0
  683. }
  684. conn := c.conns[c.cindex]
  685. if !conn.IsDead() {
  686. return conn, nil
  687. }
  688. }
  689. // TODO(oe) As a last resort, we could try to awake a dead connection here.
  690. // We tried hard, but there is no node available
  691. return nil, ErrNoClient
  692. }
  693. // mustActiveConn returns nil if there is an active connection,
  694. // otherwise ErrNoClient is returned.
  695. func (c *Client) mustActiveConn() error {
  696. c.connsMu.Lock()
  697. defer c.connsMu.Unlock()
  698. for _, c := range c.conns {
  699. if !c.IsDead() {
  700. return nil
  701. }
  702. }
  703. return ErrNoClient
  704. }
  705. // PerformRequest does a HTTP request to Elasticsearch.
  706. // It returns a response and an error on failure.
  707. func (c *Client) PerformRequest(method, path string, params url.Values, body interface{}) (*Response, error) {
  708. start := time.Now().UTC()
  709. c.mu.RLock()
  710. timeout := c.healthcheckTimeout
  711. retries := c.maxRetries
  712. c.mu.RUnlock()
  713. var err error
  714. var conn *conn
  715. var req *Request
  716. var resp *Response
  717. var retried bool
  718. // We wait between retries, using simple exponential back-off.
  719. // TODO: Make this configurable, including the jitter.
  720. retryWaitMsec := int64(100 + (rand.Intn(20) - 10))
  721. for {
  722. pathWithParams := path
  723. if len(params) > 0 {
  724. pathWithParams += "?" + params.Encode()
  725. }
  726. // Get a connection
  727. conn, err = c.next()
  728. if err == ErrNoClient {
  729. if !retried {
  730. // Force a healtcheck as all connections seem to be dead.
  731. c.healthcheck(timeout, false)
  732. }
  733. retries -= 1
  734. if retries <= 0 {
  735. return nil, err
  736. }
  737. retried = true
  738. time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
  739. retryWaitMsec += retryWaitMsec
  740. continue // try again
  741. }
  742. if err != nil {
  743. c.errorf("elastic: cannot get connection from pool")
  744. return nil, err
  745. }
  746. req, err = NewRequest(method, conn.URL()+pathWithParams)
  747. if err != nil {
  748. c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(method), conn.URL()+pathWithParams, err)
  749. return nil, err
  750. }
  751. // Set body
  752. if body != nil {
  753. switch b := body.(type) {
  754. case string:
  755. req.SetBodyString(b)
  756. break
  757. default:
  758. req.SetBodyJson(body)
  759. break
  760. }
  761. }
  762. // Tracing
  763. c.dumpRequest((*http.Request)(req))
  764. // Get response
  765. res, err := c.c.Do((*http.Request)(req))
  766. if err != nil {
  767. retries -= 1
  768. if retries <= 0 {
  769. c.errorf("elastic: %s is dead", conn.URL())
  770. conn.MarkAsDead()
  771. return nil, err
  772. }
  773. retried = true
  774. time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
  775. retryWaitMsec += retryWaitMsec
  776. continue // try again
  777. }
  778. if res.Body != nil {
  779. defer res.Body.Close()
  780. }
  781. // Check for errors
  782. if err := checkResponse(res); err != nil {
  783. retries -= 1
  784. if retries <= 0 {
  785. return nil, err
  786. }
  787. retried = true
  788. time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
  789. retryWaitMsec += retryWaitMsec
  790. continue // try again
  791. }
  792. // Tracing
  793. c.dumpResponse(res)
  794. // We successfully made a request with this connection
  795. conn.MarkAsHealthy()
  796. resp, err = c.newResponse(res)
  797. if err != nil {
  798. return nil, err
  799. }
  800. break
  801. }
  802. duration := time.Now().UTC().Sub(start)
  803. c.infof("%s %s [status:%d, request:%.3fs]",
  804. strings.ToUpper(method),
  805. req.URL,
  806. resp.StatusCode,
  807. float64(int64(duration/time.Millisecond))/1000)
  808. return resp, nil
  809. }
  810. // ElasticsearchVersion returns the version number of Elasticsearch
  811. // running on the given URL.
  812. func (c *Client) ElasticsearchVersion(url string) (string, error) {
  813. res, _, err := c.Ping().URL(url).Do()
  814. if err != nil {
  815. return "", err
  816. }
  817. return res.Version.Number, nil
  818. }
  819. // IndexNames returns the names of all indices in the cluster.
  820. func (c *Client) IndexNames() ([]string, error) {
  821. res, err := c.IndexGetSettings().Index("_all").Do()
  822. if err != nil {
  823. return nil, err
  824. }
  825. var names []string
  826. for name, _ := range res {
  827. names = append(names, name)
  828. }
  829. return names, nil
  830. }
  831. // Ping checks if a given node in a cluster exists and (optionally)
  832. // returns some basic information about the Elasticsearch server,
  833. // e.g. the Elasticsearch version number.
  834. func (c *Client) Ping() *PingService {
  835. return NewPingService(c)
  836. }
  837. // CreateIndex returns a service to create a new index.
  838. func (c *Client) CreateIndex(name string) *CreateIndexService {
  839. builder := NewCreateIndexService(c)
  840. builder.Index(name)
  841. return builder
  842. }
  843. // DeleteIndex returns a service to delete an index.
  844. func (c *Client) DeleteIndex(name string) *DeleteIndexService {
  845. builder := NewDeleteIndexService(c)
  846. builder.Index(name)
  847. return builder
  848. }
  849. // IndexExists allows to check if an index exists.
  850. func (c *Client) IndexExists(name string) *IndexExistsService {
  851. builder := NewIndexExistsService(c)
  852. builder.Index(name)
  853. return builder
  854. }
  855. // TypeExists allows to check if one or more types exist in one or more indices.
  856. func (c *Client) TypeExists() *IndicesExistsTypeService {
  857. return NewIndicesExistsTypeService(c)
  858. }
  859. // IndexStats provides statistics on different operations happining
  860. // in one or more indices.
  861. func (c *Client) IndexStats(indices ...string) *IndicesStatsService {
  862. builder := NewIndicesStatsService(c)
  863. builder = builder.Index(indices...)
  864. return builder
  865. }
  866. // OpenIndex opens an index.
  867. func (c *Client) OpenIndex(name string) *OpenIndexService {
  868. builder := NewOpenIndexService(c)
  869. builder.Index(name)
  870. return builder
  871. }
  872. // CloseIndex closes an index.
  873. func (c *Client) CloseIndex(name string) *CloseIndexService {
  874. builder := NewCloseIndexService(c)
  875. builder.Index(name)
  876. return builder
  877. }
  878. // Index a document.
  879. func (c *Client) Index() *IndexService {
  880. builder := NewIndexService(c)
  881. return builder
  882. }
  883. // IndexGet retrieves information about one or more indices.
  884. // IndexGet is only available for Elasticsearch 1.4 or later.
  885. func (c *Client) IndexGet() *IndicesGetService {
  886. builder := NewIndicesGetService(c)
  887. return builder
  888. }
  889. // IndexGetSettings retrieves settings about one or more indices.
  890. func (c *Client) IndexGetSettings() *IndicesGetSettingsService {
  891. builder := NewIndicesGetSettingsService(c)
  892. return builder
  893. }
  894. // Update a document.
  895. func (c *Client) Update() *UpdateService {
  896. builder := NewUpdateService(c)
  897. return builder
  898. }
  899. // Delete a document.
  900. func (c *Client) Delete() *DeleteService {
  901. builder := NewDeleteService(c)
  902. return builder
  903. }
  904. // DeleteByQuery deletes documents as found by a query.
  905. func (c *Client) DeleteByQuery() *DeleteByQueryService {
  906. builder := NewDeleteByQueryService(c)
  907. return builder
  908. }
  909. // Get a document.
  910. func (c *Client) Get() *GetService {
  911. builder := NewGetService(c)
  912. return builder
  913. }
  914. // MultiGet retrieves multiple documents in one roundtrip.
  915. func (c *Client) MultiGet() *MultiGetService {
  916. builder := NewMultiGetService(c)
  917. return builder
  918. }
  919. // Exists checks if a document exists.
  920. func (c *Client) Exists() *ExistsService {
  921. builder := NewExistsService(c)
  922. return builder
  923. }
  924. // Count documents.
  925. func (c *Client) Count(indices ...string) *CountService {
  926. builder := NewCountService(c)
  927. builder.Indices(indices...)
  928. return builder
  929. }
  930. // Search is the entry point for searches.
  931. func (c *Client) Search(indices ...string) *SearchService {
  932. builder := NewSearchService(c)
  933. builder.Indices(indices...)
  934. return builder
  935. }
  936. // Percolate allows to send a document and return matching queries.
  937. // See http://www.elastic.co/guide/en/elasticsearch/reference/current/search-percolate.html.
  938. func (c *Client) Percolate() *PercolateService {
  939. builder := NewPercolateService(c)
  940. return builder
  941. }
  942. // MultiSearch is the entry point for multi searches.
  943. func (c *Client) MultiSearch() *MultiSearchService {
  944. return NewMultiSearchService(c)
  945. }
  946. // Suggest returns a service to return suggestions.
  947. func (c *Client) Suggest(indices ...string) *SuggestService {
  948. builder := NewSuggestService(c)
  949. builder.Indices(indices...)
  950. return builder
  951. }
  952. // Scan through documents. Use this to iterate inside a server process
  953. // where the results will be processed without returning them to a client.
  954. func (c *Client) Scan(indices ...string) *ScanService {
  955. builder := NewScanService(c)
  956. builder.Indices(indices...)
  957. return builder
  958. }
  959. // Scroll through documents. Use this to efficiently scroll through results
  960. // while returning the results to a client. Use Scan when you don't need
  961. // to return requests to a client (i.e. not paginating via request/response).
  962. func (c *Client) Scroll(indices ...string) *ScrollService {
  963. builder := NewScrollService(c)
  964. builder.Indices(indices...)
  965. return builder
  966. }
  967. // ClearScroll can be used to clear search contexts manually.
  968. func (c *Client) ClearScroll() *ClearScrollService {
  969. builder := NewClearScrollService(c)
  970. return builder
  971. }
  972. // Optimize asks Elasticsearch to optimize one or more indices.
  973. func (c *Client) Optimize(indices ...string) *OptimizeService {
  974. builder := NewOptimizeService(c)
  975. builder.Indices(indices...)
  976. return builder
  977. }
  978. // Refresh asks Elasticsearch to refresh one or more indices.
  979. func (c *Client) Refresh(indices ...string) *RefreshService {
  980. builder := NewRefreshService(c)
  981. builder.Indices(indices...)
  982. return builder
  983. }
  984. // Flush asks Elasticsearch to free memory from the index and
  985. // flush data to disk.
  986. func (c *Client) Flush() *FlushService {
  987. builder := NewFlushService(c)
  988. return builder
  989. }
  990. // Explain computes a score explanation for a query and a specific document.
  991. func (c *Client) Explain(index, typ, id string) *ExplainService {
  992. builder := NewExplainService(c)
  993. builder = builder.Index(index).Type(typ).Id(id)
  994. return builder
  995. }
  996. // Bulk is the entry point to mass insert/update/delete documents.
  997. func (c *Client) Bulk() *BulkService {
  998. builder := NewBulkService(c)
  999. return builder
  1000. }
  1001. // Alias enables the caller to add and/or remove aliases.
  1002. func (c *Client) Alias() *AliasService {
  1003. builder := NewAliasService(c)
  1004. return builder
  1005. }
  1006. // Aliases returns aliases by index name(s).
  1007. func (c *Client) Aliases() *AliasesService {
  1008. builder := NewAliasesService(c)
  1009. return builder
  1010. }
  1011. // GetTemplate gets a search template.
  1012. // Use IndexXXXTemplate funcs to manage index templates.
  1013. func (c *Client) GetTemplate() *GetTemplateService {
  1014. return NewGetTemplateService(c)
  1015. }
  1016. // PutTemplate creates or updates a search template.
  1017. // Use IndexXXXTemplate funcs to manage index templates.
  1018. func (c *Client) PutTemplate() *PutTemplateService {
  1019. return NewPutTemplateService(c)
  1020. }
  1021. // DeleteTemplate deletes a search template.
  1022. // Use IndexXXXTemplate funcs to manage index templates.
  1023. func (c *Client) DeleteTemplate() *DeleteTemplateService {
  1024. return NewDeleteTemplateService(c)
  1025. }
  1026. // IndexGetTemplate gets an index template.
  1027. // Use XXXTemplate funcs to manage search templates.
  1028. func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService {
  1029. builder := NewIndicesGetTemplateService(c)
  1030. builder = builder.Name(names...)
  1031. return builder
  1032. }
  1033. // IndexTemplateExists gets check if an index template exists.
  1034. // Use XXXTemplate funcs to manage search templates.
  1035. func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService {
  1036. builder := NewIndicesExistsTemplateService(c)
  1037. builder = builder.Name(name)
  1038. return builder
  1039. }
  1040. // IndexPutTemplate creates or updates an index template.
  1041. // Use XXXTemplate funcs to manage search templates.
  1042. func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService {
  1043. builder := NewIndicesPutTemplateService(c)
  1044. builder = builder.Name(name)
  1045. return builder
  1046. }
  1047. // IndexDeleteTemplate deletes an index template.
  1048. // Use XXXTemplate funcs to manage search templates.
  1049. func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService {
  1050. builder := NewIndicesDeleteTemplateService(c)
  1051. builder = builder.Name(name)
  1052. return builder
  1053. }
  1054. // GetMapping gets a mapping.
  1055. func (c *Client) GetMapping() *GetMappingService {
  1056. return NewGetMappingService(c)
  1057. }
  1058. // PutMapping registers a mapping.
  1059. func (c *Client) PutMapping() *PutMappingService {
  1060. return NewPutMappingService(c)
  1061. }
  1062. // DeleteMapping deletes a mapping.
  1063. func (c *Client) DeleteMapping() *DeleteMappingService {
  1064. return NewDeleteMappingService(c)
  1065. }
  1066. // ClusterHealth retrieves the health of the cluster.
  1067. func (c *Client) ClusterHealth() *ClusterHealthService {
  1068. return NewClusterHealthService(c)
  1069. }
  1070. // ClusterState retrieves the state of the cluster.
  1071. func (c *Client) ClusterState() *ClusterStateService {
  1072. return NewClusterStateService(c)
  1073. }
  1074. // NodesInfo retrieves one or more or all of the cluster nodes information.
  1075. func (c *Client) NodesInfo() *NodesInfoService {
  1076. return NewNodesInfoService(c)
  1077. }
  1078. // Reindex returns a service that will reindex documents from a source
  1079. // index into a target index. See
  1080. // http://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html
  1081. // for more information about reindexing.
  1082. func (c *Client) Reindex(sourceIndex, targetIndex string) *Reindexer {
  1083. return NewReindexer(c, sourceIndex, targetIndex)
  1084. }