client.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145
  1. // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "bytes"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "log"
  11. "math/rand"
  12. "net/http"
  13. "net/http/httputil"
  14. "net/url"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. const (
  21. // Version is the current version of Elastic.
  22. Version = "2.0.0"
  23. // DefaultUrl is the default endpoint of Elasticsearch on the local machine.
  24. // It is used e.g. when initializing a new Client without a specific URL.
  25. DefaultURL = "http://127.0.0.1:9200"
  26. // DefaultScheme is the default protocol scheme to use when sniffing
  27. // the Elasticsearch cluster.
  28. DefaultScheme = "http"
  29. // DefaultHealthcheckEnabled specifies if healthchecks are enabled by default.
  30. DefaultHealthcheckEnabled = true
  31. // DefaultHealthcheckInterval is the default interval between
  32. // two health checks of the nodes in the cluster.
  33. DefaultHealthcheckInterval = 60 * time.Second
  34. // DefaultSnifferEnabled specifies if the sniffer is enabled by default.
  35. DefaultSnifferEnabled = true
  36. // DefaultSnifferInterval is the interval between two sniffing procedures,
  37. // i.e. the lookup of all nodes in the cluster and their addition/removal
  38. // from the list of actual connections.
  39. DefaultSnifferInterval = 15 * time.Minute
  40. // DefaultSnifferTimeout is the default timeout after which the
  41. // sniffing process times out.
  42. DefaultSnifferTimeout = 1 * time.Second
  43. // DefaultMaxRetries is the number of retries for a single request after
  44. // Elastic will give up and return an error. It is zero by default, so
  45. // retry is disabled by default.
  46. DefaultMaxRetries = 0
  47. )
  48. var (
  49. // ErrNoClient is raised when no Elasticsearch node is available.
  50. ErrNoClient = errors.New("no Elasticsearch node available")
  51. // ErrRetry is raised when a request cannot be executed after the configured
  52. // number of retries.
  53. ErrRetry = errors.New("cannot connect after several retries")
  54. )
  55. // ClientOptionFunc is a function that configures a Client.
  56. // It is used in NewClient.
  57. type ClientOptionFunc func(*Client) error
  58. // Client is an Elasticsearch client. Create one by calling NewClient.
  59. type Client struct {
  60. c *http.Client // net/http Client to use for requests
  61. connsMu sync.RWMutex // connsMu guards the next block
  62. conns []*conn // all connections
  63. cindex int // index into conns
  64. mu sync.RWMutex // guards the next block
  65. urls []string // set of URLs passed initially to the client
  66. running bool // true if the client's background processes are running
  67. errorlog *log.Logger // error log for critical messages
  68. infolog *log.Logger // information log for e.g. response times
  69. tracelog *log.Logger // trace log for debugging
  70. maxRetries int // max. number of retries
  71. scheme string // http or https
  72. healthcheckEnabled bool // healthchecks enabled or disabled
  73. healthcheckInterval time.Duration // interval between healthchecks
  74. healthcheckStop chan bool // notify healthchecker to stop, and notify back
  75. snifferEnabled bool // sniffer enabled or disabled
  76. snifferTimeout time.Duration // time the sniffer waits for a response from nodes info API
  77. snifferInterval time.Duration // interval between sniffing
  78. snifferStop chan bool // notify sniffer to stop, and notify back
  79. decoder Decoder // used to decode data sent from Elasticsearch
  80. }
  81. // NewClient creates a new client to work with Elasticsearch.
  82. //
  83. // The caller can configure the new client by passing configuration options
  84. // to the func.
  85. //
  86. // Example:
  87. //
  88. // client, err := elastic.NewClient(
  89. // elastic.SetURL("http://localhost:9200", "http://localhost:9201"),
  90. // elastic.SetMaxRetries(10))
  91. //
  92. // If no URL is configured, Elastic uses DefaultURL by default.
  93. //
  94. // If the sniffer is enabled (the default), the new client then sniffes
  95. // the cluster via the Nodes Info API
  96. // (see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/cluster-nodes-info.html#cluster-nodes-info).
  97. // It uses the URLs specified by the caller. The caller is responsible
  98. // to only pass a list of URLs of nodes that belong to the same cluster.
  99. // This sniffing process is run on startup and periodically.
  100. // Use SnifferInterval to set the interval between two sniffs (default is
  101. // 15 minutes). In other words: By default, the client will find new nodes
  102. // in the cluster and remove those that are no longer available every
  103. // 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient.
  104. //
  105. // The list of nodes found in the sniffing process will be used to make
  106. // connections to the REST API of Elasticsearch. These nodes are also
  107. // periodically checked in a shorter time frame. This process is called
  108. // a health check. By default, a health check is done every 60 seconds.
  109. // You can set a shorter or longer interval by SetHealthcheckInterval.
  110. // Disabling health checks is not recommended, but can be done by
  111. // SetHealthcheck(false).
  112. //
  113. // Connections are automatically marked as dead or healthy while
  114. // making requests to Elasticsearch. When a request fails, Elastic will
  115. // retry up to a maximum number of retries configured with SetMaxRetries.
  116. // Retries are disabled by default.
  117. //
  118. // If no HttpClient is configured, then http.DefaultClient is used.
  119. // You can use your own http.Client with some http.Transport for
  120. // advanced scenarios.
  121. //
  122. // An error is also returned when some configuration option is invalid or
  123. // the new client cannot sniff the cluster (if enabled).
  124. func NewClient(options ...ClientOptionFunc) (*Client, error) {
  125. // Set up the client
  126. c := &Client{
  127. urls: []string{DefaultURL},
  128. c: http.DefaultClient,
  129. conns: make([]*conn, 0),
  130. cindex: -1,
  131. scheme: DefaultScheme,
  132. decoder: &DefaultDecoder{},
  133. maxRetries: DefaultMaxRetries,
  134. healthcheckEnabled: DefaultHealthcheckEnabled,
  135. healthcheckInterval: DefaultHealthcheckInterval,
  136. healthcheckStop: make(chan bool),
  137. snifferEnabled: DefaultSnifferEnabled,
  138. snifferInterval: DefaultSnifferInterval,
  139. snifferStop: make(chan bool),
  140. snifferTimeout: DefaultSnifferTimeout,
  141. }
  142. // Run the options on it
  143. for _, option := range options {
  144. if err := option(c); err != nil {
  145. return nil, err
  146. }
  147. }
  148. if len(c.urls) == 0 {
  149. c.urls = []string{DefaultURL}
  150. }
  151. c.urls = canonicalize(c.urls...)
  152. if c.snifferEnabled {
  153. // Sniff the cluster initially
  154. if err := c.sniff(); err != nil {
  155. return nil, err
  156. }
  157. } else {
  158. // Do not sniff the cluster initially. Use the provided URLs instead.
  159. for _, url := range c.urls {
  160. c.conns = append(c.conns, newConn(url, url))
  161. }
  162. }
  163. // Perform an initial health check
  164. c.healthcheck()
  165. go c.sniffer() // periodically update cluster information
  166. go c.healthchecker() // start goroutine periodically ping all nodes of the cluster
  167. c.mu.Lock()
  168. c.running = true
  169. c.mu.Unlock()
  170. return c, nil
  171. }
  172. // SetHttpClient can be used to specify the http.Client to use when making
  173. // HTTP requests to Elasticsearch.
  174. func SetHttpClient(httpClient *http.Client) ClientOptionFunc {
  175. return func(c *Client) error {
  176. if httpClient != nil {
  177. c.c = httpClient
  178. } else {
  179. c.c = http.DefaultClient
  180. }
  181. return nil
  182. }
  183. }
  184. // SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that
  185. // when sniffing is enabled, these URLs are used to initially sniff the
  186. // cluster on startup.
  187. func SetURL(urls ...string) ClientOptionFunc {
  188. return func(c *Client) error {
  189. switch len(urls) {
  190. case 0:
  191. c.urls = []string{DefaultURL}
  192. default:
  193. c.urls = make([]string, 0)
  194. for _, url := range urls {
  195. c.urls = append(c.urls, url)
  196. }
  197. }
  198. return nil
  199. }
  200. }
  201. // SetScheme sets the HTTP scheme to look for when sniffing (http or https).
  202. // This is http by default.
  203. func SetScheme(scheme string) ClientOptionFunc {
  204. return func(c *Client) error {
  205. c.scheme = scheme
  206. return nil
  207. }
  208. }
  209. // SetSniff enables or disables the sniffer (enabled by default).
  210. func SetSniff(enabled bool) ClientOptionFunc {
  211. return func(c *Client) error {
  212. c.snifferEnabled = enabled
  213. return nil
  214. }
  215. }
  216. // SetSnifferInterval sets the interval between two sniffing processes.
  217. // The default interval is 15 minutes.
  218. func SetSnifferInterval(interval time.Duration) ClientOptionFunc {
  219. return func(c *Client) error {
  220. c.snifferInterval = interval
  221. return nil
  222. }
  223. }
  224. // SetSnifferTimeout sets the timeout for the sniffer that finds the
  225. // nodes in a cluster. The default is 1 second.
  226. func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc {
  227. return func(c *Client) error {
  228. c.snifferTimeout = timeout
  229. return nil
  230. }
  231. }
  232. // SetHealthcheck enables or disables healthchecks (enabled by default).
  233. func SetHealthcheck(enabled bool) ClientOptionFunc {
  234. return func(c *Client) error {
  235. c.healthcheckEnabled = enabled
  236. return nil
  237. }
  238. }
  239. // SetHealthcheckInterval sets the interval between two health checks.
  240. // The default interval is 60 seconds.
  241. func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc {
  242. return func(c *Client) error {
  243. c.healthcheckInterval = interval
  244. return nil
  245. }
  246. }
  247. // SetMaxRetries sets the maximum number of retries before giving up when
  248. // performing a HTTP request to Elasticsearch.
  249. func SetMaxRetries(maxRetries int) func(*Client) error {
  250. return func(c *Client) error {
  251. if maxRetries < 0 {
  252. return errors.New("MaxRetries must be greater than or equal to 0")
  253. }
  254. c.maxRetries = maxRetries
  255. return nil
  256. }
  257. }
  258. // SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
  259. // DefaultDecoder is used by default.
  260. func SetDecoder(decoder Decoder) func(*Client) error {
  261. return func(c *Client) error {
  262. if decoder != nil {
  263. c.decoder = decoder
  264. } else {
  265. c.decoder = &DefaultDecoder{}
  266. }
  267. return nil
  268. }
  269. }
  270. // SetErrorLog sets the logger for critical messages like nodes joining
  271. // or leaving the cluster or failing requests. It is nil by default.
  272. func SetErrorLog(logger *log.Logger) func(*Client) error {
  273. return func(c *Client) error {
  274. c.errorlog = logger
  275. return nil
  276. }
  277. }
  278. // SetInfoLog sets the logger for informational messages, e.g. requests
  279. // and their response times. It is nil by default.
  280. func SetInfoLog(logger *log.Logger) func(*Client) error {
  281. return func(c *Client) error {
  282. c.infolog = logger
  283. return nil
  284. }
  285. }
  286. // SetTraceLog specifies the log.Logger to use for output of HTTP requests
  287. // and responses which is helpful during debugging. It is nil by default.
  288. func SetTraceLog(logger *log.Logger) func(*Client) error {
  289. return func(c *Client) error {
  290. c.tracelog = logger
  291. return nil
  292. }
  293. }
  294. // String returns a string representation of the client status.
  295. func (c *Client) String() string {
  296. c.connsMu.Lock()
  297. conns := c.conns
  298. c.connsMu.Unlock()
  299. var buf bytes.Buffer
  300. for i, conn := range conns {
  301. if i > 0 {
  302. buf.WriteString(", ")
  303. }
  304. buf.WriteString(conn.String())
  305. }
  306. return buf.String()
  307. }
  308. // IsRunning returns true if the background processes of the client are
  309. // running, false otherwise.
  310. func (c *Client) IsRunning() bool {
  311. c.mu.RLock()
  312. defer c.mu.RUnlock()
  313. return c.running
  314. }
  315. // Start starts the background processes like sniffing the cluster and
  316. // periodic health checks. You don't need to run Start when creating a
  317. // client with NewClient; the background processes are run by default.
  318. //
  319. // If the background processes are already running, this is a no-op.
  320. func (c *Client) Start() {
  321. c.mu.RLock()
  322. if c.running {
  323. c.mu.RUnlock()
  324. return
  325. }
  326. c.mu.RUnlock()
  327. go c.sniffer()
  328. go c.healthchecker()
  329. c.mu.Lock()
  330. c.running = true
  331. c.mu.Unlock()
  332. c.infof("elastic: client started")
  333. }
  334. // Stop stops the background processes that the client is running,
  335. // i.e. sniffing the cluster periodically and running health checks
  336. // on the nodes.
  337. //
  338. // If the background processes are not running, this is a no-op.
  339. func (c *Client) Stop() {
  340. c.mu.RLock()
  341. if !c.running {
  342. c.mu.RUnlock()
  343. return
  344. }
  345. c.mu.RUnlock()
  346. c.healthcheckStop <- true
  347. <-c.healthcheckStop
  348. c.snifferStop <- true
  349. <-c.snifferStop
  350. c.mu.Lock()
  351. c.running = false
  352. c.mu.Unlock()
  353. c.infof("elastic: client stopped")
  354. }
  355. // errorf logs to the error log.
  356. func (c *Client) errorf(format string, args ...interface{}) {
  357. if c.errorlog != nil {
  358. c.errorlog.Printf(format, args...)
  359. }
  360. }
  361. // infof logs informational messages.
  362. func (c *Client) infof(format string, args ...interface{}) {
  363. if c.infolog != nil {
  364. c.infolog.Printf(format, args...)
  365. }
  366. }
  367. // tracef logs to the trace log.
  368. func (c *Client) tracef(format string, args ...interface{}) {
  369. if c.tracelog != nil {
  370. c.tracelog.Printf(format, args...)
  371. }
  372. }
  373. // dumpRequest dumps the given HTTP request to the trace log.
  374. func (c *Client) dumpRequest(r *http.Request) {
  375. if c.tracelog != nil {
  376. out, err := httputil.DumpRequestOut(r, true)
  377. if err == nil {
  378. c.tracef("%s\n", string(out))
  379. }
  380. }
  381. }
  382. // dumpResponse dumps the given HTTP response to the trace log.
  383. func (c *Client) dumpResponse(resp *http.Response) {
  384. if c.tracelog != nil {
  385. out, err := httputil.DumpResponse(resp, true)
  386. if err == nil {
  387. c.tracef("%s\n", string(out))
  388. }
  389. }
  390. }
  391. // sniffer periodically runs sniff.
  392. func (c *Client) sniffer() {
  393. for {
  394. c.mu.RLock()
  395. ticker := time.NewTicker(c.snifferInterval)
  396. c.mu.RUnlock()
  397. select {
  398. case <-c.snifferStop:
  399. // we are asked to stop, so we signal back that we're stopping now
  400. c.snifferStop <- true
  401. return
  402. case <-ticker.C:
  403. c.sniff()
  404. }
  405. }
  406. }
  407. // sniff uses the Node Info API to return the list of nodes in the cluster.
  408. // It uses the list of URLs passed on startup plus the list of URLs found
  409. // by the preceding sniffing process (if sniffing is enabled).
  410. //
  411. // If sniffing is disabled, this is a no-op.
  412. func (c *Client) sniff() error {
  413. c.mu.RLock()
  414. if !c.snifferEnabled {
  415. c.mu.RUnlock()
  416. return nil
  417. }
  418. // Use all available URLs provided to sniff the cluster.
  419. urlsMap := make(map[string]bool)
  420. urls := make([]string, 0)
  421. // Add all URLs provided on startup
  422. for _, url := range c.urls {
  423. urlsMap[url] = true
  424. urls = append(urls, url)
  425. }
  426. timeout := c.snifferTimeout
  427. c.mu.RUnlock()
  428. // Add all URLs found by sniffing
  429. c.connsMu.RLock()
  430. for _, conn := range c.conns {
  431. if !conn.IsDead() {
  432. url := conn.URL()
  433. if _, found := urlsMap[url]; !found {
  434. urls = append(urls, url)
  435. }
  436. }
  437. }
  438. c.connsMu.RUnlock()
  439. if len(urls) == 0 {
  440. return ErrNoClient
  441. }
  442. // Start sniffing on all found URLs
  443. ch := make(chan []*conn, len(urls))
  444. for _, url := range urls {
  445. go func(url string) { ch <- c.sniffNode(url) }(url)
  446. }
  447. // Wait for the results to come back, or the process times out.
  448. for {
  449. select {
  450. case conns := <-ch:
  451. if len(conns) > 0 {
  452. c.updateConns(conns)
  453. return nil
  454. }
  455. case <-time.After(timeout):
  456. // We get here if no cluster responds in time
  457. return ErrNoClient
  458. }
  459. }
  460. }
  461. // reSniffHostAndPort is used to extract hostname and port from a result
  462. // from a Nodes Info API (example: "inet[/127.0.0.1:9200]").
  463. var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
  464. // sniffNode sniffs a single node. This method is run as a goroutine
  465. // in sniff. If successful, it returns the list of node URLs extracted
  466. // from the result of calling Nodes Info API. Otherwise, an empty array
  467. // is returned.
  468. func (c *Client) sniffNode(url string) []*conn {
  469. nodes := make([]*conn, 0)
  470. // Call the Nodes Info API at /_nodes/http
  471. req, err := NewRequest("GET", url+"/_nodes/http")
  472. if err != nil {
  473. return nodes
  474. }
  475. res, err := c.c.Do((*http.Request)(req))
  476. if err != nil {
  477. return nodes
  478. }
  479. if res == nil {
  480. return nodes
  481. }
  482. if res.Body != nil {
  483. defer res.Body.Close()
  484. }
  485. var info NodesInfoResponse
  486. if err := json.NewDecoder(res.Body).Decode(&info); err == nil {
  487. if len(info.Nodes) > 0 {
  488. switch c.scheme {
  489. case "https":
  490. for nodeID, node := range info.Nodes {
  491. m := reSniffHostAndPort.FindStringSubmatch(node.HTTPSAddress)
  492. if len(m) == 3 {
  493. url := fmt.Sprintf("https://%s:%s", m[1], m[2])
  494. nodes = append(nodes, newConn(nodeID, url))
  495. }
  496. }
  497. default:
  498. for nodeID, node := range info.Nodes {
  499. m := reSniffHostAndPort.FindStringSubmatch(node.HTTPAddress)
  500. if len(m) == 3 {
  501. url := fmt.Sprintf("http://%s:%s", m[1], m[2])
  502. nodes = append(nodes, newConn(nodeID, url))
  503. }
  504. }
  505. }
  506. }
  507. }
  508. return nodes
  509. }
  510. // updateConns updates the clients' connections with new information
  511. // gather by a sniff operation.
  512. func (c *Client) updateConns(conns []*conn) {
  513. c.connsMu.Lock()
  514. newConns := make([]*conn, 0)
  515. // Build up new connections:
  516. // If we find an existing connection, use that (including no. of failures etc.).
  517. // If we find a new connection, add it.
  518. for _, conn := range conns {
  519. var found bool
  520. for _, oldConn := range c.conns {
  521. if oldConn.NodeID() == conn.NodeID() {
  522. // Take over the old connection
  523. newConns = append(newConns, oldConn)
  524. found = true
  525. break
  526. }
  527. }
  528. if !found {
  529. // New connection didn't exist, so add it to our list of new conns.
  530. c.errorf("elastic: %s joined the cluster", conn.URL())
  531. newConns = append(newConns, conn)
  532. }
  533. }
  534. c.conns = newConns
  535. c.cindex = -1
  536. c.connsMu.Unlock()
  537. }
  538. // healthchecker periodically runs healthcheck.
  539. func (c *Client) healthchecker() {
  540. for {
  541. c.mu.RLock()
  542. ticker := time.NewTicker(c.healthcheckInterval)
  543. c.mu.RUnlock()
  544. select {
  545. case <-c.healthcheckStop:
  546. // we are asked to stop, so we signal back that we're stopping now
  547. c.healthcheckStop <- true
  548. return
  549. case <-ticker.C:
  550. c.healthcheck()
  551. }
  552. }
  553. }
  554. // healthcheck does a health check on all nodes in the cluster. Depending on
  555. // the node state, it marks connections as dead, sets them alive etc.
  556. // If healthchecks are disabled, this is a no-op.
  557. func (c *Client) healthcheck() {
  558. c.mu.RLock()
  559. if !c.healthcheckEnabled {
  560. c.mu.RUnlock()
  561. return
  562. }
  563. c.mu.RUnlock()
  564. c.connsMu.RLock()
  565. conns := c.conns
  566. c.connsMu.RUnlock()
  567. for _, conn := range conns {
  568. params := make(url.Values)
  569. params.Set("timeout", "1")
  570. req, err := NewRequest("HEAD", conn.URL()+"/?"+params.Encode())
  571. if err == nil {
  572. res, err := c.c.Do((*http.Request)(req))
  573. if err == nil {
  574. if res.Body != nil {
  575. defer res.Body.Close()
  576. }
  577. if res.StatusCode >= 200 && res.StatusCode < 300 {
  578. conn.MarkAsAlive()
  579. } else {
  580. conn.MarkAsDead()
  581. c.errorf("elastic: %s is dead [status=%d]", conn.URL(), res.StatusCode)
  582. }
  583. } else {
  584. c.errorf("elastic: %s is dead", conn.URL())
  585. conn.MarkAsDead()
  586. }
  587. } else {
  588. c.errorf("elastic: %s is dead", conn.URL())
  589. conn.MarkAsDead()
  590. }
  591. }
  592. }
  593. // next returns the next available connection, or ErrNoClient.
  594. func (c *Client) next() (*conn, error) {
  595. // We do round-robin here.
  596. // TODO: This should be a pluggable strategy, like the Selector in the official clients.
  597. c.connsMu.Lock()
  598. defer c.connsMu.Unlock()
  599. i := 0
  600. numConns := len(c.conns)
  601. for {
  602. i += 1
  603. if i > numConns {
  604. break // we visited all conns: they all seem to be dead
  605. }
  606. c.cindex += 1
  607. if c.cindex >= numConns {
  608. c.cindex = 0
  609. }
  610. conn := c.conns[c.cindex]
  611. if !conn.IsDead() {
  612. return conn, nil
  613. }
  614. }
  615. // TODO: As a last resort, we could try to awake a dead connection here.
  616. // We tried hard, but there is no node available
  617. return nil, ErrNoClient
  618. }
  619. // PerformRequest does a HTTP request to Elasticsearch.
  620. // It returns a response and an error on failure.
  621. func (c *Client) PerformRequest(method, path string, params url.Values, body interface{}) (*Response, error) {
  622. start := time.Now().UTC()
  623. c.mu.RLock()
  624. retries := c.maxRetries
  625. c.mu.RUnlock()
  626. var err error
  627. var conn *conn
  628. var req *Request
  629. var resp *Response
  630. var retried bool
  631. // We wait between retries, using simple exponential back-off.
  632. // TODO: Make this configurable, including the jitter.
  633. retryWaitMsec := int64(100 + (rand.Intn(20) - 10))
  634. for {
  635. pathWithParams := path
  636. if len(params) > 0 {
  637. pathWithParams += "?" + params.Encode()
  638. }
  639. // Get a connection
  640. conn, err = c.next()
  641. if err == ErrNoClient {
  642. if !retried {
  643. // Force a healtcheck as all connections seem to be dead.
  644. c.healthcheck()
  645. }
  646. retries -= 1
  647. if retries <= 0 {
  648. return nil, err
  649. }
  650. retried = true
  651. time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
  652. retryWaitMsec += retryWaitMsec
  653. continue // try again
  654. }
  655. if err != nil {
  656. c.errorf("elastic: cannot get connection from pool")
  657. return nil, err
  658. }
  659. req, err = NewRequest(method, conn.URL()+pathWithParams)
  660. if err != nil {
  661. c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(method), conn.URL()+pathWithParams, err)
  662. return nil, err
  663. }
  664. // Set body
  665. if body != nil {
  666. switch b := body.(type) {
  667. case string:
  668. req.SetBodyString(b)
  669. break
  670. default:
  671. req.SetBodyJson(body)
  672. break
  673. }
  674. }
  675. // Tracing
  676. c.dumpRequest((*http.Request)(req))
  677. // Get response
  678. res, err := c.c.Do((*http.Request)(req))
  679. if err != nil {
  680. retries -= 1
  681. if retries <= 0 {
  682. c.errorf("elastic: %s is dead", conn.URL())
  683. conn.MarkAsDead()
  684. return nil, err
  685. }
  686. retried = true
  687. time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
  688. retryWaitMsec += retryWaitMsec
  689. continue // try again
  690. }
  691. if res.Body != nil {
  692. defer res.Body.Close()
  693. }
  694. // Check for errors
  695. if err := checkResponse(res); err != nil {
  696. retries -= 1
  697. if retries <= 0 {
  698. return nil, err
  699. }
  700. retried = true
  701. time.Sleep(time.Duration(retryWaitMsec) * time.Millisecond)
  702. retryWaitMsec += retryWaitMsec
  703. continue // try again
  704. }
  705. // Tracing
  706. c.dumpResponse(res)
  707. // We successfully made a request with this connection
  708. conn.MarkAsHealthy()
  709. resp, err = c.newResponse(res)
  710. if err != nil {
  711. return nil, err
  712. }
  713. break
  714. }
  715. duration := time.Now().UTC().Sub(start)
  716. c.infof("%s %s [status:%d, request:%.3fs]",
  717. strings.ToUpper(method),
  718. req.URL,
  719. resp.StatusCode,
  720. float64(int64(duration/time.Millisecond))/1000)
  721. return resp, nil
  722. }
  723. // ElasticsearchVersion returns the version number of Elasticsearch
  724. // running on the given URL.
  725. func (c *Client) ElasticsearchVersion(url string) (string, error) {
  726. res, _, err := c.Ping().URL(url).Do()
  727. if err != nil {
  728. return "", err
  729. }
  730. return res.Version.Number, nil
  731. }
  732. // IndexNames returns the names of all indices in the cluster.
  733. func (c *Client) IndexNames() ([]string, error) {
  734. res, err := c.IndexGetSettings().Index("_all").Do()
  735. if err != nil {
  736. return nil, err
  737. }
  738. var names []string
  739. for name, _ := range res {
  740. names = append(names, name)
  741. }
  742. return names, nil
  743. }
  744. // Ping checks if a given node in a cluster exists and (optionally)
  745. // returns some basic information about the Elasticsearch server,
  746. // e.g. the Elasticsearch version number.
  747. func (c *Client) Ping() *PingService {
  748. return NewPingService(c)
  749. }
  750. // CreateIndex returns a service to create a new index.
  751. func (c *Client) CreateIndex(name string) *CreateIndexService {
  752. builder := NewCreateIndexService(c)
  753. builder.Index(name)
  754. return builder
  755. }
  756. // DeleteIndex returns a service to delete an index.
  757. func (c *Client) DeleteIndex(name string) *DeleteIndexService {
  758. builder := NewDeleteIndexService(c)
  759. builder.Index(name)
  760. return builder
  761. }
  762. // IndexExists allows to check if an index exists.
  763. func (c *Client) IndexExists(name string) *IndexExistsService {
  764. builder := NewIndexExistsService(c)
  765. builder.Index(name)
  766. return builder
  767. }
  768. // TypeExists allows to check if one or more types exist in one or more indices.
  769. func (c *Client) TypeExists() *IndicesExistsTypeService {
  770. return NewIndicesExistsTypeService(c)
  771. }
  772. // IndexStats provides statistics on different operations happining
  773. // in one or more indices.
  774. func (c *Client) IndexStats(indices ...string) *IndicesStatsService {
  775. builder := NewIndicesStatsService(c)
  776. builder = builder.Index(indices...)
  777. return builder
  778. }
  779. // OpenIndex opens an index.
  780. func (c *Client) OpenIndex(name string) *OpenIndexService {
  781. builder := NewOpenIndexService(c)
  782. builder.Index(name)
  783. return builder
  784. }
  785. // CloseIndex closes an index.
  786. func (c *Client) CloseIndex(name string) *CloseIndexService {
  787. builder := NewCloseIndexService(c)
  788. builder.Index(name)
  789. return builder
  790. }
  791. // Index a document.
  792. func (c *Client) Index() *IndexService {
  793. builder := NewIndexService(c)
  794. return builder
  795. }
  796. // IndexGet retrieves information about one or more indices.
  797. // IndexGet is only available for Elasticsearch 1.4 or later.
  798. func (c *Client) IndexGet() *IndicesGetService {
  799. builder := NewIndicesGetService(c)
  800. return builder
  801. }
  802. // IndexGetSettings retrieves settings about one or more indices.
  803. func (c *Client) IndexGetSettings() *IndicesGetSettingsService {
  804. builder := NewIndicesGetSettingsService(c)
  805. return builder
  806. }
  807. // Update a document.
  808. func (c *Client) Update() *UpdateService {
  809. builder := NewUpdateService(c)
  810. return builder
  811. }
  812. // Delete a document.
  813. func (c *Client) Delete() *DeleteService {
  814. builder := NewDeleteService(c)
  815. return builder
  816. }
  817. // DeleteByQuery deletes documents as found by a query.
  818. func (c *Client) DeleteByQuery() *DeleteByQueryService {
  819. builder := NewDeleteByQueryService(c)
  820. return builder
  821. }
  822. // Get a document.
  823. func (c *Client) Get() *GetService {
  824. builder := NewGetService(c)
  825. return builder
  826. }
  827. // MultiGet retrieves multiple documents in one roundtrip.
  828. func (c *Client) MultiGet() *MultiGetService {
  829. builder := NewMultiGetService(c)
  830. return builder
  831. }
  832. // Exists checks if a document exists.
  833. func (c *Client) Exists() *ExistsService {
  834. builder := NewExistsService(c)
  835. return builder
  836. }
  837. // Count documents.
  838. func (c *Client) Count(indices ...string) *CountService {
  839. builder := NewCountService(c)
  840. builder.Indices(indices...)
  841. return builder
  842. }
  843. // Search is the entry point for searches.
  844. func (c *Client) Search(indices ...string) *SearchService {
  845. builder := NewSearchService(c)
  846. builder.Indices(indices...)
  847. return builder
  848. }
  849. // Percolate allows to send a document and return matching queries.
  850. // See http://www.elastic.co/guide/en/elasticsearch/reference/current/search-percolate.html.
  851. func (c *Client) Percolate() *PercolateService {
  852. builder := NewPercolateService(c)
  853. return builder
  854. }
  855. // MultiSearch is the entry point for multi searches.
  856. func (c *Client) MultiSearch() *MultiSearchService {
  857. return NewMultiSearchService(c)
  858. }
  859. // Suggest returns a service to return suggestions.
  860. func (c *Client) Suggest(indices ...string) *SuggestService {
  861. builder := NewSuggestService(c)
  862. builder.Indices(indices...)
  863. return builder
  864. }
  865. // Scan through documents. Use this to iterate inside a server process
  866. // where the results will be processed without returning them to a client.
  867. func (c *Client) Scan(indices ...string) *ScanService {
  868. builder := NewScanService(c)
  869. builder.Indices(indices...)
  870. return builder
  871. }
  872. // Scroll through documents. Use this to efficiently scroll through results
  873. // while returning the results to a client. Use Scan when you don't need
  874. // to return requests to a client (i.e. not paginating via request/response).
  875. func (c *Client) Scroll(indices ...string) *ScrollService {
  876. builder := NewScrollService(c)
  877. builder.Indices(indices...)
  878. return builder
  879. }
  880. // ClearScroll can be used to clear search contexts manually.
  881. func (c *Client) ClearScroll() *ClearScrollService {
  882. builder := NewClearScrollService(c)
  883. return builder
  884. }
  885. // Optimize asks Elasticsearch to optimize one or more indices.
  886. func (c *Client) Optimize(indices ...string) *OptimizeService {
  887. builder := NewOptimizeService(c)
  888. builder.Indices(indices...)
  889. return builder
  890. }
  891. // Refresh asks Elasticsearch to refresh one or more indices.
  892. func (c *Client) Refresh(indices ...string) *RefreshService {
  893. builder := NewRefreshService(c)
  894. builder.Indices(indices...)
  895. return builder
  896. }
  897. // Flush asks Elasticsearch to free memory from the index and
  898. // flush data to disk.
  899. func (c *Client) Flush() *FlushService {
  900. builder := NewFlushService(c)
  901. return builder
  902. }
  903. // Explain computes a score explanation for a query and a specific document.
  904. func (c *Client) Explain(index, typ, id string) *ExplainService {
  905. builder := NewExplainService(c)
  906. builder = builder.Index(index).Type(typ).Id(id)
  907. return builder
  908. }
  909. // Bulk is the entry point to mass insert/update/delete documents.
  910. func (c *Client) Bulk() *BulkService {
  911. builder := NewBulkService(c)
  912. return builder
  913. }
  914. // Alias enables the caller to add and/or remove aliases.
  915. func (c *Client) Alias() *AliasService {
  916. builder := NewAliasService(c)
  917. return builder
  918. }
  919. // Aliases returns aliases by index name(s).
  920. func (c *Client) Aliases() *AliasesService {
  921. builder := NewAliasesService(c)
  922. return builder
  923. }
  924. // GetTemplate gets a search template.
  925. // Use IndexXXXTemplate funcs to manage index templates.
  926. func (c *Client) GetTemplate() *GetTemplateService {
  927. return NewGetTemplateService(c)
  928. }
  929. // PutTemplate creates or updates a search template.
  930. // Use IndexXXXTemplate funcs to manage index templates.
  931. func (c *Client) PutTemplate() *PutTemplateService {
  932. return NewPutTemplateService(c)
  933. }
  934. // DeleteTemplate deletes a search template.
  935. // Use IndexXXXTemplate funcs to manage index templates.
  936. func (c *Client) DeleteTemplate() *DeleteTemplateService {
  937. return NewDeleteTemplateService(c)
  938. }
  939. // IndexGetTemplate gets an index template.
  940. // Use XXXTemplate funcs to manage search templates.
  941. func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService {
  942. builder := NewIndicesGetTemplateService(c)
  943. builder = builder.Name(names...)
  944. return builder
  945. }
  946. // IndexTemplateExists gets check if an index template exists.
  947. // Use XXXTemplate funcs to manage search templates.
  948. func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService {
  949. builder := NewIndicesExistsTemplateService(c)
  950. builder = builder.Name(name)
  951. return builder
  952. }
  953. // IndexPutTemplate creates or updates an index template.
  954. // Use XXXTemplate funcs to manage search templates.
  955. func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService {
  956. builder := NewIndicesPutTemplateService(c)
  957. builder = builder.Name(name)
  958. return builder
  959. }
  960. // IndexDeleteTemplate deletes an index template.
  961. // Use XXXTemplate funcs to manage search templates.
  962. func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService {
  963. builder := NewIndicesDeleteTemplateService(c)
  964. builder = builder.Name(name)
  965. return builder
  966. }
  967. // GetMapping gets a mapping.
  968. func (c *Client) GetMapping() *GetMappingService {
  969. return NewGetMappingService(c)
  970. }
  971. // PutMapping registers a mapping.
  972. func (c *Client) PutMapping() *PutMappingService {
  973. return NewPutMappingService(c)
  974. }
  975. // DeleteMapping deletes a mapping.
  976. func (c *Client) DeleteMapping() *DeleteMappingService {
  977. return NewDeleteMappingService(c)
  978. }
  979. // ClusterHealth retrieves the health of the cluster.
  980. func (c *Client) ClusterHealth() *ClusterHealthService {
  981. return NewClusterHealthService(c)
  982. }
  983. // ClusterState retrieves the state of the cluster.
  984. func (c *Client) ClusterState() *ClusterStateService {
  985. return NewClusterStateService(c)
  986. }
  987. // NodesInfo retrieves one or more or all of the cluster nodes information.
  988. func (c *Client) NodesInfo() *NodesInfoService {
  989. return NewNodesInfoService(c)
  990. }
  991. // Reindex returns a service that will reindex documents from a source
  992. // index into a target index. See
  993. // http://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html
  994. // for more information about reindexing.
  995. func (c *Client) Reindex(sourceIndex, targetIndex string) *Reindexer {
  996. return NewReindexer(c, sourceIndex, targetIndex)
  997. }