client.go 53 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "fmt"
  10. "log"
  11. "net/http"
  12. "net/http/httputil"
  13. "net/url"
  14. "os"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. "github.com/pkg/errors"
  20. "gopkg.in/olivere/elastic.v5/config"
  21. )
  22. const (
  23. // Version is the current version of Elastic.
  24. Version = "5.0.48"
  25. // DefaultURL is the default endpoint of Elasticsearch on the local machine.
  26. // It is used e.g. when initializing a new Client without a specific URL.
  27. DefaultURL = "http://127.0.0.1:9200"
  28. // DefaultScheme is the default protocol scheme to use when sniffing
  29. // the Elasticsearch cluster.
  30. DefaultScheme = "http"
  31. // DefaultHealthcheckEnabled specifies if healthchecks are enabled by default.
  32. DefaultHealthcheckEnabled = true
  33. // DefaultHealthcheckTimeoutStartup is the time the healthcheck waits
  34. // for a response from Elasticsearch on startup, i.e. when creating a
  35. // client. After the client is started, a shorter timeout is commonly used
  36. // (its default is specified in DefaultHealthcheckTimeout).
  37. DefaultHealthcheckTimeoutStartup = 5 * time.Second
  38. // DefaultHealthcheckTimeout specifies the time a running client waits for
  39. // a response from Elasticsearch. Notice that the healthcheck timeout
  40. // when a client is created is larger by default (see DefaultHealthcheckTimeoutStartup).
  41. DefaultHealthcheckTimeout = 1 * time.Second
  42. // DefaultHealthcheckInterval is the default interval between
  43. // two health checks of the nodes in the cluster.
  44. DefaultHealthcheckInterval = 60 * time.Second
  45. // DefaultSnifferEnabled specifies if the sniffer is enabled by default.
  46. DefaultSnifferEnabled = true
  47. // DefaultSnifferInterval is the interval between two sniffing procedures,
  48. // i.e. the lookup of all nodes in the cluster and their addition/removal
  49. // from the list of actual connections.
  50. DefaultSnifferInterval = 15 * time.Minute
  51. // DefaultSnifferTimeoutStartup is the default timeout for the sniffing
  52. // process that is initiated while creating a new client. For subsequent
  53. // sniffing processes, DefaultSnifferTimeout is used (by default).
  54. DefaultSnifferTimeoutStartup = 5 * time.Second
  55. // DefaultSnifferTimeout is the default timeout after which the
  56. // sniffing process times out. Notice that for the initial sniffing
  57. // process, DefaultSnifferTimeoutStartup is used.
  58. DefaultSnifferTimeout = 2 * time.Second
  59. // DefaultSendGetBodyAs is the HTTP method to use when elastic is sending
  60. // a GET request with a body.
  61. DefaultSendGetBodyAs = "GET"
  62. // DefaultGzipEnabled specifies if gzip compression is enabled by default.
  63. DefaultGzipEnabled = false
  64. // off is used to disable timeouts.
  65. off = -1 * time.Second
  66. )
  67. var (
  68. // ErrNoClient is raised when no Elasticsearch node is available.
  69. ErrNoClient = errors.New("no Elasticsearch node available")
  70. // ErrRetry is raised when a request cannot be executed after the configured
  71. // number of retries.
  72. ErrRetry = errors.New("cannot connect after several retries")
  73. // ErrTimeout is raised when a request timed out, e.g. when WaitForStatus
  74. // didn't return in time.
  75. ErrTimeout = errors.New("timeout")
  76. // noRetries is a retrier that does not retry.
  77. noRetries = NewStopRetrier()
  78. )
  79. // ClientOptionFunc is a function that configures a Client.
  80. // It is used in NewClient.
  81. type ClientOptionFunc func(*Client) error
  82. // Client is an Elasticsearch client. Create one by calling NewClient.
  83. type Client struct {
  84. c *http.Client // net/http Client to use for requests
  85. connsMu sync.RWMutex // connsMu guards the next block
  86. conns []*conn // all connections
  87. cindex int // index into conns
  88. mu sync.RWMutex // guards the next block
  89. urls []string // set of URLs passed initially to the client
  90. running bool // true if the client's background processes are running
  91. errorlog Logger // error log for critical messages
  92. infolog Logger // information log for e.g. response times
  93. tracelog Logger // trace log for debugging
  94. scheme string // http or https
  95. healthcheckEnabled bool // healthchecks enabled or disabled
  96. healthcheckTimeoutStartup time.Duration // time the healthcheck waits for a response from Elasticsearch on startup
  97. healthcheckTimeout time.Duration // time the healthcheck waits for a response from Elasticsearch
  98. healthcheckInterval time.Duration // interval between healthchecks
  99. healthcheckStop chan bool // notify healthchecker to stop, and notify back
  100. snifferEnabled bool // sniffer enabled or disabled
  101. snifferTimeoutStartup time.Duration // time the sniffer waits for a response from nodes info API on startup
  102. snifferTimeout time.Duration // time the sniffer waits for a response from nodes info API
  103. snifferInterval time.Duration // interval between sniffing
  104. snifferCallback SnifferCallback // callback to modify the sniffing decision
  105. snifferStop chan bool // notify sniffer to stop, and notify back
  106. decoder Decoder // used to decode data sent from Elasticsearch
  107. basicAuth bool // indicates whether to send HTTP Basic Auth credentials
  108. basicAuthUsername string // username for HTTP Basic Auth
  109. basicAuthPassword string // password for HTTP Basic Auth
  110. sendGetBodyAs string // override for when sending a GET with a body
  111. requiredPlugins []string // list of required plugins
  112. gzipEnabled bool // gzip compression enabled or disabled (default)
  113. retrier Retrier // strategy for retries
  114. }
  115. // NewClient creates a new client to work with Elasticsearch.
  116. //
  117. // NewClient, by default, is meant to be long-lived and shared across
  118. // your application. If you need a short-lived client, e.g. for request-scope,
  119. // consider using NewSimpleClient instead.
  120. //
  121. // The caller can configure the new client by passing configuration options
  122. // to the func.
  123. //
  124. // Example:
  125. //
  126. // client, err := elastic.NewClient(
  127. // elastic.SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"),
  128. // elastic.SetBasicAuth("user", "secret"))
  129. //
  130. // If no URL is configured, Elastic uses DefaultURL by default.
  131. //
  132. // If the sniffer is enabled (the default), the new client then sniffes
  133. // the cluster via the Nodes Info API
  134. // (see https://www.elastic.co/guide/en/elasticsearch/reference/5.2/cluster-nodes-info.html#cluster-nodes-info).
  135. // It uses the URLs specified by the caller. The caller is responsible
  136. // to only pass a list of URLs of nodes that belong to the same cluster.
  137. // This sniffing process is run on startup and periodically.
  138. // Use SnifferInterval to set the interval between two sniffs (default is
  139. // 15 minutes). In other words: By default, the client will find new nodes
  140. // in the cluster and remove those that are no longer available every
  141. // 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient.
  142. //
  143. // The list of nodes found in the sniffing process will be used to make
  144. // connections to the REST API of Elasticsearch. These nodes are also
  145. // periodically checked in a shorter time frame. This process is called
  146. // a health check. By default, a health check is done every 60 seconds.
  147. // You can set a shorter or longer interval by SetHealthcheckInterval.
  148. // Disabling health checks is not recommended, but can be done by
  149. // SetHealthcheck(false).
  150. //
  151. // Connections are automatically marked as dead or healthy while
  152. // making requests to Elasticsearch. When a request fails, Elastic will
  153. // call into the Retry strategy which can be specified with SetRetry.
  154. // The Retry strategy is also responsible for handling backoff i.e. the time
  155. // to wait before starting the next request. There are various standard
  156. // backoff implementations, e.g. ExponentialBackoff or SimpleBackoff.
  157. // Retries are disabled by default.
  158. //
  159. // If no HttpClient is configured, then http.DefaultClient is used.
  160. // You can use your own http.Client with some http.Transport for
  161. // advanced scenarios.
  162. //
  163. // An error is also returned when some configuration option is invalid or
  164. // the new client cannot sniff the cluster (if enabled).
  165. func NewClient(options ...ClientOptionFunc) (*Client, error) {
  166. // Set up the client
  167. c := &Client{
  168. c: http.DefaultClient,
  169. conns: make([]*conn, 0),
  170. cindex: -1,
  171. scheme: DefaultScheme,
  172. decoder: &DefaultDecoder{},
  173. healthcheckEnabled: DefaultHealthcheckEnabled,
  174. healthcheckTimeoutStartup: DefaultHealthcheckTimeoutStartup,
  175. healthcheckTimeout: DefaultHealthcheckTimeout,
  176. healthcheckInterval: DefaultHealthcheckInterval,
  177. healthcheckStop: make(chan bool),
  178. snifferEnabled: DefaultSnifferEnabled,
  179. snifferTimeoutStartup: DefaultSnifferTimeoutStartup,
  180. snifferTimeout: DefaultSnifferTimeout,
  181. snifferInterval: DefaultSnifferInterval,
  182. snifferCallback: nopSnifferCallback,
  183. snifferStop: make(chan bool),
  184. sendGetBodyAs: DefaultSendGetBodyAs,
  185. gzipEnabled: DefaultGzipEnabled,
  186. retrier: noRetries, // no retries by default
  187. }
  188. // Run the options on it
  189. for _, option := range options {
  190. if err := option(c); err != nil {
  191. return nil, err
  192. }
  193. }
  194. // Use a default URL and normalize them
  195. if len(c.urls) == 0 {
  196. c.urls = []string{DefaultURL}
  197. }
  198. c.urls = canonicalize(c.urls...)
  199. // If the URLs have auth info, use them here as an alternative to SetBasicAuth
  200. if !c.basicAuth {
  201. for _, urlStr := range c.urls {
  202. u, err := url.Parse(urlStr)
  203. if err == nil && u.User != nil {
  204. c.basicAuth = true
  205. c.basicAuthUsername = u.User.Username()
  206. c.basicAuthPassword, _ = u.User.Password()
  207. break
  208. }
  209. }
  210. }
  211. // Check if we can make a request to any of the specified URLs
  212. if c.healthcheckEnabled {
  213. if err := c.startupHealthcheck(c.healthcheckTimeoutStartup); err != nil {
  214. return nil, err
  215. }
  216. }
  217. if c.snifferEnabled {
  218. // Sniff the cluster initially
  219. if err := c.sniff(c.snifferTimeoutStartup); err != nil {
  220. return nil, err
  221. }
  222. } else {
  223. // Do not sniff the cluster initially. Use the provided URLs instead.
  224. for _, url := range c.urls {
  225. c.conns = append(c.conns, newConn(url, url))
  226. }
  227. }
  228. if c.healthcheckEnabled {
  229. // Perform an initial health check
  230. c.healthcheck(c.healthcheckTimeoutStartup, true)
  231. }
  232. // Ensure that we have at least one connection available
  233. if err := c.mustActiveConn(); err != nil {
  234. return nil, err
  235. }
  236. // Check the required plugins
  237. for _, plugin := range c.requiredPlugins {
  238. found, err := c.HasPlugin(plugin)
  239. if err != nil {
  240. return nil, err
  241. }
  242. if !found {
  243. return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
  244. }
  245. }
  246. if c.snifferEnabled {
  247. go c.sniffer() // periodically update cluster information
  248. }
  249. if c.healthcheckEnabled {
  250. go c.healthchecker() // start goroutine periodically ping all nodes of the cluster
  251. }
  252. c.mu.Lock()
  253. c.running = true
  254. c.mu.Unlock()
  255. return c, nil
  256. }
  257. // NewClientFromConfig initializes a client from a configuration.
  258. func NewClientFromConfig(cfg *config.Config) (*Client, error) {
  259. var options []ClientOptionFunc
  260. if cfg != nil {
  261. if cfg.URL != "" {
  262. options = append(options, SetURL(cfg.URL))
  263. }
  264. if cfg.Errorlog != "" {
  265. f, err := os.OpenFile(cfg.Errorlog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  266. if err != nil {
  267. return nil, errors.Wrap(err, "unable to initialize error log")
  268. }
  269. l := log.New(f, "", 0)
  270. options = append(options, SetErrorLog(l))
  271. }
  272. if cfg.Tracelog != "" {
  273. f, err := os.OpenFile(cfg.Tracelog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  274. if err != nil {
  275. return nil, errors.Wrap(err, "unable to initialize trace log")
  276. }
  277. l := log.New(f, "", 0)
  278. options = append(options, SetTraceLog(l))
  279. }
  280. if cfg.Infolog != "" {
  281. f, err := os.OpenFile(cfg.Infolog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  282. if err != nil {
  283. return nil, errors.Wrap(err, "unable to initialize info log")
  284. }
  285. l := log.New(f, "", 0)
  286. options = append(options, SetInfoLog(l))
  287. }
  288. if cfg.Username != "" || cfg.Password != "" {
  289. options = append(options, SetBasicAuth(cfg.Username, cfg.Password))
  290. }
  291. if cfg.Sniff != nil {
  292. options = append(options, SetSniff(*cfg.Sniff))
  293. }
  294. }
  295. return NewClient(options...)
  296. }
  297. // NewSimpleClient creates a new short-lived Client that can be used in
  298. // use cases where you need e.g. one client per request.
  299. //
  300. // While NewClient by default sets up e.g. periodic health checks
  301. // and sniffing for new nodes in separate goroutines, NewSimpleClient does
  302. // not and is meant as a simple replacement where you don't need all the
  303. // heavy lifting of NewClient.
  304. //
  305. // NewSimpleClient does the following by default: First, all health checks
  306. // are disabled, including timeouts and periodic checks. Second, sniffing
  307. // is disabled, including timeouts and periodic checks. The number of retries
  308. // is set to 1. NewSimpleClient also does not start any goroutines.
  309. //
  310. // Notice that you can still override settings by passing additional options,
  311. // just like with NewClient.
  312. func NewSimpleClient(options ...ClientOptionFunc) (*Client, error) {
  313. c := &Client{
  314. c: http.DefaultClient,
  315. conns: make([]*conn, 0),
  316. cindex: -1,
  317. scheme: DefaultScheme,
  318. decoder: &DefaultDecoder{},
  319. healthcheckEnabled: false,
  320. healthcheckTimeoutStartup: off,
  321. healthcheckTimeout: off,
  322. healthcheckInterval: off,
  323. healthcheckStop: make(chan bool),
  324. snifferEnabled: false,
  325. snifferTimeoutStartup: off,
  326. snifferTimeout: off,
  327. snifferInterval: off,
  328. snifferCallback: nopSnifferCallback,
  329. snifferStop: make(chan bool),
  330. sendGetBodyAs: DefaultSendGetBodyAs,
  331. gzipEnabled: DefaultGzipEnabled,
  332. retrier: noRetries, // no retries by default
  333. }
  334. // Run the options on it
  335. for _, option := range options {
  336. if err := option(c); err != nil {
  337. return nil, err
  338. }
  339. }
  340. // Use a default URL and normalize them
  341. if len(c.urls) == 0 {
  342. c.urls = []string{DefaultURL}
  343. }
  344. c.urls = canonicalize(c.urls...)
  345. // If the URLs have auth info, use them here as an alternative to SetBasicAuth
  346. if !c.basicAuth {
  347. for _, urlStr := range c.urls {
  348. u, err := url.Parse(urlStr)
  349. if err == nil && u.User != nil {
  350. c.basicAuth = true
  351. c.basicAuthUsername = u.User.Username()
  352. c.basicAuthPassword, _ = u.User.Password()
  353. break
  354. }
  355. }
  356. }
  357. for _, url := range c.urls {
  358. c.conns = append(c.conns, newConn(url, url))
  359. }
  360. // Ensure that we have at least one connection available
  361. if err := c.mustActiveConn(); err != nil {
  362. return nil, err
  363. }
  364. // Check the required plugins
  365. for _, plugin := range c.requiredPlugins {
  366. found, err := c.HasPlugin(plugin)
  367. if err != nil {
  368. return nil, err
  369. }
  370. if !found {
  371. return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
  372. }
  373. }
  374. c.mu.Lock()
  375. c.running = true
  376. c.mu.Unlock()
  377. return c, nil
  378. }
  379. // SetHttpClient can be used to specify the http.Client to use when making
  380. // HTTP requests to Elasticsearch.
  381. func SetHttpClient(httpClient *http.Client) ClientOptionFunc {
  382. return func(c *Client) error {
  383. if httpClient != nil {
  384. c.c = httpClient
  385. } else {
  386. c.c = http.DefaultClient
  387. }
  388. return nil
  389. }
  390. }
  391. // SetBasicAuth can be used to specify the HTTP Basic Auth credentials to
  392. // use when making HTTP requests to Elasticsearch.
  393. func SetBasicAuth(username, password string) ClientOptionFunc {
  394. return func(c *Client) error {
  395. c.basicAuthUsername = username
  396. c.basicAuthPassword = password
  397. c.basicAuth = c.basicAuthUsername != "" || c.basicAuthPassword != ""
  398. return nil
  399. }
  400. }
  401. // SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that
  402. // when sniffing is enabled, these URLs are used to initially sniff the
  403. // cluster on startup.
  404. func SetURL(urls ...string) ClientOptionFunc {
  405. return func(c *Client) error {
  406. switch len(urls) {
  407. case 0:
  408. c.urls = []string{DefaultURL}
  409. default:
  410. c.urls = urls
  411. }
  412. return nil
  413. }
  414. }
  415. // SetScheme sets the HTTP scheme to look for when sniffing (http or https).
  416. // This is http by default.
  417. func SetScheme(scheme string) ClientOptionFunc {
  418. return func(c *Client) error {
  419. c.scheme = scheme
  420. return nil
  421. }
  422. }
  423. // SetSniff enables or disables the sniffer (enabled by default).
  424. func SetSniff(enabled bool) ClientOptionFunc {
  425. return func(c *Client) error {
  426. c.snifferEnabled = enabled
  427. return nil
  428. }
  429. }
  430. // SetSnifferTimeoutStartup sets the timeout for the sniffer that is used
  431. // when creating a new client. The default is 5 seconds. Notice that the
  432. // timeout being used for subsequent sniffing processes is set with
  433. // SetSnifferTimeout.
  434. func SetSnifferTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  435. return func(c *Client) error {
  436. c.snifferTimeoutStartup = timeout
  437. return nil
  438. }
  439. }
  440. // SetSnifferTimeout sets the timeout for the sniffer that finds the
  441. // nodes in a cluster. The default is 2 seconds. Notice that the timeout
  442. // used when creating a new client on startup is usually greater and can
  443. // be set with SetSnifferTimeoutStartup.
  444. func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc {
  445. return func(c *Client) error {
  446. c.snifferTimeout = timeout
  447. return nil
  448. }
  449. }
  450. // SetSnifferInterval sets the interval between two sniffing processes.
  451. // The default interval is 15 minutes.
  452. func SetSnifferInterval(interval time.Duration) ClientOptionFunc {
  453. return func(c *Client) error {
  454. c.snifferInterval = interval
  455. return nil
  456. }
  457. }
  458. // SnifferCallback defines the protocol for sniffing decisions.
  459. type SnifferCallback func(*NodesInfoNode) bool
  460. // nopSnifferCallback is the default sniffer callback: It accepts
  461. // all nodes the sniffer finds.
  462. var nopSnifferCallback = func(*NodesInfoNode) bool { return true }
  463. // SetSnifferCallback allows the caller to modify sniffer decisions.
  464. // When setting the callback, the given SnifferCallback is called for
  465. // each (healthy) node found during the sniffing process.
  466. // If the callback returns false, the node is ignored: No requests
  467. // are routed to it.
  468. func SetSnifferCallback(f SnifferCallback) ClientOptionFunc {
  469. return func(c *Client) error {
  470. if f != nil {
  471. c.snifferCallback = f
  472. }
  473. return nil
  474. }
  475. }
  476. // SetHealthcheck enables or disables healthchecks (enabled by default).
  477. func SetHealthcheck(enabled bool) ClientOptionFunc {
  478. return func(c *Client) error {
  479. c.healthcheckEnabled = enabled
  480. return nil
  481. }
  482. }
  483. // SetHealthcheckTimeoutStartup sets the timeout for the initial health check.
  484. // The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup).
  485. // Notice that timeouts for subsequent health checks can be modified with
  486. // SetHealthcheckTimeout.
  487. func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  488. return func(c *Client) error {
  489. c.healthcheckTimeoutStartup = timeout
  490. return nil
  491. }
  492. }
  493. // SetHealthcheckTimeout sets the timeout for periodic health checks.
  494. // The default timeout is 1 second (see DefaultHealthcheckTimeout).
  495. // Notice that a different (usually larger) timeout is used for the initial
  496. // healthcheck, which is initiated while creating a new client.
  497. // The startup timeout can be modified with SetHealthcheckTimeoutStartup.
  498. func SetHealthcheckTimeout(timeout time.Duration) ClientOptionFunc {
  499. return func(c *Client) error {
  500. c.healthcheckTimeout = timeout
  501. return nil
  502. }
  503. }
  504. // SetHealthcheckInterval sets the interval between two health checks.
  505. // The default interval is 60 seconds.
  506. func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc {
  507. return func(c *Client) error {
  508. c.healthcheckInterval = interval
  509. return nil
  510. }
  511. }
  512. // SetMaxRetries sets the maximum number of retries before giving up when
  513. // performing a HTTP request to Elasticsearch.
  514. //
  515. // Deprecated: Replace with a Retry implementation.
  516. func SetMaxRetries(maxRetries int) ClientOptionFunc {
  517. return func(c *Client) error {
  518. if maxRetries < 0 {
  519. return errors.New("MaxRetries must be greater than or equal to 0")
  520. } else if maxRetries == 0 {
  521. c.retrier = noRetries
  522. } else {
  523. // Create a Retrier that will wait for 100ms (+/- jitter) between requests.
  524. // This resembles the old behavior with maxRetries.
  525. ticks := make([]int, maxRetries)
  526. for i := 0; i < len(ticks); i++ {
  527. ticks[i] = 100
  528. }
  529. backoff := NewSimpleBackoff(ticks...)
  530. c.retrier = NewBackoffRetrier(backoff)
  531. }
  532. return nil
  533. }
  534. }
  535. // SetGzip enables or disables gzip compression (disabled by default).
  536. func SetGzip(enabled bool) ClientOptionFunc {
  537. return func(c *Client) error {
  538. c.gzipEnabled = enabled
  539. return nil
  540. }
  541. }
  542. // SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
  543. // DefaultDecoder is used by default.
  544. func SetDecoder(decoder Decoder) ClientOptionFunc {
  545. return func(c *Client) error {
  546. if decoder != nil {
  547. c.decoder = decoder
  548. } else {
  549. c.decoder = &DefaultDecoder{}
  550. }
  551. return nil
  552. }
  553. }
  554. // SetRequiredPlugins can be used to indicate that some plugins are required
  555. // before a Client will be created.
  556. func SetRequiredPlugins(plugins ...string) ClientOptionFunc {
  557. return func(c *Client) error {
  558. if c.requiredPlugins == nil {
  559. c.requiredPlugins = make([]string, 0)
  560. }
  561. c.requiredPlugins = append(c.requiredPlugins, plugins...)
  562. return nil
  563. }
  564. }
  565. // SetErrorLog sets the logger for critical messages like nodes joining
  566. // or leaving the cluster or failing requests. It is nil by default.
  567. func SetErrorLog(logger Logger) ClientOptionFunc {
  568. return func(c *Client) error {
  569. c.errorlog = logger
  570. return nil
  571. }
  572. }
  573. // SetInfoLog sets the logger for informational messages, e.g. requests
  574. // and their response times. It is nil by default.
  575. func SetInfoLog(logger Logger) ClientOptionFunc {
  576. return func(c *Client) error {
  577. c.infolog = logger
  578. return nil
  579. }
  580. }
  581. // SetTraceLog specifies the log.Logger to use for output of HTTP requests
  582. // and responses which is helpful during debugging. It is nil by default.
  583. func SetTraceLog(logger Logger) ClientOptionFunc {
  584. return func(c *Client) error {
  585. c.tracelog = logger
  586. return nil
  587. }
  588. }
  589. // SetSendGetBodyAs specifies the HTTP method to use when sending a GET request
  590. // with a body. It is GET by default.
  591. func SetSendGetBodyAs(httpMethod string) ClientOptionFunc {
  592. return func(c *Client) error {
  593. c.sendGetBodyAs = httpMethod
  594. return nil
  595. }
  596. }
  597. // SetRetrier specifies the retry strategy that handles errors during
  598. // HTTP request/response with Elasticsearch.
  599. func SetRetrier(retrier Retrier) ClientOptionFunc {
  600. return func(c *Client) error {
  601. if retrier == nil {
  602. retrier = noRetries // no retries by default
  603. }
  604. c.retrier = retrier
  605. return nil
  606. }
  607. }
  608. // String returns a string representation of the client status.
  609. func (c *Client) String() string {
  610. c.connsMu.Lock()
  611. conns := c.conns
  612. c.connsMu.Unlock()
  613. var buf bytes.Buffer
  614. for i, conn := range conns {
  615. if i > 0 {
  616. buf.WriteString(", ")
  617. }
  618. buf.WriteString(conn.String())
  619. }
  620. return buf.String()
  621. }
  622. // IsRunning returns true if the background processes of the client are
  623. // running, false otherwise.
  624. func (c *Client) IsRunning() bool {
  625. c.mu.RLock()
  626. defer c.mu.RUnlock()
  627. return c.running
  628. }
  629. // Start starts the background processes like sniffing the cluster and
  630. // periodic health checks. You don't need to run Start when creating a
  631. // client with NewClient; the background processes are run by default.
  632. //
  633. // If the background processes are already running, this is a no-op.
  634. func (c *Client) Start() {
  635. c.mu.RLock()
  636. if c.running {
  637. c.mu.RUnlock()
  638. return
  639. }
  640. c.mu.RUnlock()
  641. if c.snifferEnabled {
  642. go c.sniffer()
  643. }
  644. if c.healthcheckEnabled {
  645. go c.healthchecker()
  646. }
  647. c.mu.Lock()
  648. c.running = true
  649. c.mu.Unlock()
  650. c.infof("elastic: client started")
  651. }
  652. // Stop stops the background processes that the client is running,
  653. // i.e. sniffing the cluster periodically and running health checks
  654. // on the nodes.
  655. //
  656. // If the background processes are not running, this is a no-op.
  657. func (c *Client) Stop() {
  658. c.mu.RLock()
  659. if !c.running {
  660. c.mu.RUnlock()
  661. return
  662. }
  663. c.mu.RUnlock()
  664. if c.healthcheckEnabled {
  665. c.healthcheckStop <- true
  666. <-c.healthcheckStop
  667. }
  668. if c.snifferEnabled {
  669. c.snifferStop <- true
  670. <-c.snifferStop
  671. }
  672. c.mu.Lock()
  673. c.running = false
  674. c.mu.Unlock()
  675. c.infof("elastic: client stopped")
  676. }
  677. // errorf logs to the error log.
  678. func (c *Client) errorf(format string, args ...interface{}) {
  679. if c.errorlog != nil {
  680. c.errorlog.Printf(format, args...)
  681. }
  682. }
  683. // infof logs informational messages.
  684. func (c *Client) infof(format string, args ...interface{}) {
  685. if c.infolog != nil {
  686. c.infolog.Printf(format, args...)
  687. }
  688. }
  689. // tracef logs to the trace log.
  690. func (c *Client) tracef(format string, args ...interface{}) {
  691. if c.tracelog != nil {
  692. c.tracelog.Printf(format, args...)
  693. }
  694. }
  695. // dumpRequest dumps the given HTTP request to the trace log.
  696. func (c *Client) dumpRequest(r *http.Request) {
  697. if c.tracelog != nil {
  698. out, err := httputil.DumpRequestOut(r, true)
  699. if err == nil {
  700. c.tracef("%s\n", string(out))
  701. }
  702. }
  703. }
  704. // dumpResponse dumps the given HTTP response to the trace log.
  705. func (c *Client) dumpResponse(resp *http.Response) {
  706. if c.tracelog != nil {
  707. out, err := httputil.DumpResponse(resp, true)
  708. if err == nil {
  709. c.tracef("%s\n", string(out))
  710. }
  711. }
  712. }
  713. // sniffer periodically runs sniff.
  714. func (c *Client) sniffer() {
  715. c.mu.RLock()
  716. timeout := c.snifferTimeout
  717. interval := c.snifferInterval
  718. c.mu.RUnlock()
  719. ticker := time.NewTicker(interval)
  720. defer ticker.Stop()
  721. for {
  722. select {
  723. case <-c.snifferStop:
  724. // we are asked to stop, so we signal back that we're stopping now
  725. c.snifferStop <- true
  726. return
  727. case <-ticker.C:
  728. c.sniff(timeout)
  729. }
  730. }
  731. }
  732. // sniff uses the Node Info API to return the list of nodes in the cluster.
  733. // It uses the list of URLs passed on startup plus the list of URLs found
  734. // by the preceding sniffing process (if sniffing is enabled).
  735. //
  736. // If sniffing is disabled, this is a no-op.
  737. func (c *Client) sniff(timeout time.Duration) error {
  738. c.mu.RLock()
  739. if !c.snifferEnabled {
  740. c.mu.RUnlock()
  741. return nil
  742. }
  743. // Use all available URLs provided to sniff the cluster.
  744. var urls []string
  745. urlsMap := make(map[string]bool)
  746. // Add all URLs provided on startup
  747. for _, url := range c.urls {
  748. urlsMap[url] = true
  749. urls = append(urls, url)
  750. }
  751. c.mu.RUnlock()
  752. // Add all URLs found by sniffing
  753. c.connsMu.RLock()
  754. for _, conn := range c.conns {
  755. if !conn.IsDead() {
  756. url := conn.URL()
  757. if _, found := urlsMap[url]; !found {
  758. urls = append(urls, url)
  759. }
  760. }
  761. }
  762. c.connsMu.RUnlock()
  763. if len(urls) == 0 {
  764. return errors.Wrap(ErrNoClient, "no URLs found")
  765. }
  766. // Start sniffing on all found URLs
  767. ch := make(chan []*conn, len(urls))
  768. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  769. defer cancel()
  770. for _, url := range urls {
  771. go func(url string) { ch <- c.sniffNode(ctx, url) }(url)
  772. }
  773. // Wait for the results to come back, or the process times out.
  774. for {
  775. select {
  776. case conns := <-ch:
  777. if len(conns) > 0 {
  778. c.updateConns(conns)
  779. return nil
  780. }
  781. case <-ctx.Done():
  782. // We get here if no cluster responds in time
  783. return errors.Wrap(ErrNoClient, "sniff timeout")
  784. }
  785. }
  786. }
  787. // sniffNode sniffs a single node. This method is run as a goroutine
  788. // in sniff. If successful, it returns the list of node URLs extracted
  789. // from the result of calling Nodes Info API. Otherwise, an empty array
  790. // is returned.
  791. func (c *Client) sniffNode(ctx context.Context, url string) []*conn {
  792. var nodes []*conn
  793. // Call the Nodes Info API at /_nodes/http
  794. req, err := NewRequest("GET", url+"/_nodes/http")
  795. if err != nil {
  796. return nodes
  797. }
  798. c.mu.RLock()
  799. if c.basicAuth {
  800. req.SetBasicAuth(c.basicAuthUsername, c.basicAuthPassword)
  801. }
  802. c.mu.RUnlock()
  803. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  804. if err != nil {
  805. return nodes
  806. }
  807. if res == nil {
  808. return nodes
  809. }
  810. if res.Body != nil {
  811. defer res.Body.Close()
  812. }
  813. var info NodesInfoResponse
  814. if err := json.NewDecoder(res.Body).Decode(&info); err == nil {
  815. if len(info.Nodes) > 0 {
  816. for nodeID, node := range info.Nodes {
  817. if c.snifferCallback(node) {
  818. if node.HTTP != nil && len(node.HTTP.PublishAddress) > 0 {
  819. url := c.extractHostname(c.scheme, node.HTTP.PublishAddress)
  820. if url != "" {
  821. nodes = append(nodes, newConn(nodeID, url))
  822. }
  823. }
  824. }
  825. }
  826. }
  827. }
  828. return nodes
  829. }
  830. // reSniffHostAndPort is used to extract hostname and port from a result
  831. // from a Nodes Info API (example: "inet[/127.0.0.1:9200]").
  832. var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
  833. func (c *Client) extractHostname(scheme, address string) string {
  834. if strings.HasPrefix(address, "inet") {
  835. m := reSniffHostAndPort.FindStringSubmatch(address)
  836. if len(m) == 3 {
  837. return fmt.Sprintf("%s://%s:%s", scheme, m[1], m[2])
  838. }
  839. }
  840. s := address
  841. if idx := strings.Index(s, "/"); idx >= 0 {
  842. s = s[idx+1:]
  843. }
  844. if strings.Index(s, ":") < 0 {
  845. return ""
  846. }
  847. return fmt.Sprintf("%s://%s", scheme, s)
  848. }
  849. // updateConns updates the clients' connections with new information
  850. // gather by a sniff operation.
  851. func (c *Client) updateConns(conns []*conn) {
  852. c.connsMu.Lock()
  853. // Build up new connections:
  854. // If we find an existing connection, use that (including no. of failures etc.).
  855. // If we find a new connection, add it.
  856. var newConns []*conn
  857. for _, conn := range conns {
  858. var found bool
  859. for _, oldConn := range c.conns {
  860. if oldConn.NodeID() == conn.NodeID() {
  861. // Take over the old connection
  862. newConns = append(newConns, oldConn)
  863. found = true
  864. break
  865. }
  866. }
  867. if !found {
  868. // New connection didn't exist, so add it to our list of new conns.
  869. c.infof("elastic: %s joined the cluster", conn.URL())
  870. newConns = append(newConns, conn)
  871. }
  872. }
  873. c.conns = newConns
  874. c.cindex = -1
  875. c.connsMu.Unlock()
  876. }
  877. // healthchecker periodically runs healthcheck.
  878. func (c *Client) healthchecker() {
  879. c.mu.RLock()
  880. timeout := c.healthcheckTimeout
  881. interval := c.healthcheckInterval
  882. c.mu.RUnlock()
  883. ticker := time.NewTicker(interval)
  884. defer ticker.Stop()
  885. for {
  886. select {
  887. case <-c.healthcheckStop:
  888. // we are asked to stop, so we signal back that we're stopping now
  889. c.healthcheckStop <- true
  890. return
  891. case <-ticker.C:
  892. c.healthcheck(timeout, false)
  893. }
  894. }
  895. }
  896. // healthcheck does a health check on all nodes in the cluster. Depending on
  897. // the node state, it marks connections as dead, sets them alive etc.
  898. // If healthchecks are disabled and force is false, this is a no-op.
  899. // The timeout specifies how long to wait for a response from Elasticsearch.
  900. func (c *Client) healthcheck(timeout time.Duration, force bool) {
  901. c.mu.RLock()
  902. if !c.healthcheckEnabled && !force {
  903. c.mu.RUnlock()
  904. return
  905. }
  906. basicAuth := c.basicAuth
  907. basicAuthUsername := c.basicAuthUsername
  908. basicAuthPassword := c.basicAuthPassword
  909. c.mu.RUnlock()
  910. c.connsMu.RLock()
  911. conns := c.conns
  912. c.connsMu.RUnlock()
  913. for _, conn := range conns {
  914. // Run the HEAD request against ES with a timeout
  915. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  916. defer cancel()
  917. // Goroutine executes the HTTP request, returns an error and sets status
  918. var status int
  919. errc := make(chan error, 1)
  920. go func(url string) {
  921. req, err := NewRequest("HEAD", url)
  922. if err != nil {
  923. errc <- err
  924. return
  925. }
  926. if basicAuth {
  927. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  928. }
  929. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  930. if res != nil {
  931. status = res.StatusCode
  932. if res.Body != nil {
  933. res.Body.Close()
  934. }
  935. }
  936. errc <- err
  937. }(conn.URL())
  938. // Wait for the Goroutine (or its timeout)
  939. select {
  940. case <-ctx.Done(): // timeout
  941. c.errorf("elastic: %s is dead", conn.URL())
  942. conn.MarkAsDead()
  943. case err := <-errc:
  944. if err != nil {
  945. c.errorf("elastic: %s is dead", conn.URL())
  946. conn.MarkAsDead()
  947. break
  948. }
  949. if status >= 200 && status < 300 {
  950. conn.MarkAsAlive()
  951. } else {
  952. conn.MarkAsDead()
  953. c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
  954. }
  955. }
  956. }
  957. }
  958. // startupHealthcheck is used at startup to check if the server is available
  959. // at all.
  960. func (c *Client) startupHealthcheck(timeout time.Duration) error {
  961. c.mu.Lock()
  962. urls := c.urls
  963. basicAuth := c.basicAuth
  964. basicAuthUsername := c.basicAuthUsername
  965. basicAuthPassword := c.basicAuthPassword
  966. c.mu.Unlock()
  967. // If we don't get a connection after "timeout", we bail.
  968. start := time.Now()
  969. for {
  970. // Make a copy of the HTTP client provided via options to respect
  971. // settings like Basic Auth or a user-specified http.Transport.
  972. cl := new(http.Client)
  973. *cl = *c.c
  974. cl.Timeout = timeout
  975. for _, url := range urls {
  976. req, err := http.NewRequest("HEAD", url, nil)
  977. if err != nil {
  978. return err
  979. }
  980. if basicAuth {
  981. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  982. }
  983. res, err := cl.Do(req)
  984. if err == nil && res != nil && res.StatusCode >= 200 && res.StatusCode < 300 {
  985. return nil
  986. }
  987. }
  988. time.Sleep(1 * time.Second)
  989. if time.Now().Sub(start) > timeout {
  990. break
  991. }
  992. }
  993. return errors.Wrap(ErrNoClient, "health check timeout")
  994. }
  995. // next returns the next available connection, or ErrNoClient.
  996. func (c *Client) next() (*conn, error) {
  997. // We do round-robin here.
  998. // TODO(oe) This should be a pluggable strategy, like the Selector in the official clients.
  999. c.connsMu.Lock()
  1000. defer c.connsMu.Unlock()
  1001. i := 0
  1002. numConns := len(c.conns)
  1003. for {
  1004. i++
  1005. if i > numConns {
  1006. break // we visited all conns: they all seem to be dead
  1007. }
  1008. c.cindex++
  1009. if c.cindex >= numConns {
  1010. c.cindex = 0
  1011. }
  1012. conn := c.conns[c.cindex]
  1013. if !conn.IsDead() {
  1014. return conn, nil
  1015. }
  1016. }
  1017. // We have a deadlock here: All nodes are marked as dead.
  1018. // If sniffing is disabled, connections will never be marked alive again.
  1019. // So we are marking them as alive--if sniffing is disabled.
  1020. // They'll then be picked up in the next call to PerformRequest.
  1021. if !c.snifferEnabled {
  1022. c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
  1023. for _, conn := range c.conns {
  1024. conn.MarkAsAlive()
  1025. }
  1026. }
  1027. // We tried hard, but there is no node available
  1028. return nil, errors.Wrap(ErrNoClient, "no available connection")
  1029. }
  1030. // mustActiveConn returns nil if there is an active connection,
  1031. // otherwise ErrNoClient is returned.
  1032. func (c *Client) mustActiveConn() error {
  1033. c.connsMu.Lock()
  1034. defer c.connsMu.Unlock()
  1035. for _, c := range c.conns {
  1036. if !c.IsDead() {
  1037. return nil
  1038. }
  1039. }
  1040. return errors.Wrap(ErrNoClient, "no active connection found")
  1041. }
  1042. // PerformRequest does a HTTP request to Elasticsearch.
  1043. // It returns a response (which might be nil) and an error on failure.
  1044. //
  1045. // Optionally, a list of HTTP error codes to ignore can be passed.
  1046. // This is necessary for services that expect e.g. HTTP status 404 as a
  1047. // valid outcome (Exists, IndicesExists, IndicesTypeExists).
  1048. func (c *Client) PerformRequest(ctx context.Context, method, path string, params url.Values, body interface{}, ignoreErrors ...int) (*Response, error) {
  1049. start := time.Now().UTC()
  1050. c.mu.RLock()
  1051. timeout := c.healthcheckTimeout
  1052. basicAuth := c.basicAuth
  1053. basicAuthUsername := c.basicAuthUsername
  1054. basicAuthPassword := c.basicAuthPassword
  1055. sendGetBodyAs := c.sendGetBodyAs
  1056. gzipEnabled := c.gzipEnabled
  1057. c.mu.RUnlock()
  1058. var err error
  1059. var conn *conn
  1060. var req *Request
  1061. var resp *Response
  1062. var retried bool
  1063. var n int
  1064. // Change method if sendGetBodyAs is specified.
  1065. if method == "GET" && body != nil && sendGetBodyAs != "GET" {
  1066. method = sendGetBodyAs
  1067. }
  1068. for {
  1069. pathWithParams := path
  1070. if len(params) > 0 {
  1071. pathWithParams += "?" + params.Encode()
  1072. }
  1073. // Get a connection
  1074. conn, err = c.next()
  1075. if errors.Cause(err) == ErrNoClient {
  1076. n++
  1077. if !retried {
  1078. // Force a healtcheck as all connections seem to be dead.
  1079. c.healthcheck(timeout, false)
  1080. }
  1081. wait, ok, rerr := c.retrier.Retry(ctx, n, nil, nil, err)
  1082. if rerr != nil {
  1083. return nil, rerr
  1084. }
  1085. if !ok {
  1086. return nil, err
  1087. }
  1088. retried = true
  1089. time.Sleep(wait)
  1090. continue // try again
  1091. }
  1092. if err != nil {
  1093. c.errorf("elastic: cannot get connection from pool")
  1094. return nil, err
  1095. }
  1096. req, err = NewRequest(method, conn.URL()+pathWithParams)
  1097. if err != nil {
  1098. c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(method), conn.URL()+pathWithParams, err)
  1099. return nil, err
  1100. }
  1101. if basicAuth {
  1102. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  1103. }
  1104. // Set body
  1105. if body != nil {
  1106. err = req.SetBody(body, gzipEnabled)
  1107. if err != nil {
  1108. c.errorf("elastic: couldn't set body %+v for request: %v", body, err)
  1109. return nil, err
  1110. }
  1111. }
  1112. // Tracing
  1113. c.dumpRequest((*http.Request)(req))
  1114. // Get response
  1115. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  1116. if err == context.Canceled || err == context.DeadlineExceeded {
  1117. // Proceed, but don't mark the node as dead
  1118. return nil, err
  1119. }
  1120. if ue, ok := err.(*url.Error); ok {
  1121. // This happens e.g. on redirect errors, see https://golang.org/src/net/http/client_test.go#L329
  1122. if ue.Err == context.Canceled || ue.Err == context.DeadlineExceeded {
  1123. // Proceed, but don't mark the node as dead
  1124. return nil, err
  1125. }
  1126. }
  1127. if err != nil {
  1128. n++
  1129. wait, ok, rerr := c.retrier.Retry(ctx, n, (*http.Request)(req), res, err)
  1130. if rerr != nil {
  1131. c.errorf("elastic: %s is dead", conn.URL())
  1132. conn.MarkAsDead()
  1133. return nil, rerr
  1134. }
  1135. if !ok {
  1136. c.errorf("elastic: %s is dead", conn.URL())
  1137. conn.MarkAsDead()
  1138. return nil, err
  1139. }
  1140. retried = true
  1141. time.Sleep(wait)
  1142. continue // try again
  1143. }
  1144. if res.Body != nil {
  1145. defer res.Body.Close()
  1146. }
  1147. // Tracing
  1148. c.dumpResponse(res)
  1149. // Check for errors
  1150. if err := checkResponse((*http.Request)(req), res, ignoreErrors...); err != nil {
  1151. // No retry if request succeeded
  1152. // We still try to return a response.
  1153. resp, _ = c.newResponse(res)
  1154. return resp, err
  1155. }
  1156. // We successfully made a request with this connection
  1157. conn.MarkAsHealthy()
  1158. resp, err = c.newResponse(res)
  1159. if err != nil {
  1160. return nil, err
  1161. }
  1162. break
  1163. }
  1164. duration := time.Now().UTC().Sub(start)
  1165. c.infof("%s %s [status:%d, request:%.3fs]",
  1166. strings.ToUpper(method),
  1167. req.URL,
  1168. resp.StatusCode,
  1169. float64(int64(duration/time.Millisecond))/1000)
  1170. return resp, nil
  1171. }
  1172. // -- Document APIs --
  1173. // Index a document.
  1174. func (c *Client) Index() *IndexService {
  1175. return NewIndexService(c)
  1176. }
  1177. // Get a document.
  1178. func (c *Client) Get() *GetService {
  1179. return NewGetService(c)
  1180. }
  1181. // MultiGet retrieves multiple documents in one roundtrip.
  1182. func (c *Client) MultiGet() *MgetService {
  1183. return NewMgetService(c)
  1184. }
  1185. // Mget retrieves multiple documents in one roundtrip.
  1186. func (c *Client) Mget() *MgetService {
  1187. return NewMgetService(c)
  1188. }
  1189. // Delete a document.
  1190. func (c *Client) Delete() *DeleteService {
  1191. return NewDeleteService(c)
  1192. }
  1193. // DeleteByQuery deletes documents as found by a query.
  1194. func (c *Client) DeleteByQuery(indices ...string) *DeleteByQueryService {
  1195. return NewDeleteByQueryService(c).Index(indices...)
  1196. }
  1197. // Update a document.
  1198. func (c *Client) Update() *UpdateService {
  1199. return NewUpdateService(c)
  1200. }
  1201. // UpdateByQuery performs an update on a set of documents.
  1202. func (c *Client) UpdateByQuery(indices ...string) *UpdateByQueryService {
  1203. return NewUpdateByQueryService(c).Index(indices...)
  1204. }
  1205. // Bulk is the entry point to mass insert/update/delete documents.
  1206. func (c *Client) Bulk() *BulkService {
  1207. return NewBulkService(c)
  1208. }
  1209. // BulkProcessor allows setting up a concurrent processor of bulk requests.
  1210. func (c *Client) BulkProcessor() *BulkProcessorService {
  1211. return NewBulkProcessorService(c)
  1212. }
  1213. // Reindex copies data from a source index into a destination index.
  1214. //
  1215. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-reindex.html
  1216. // for details on the Reindex API.
  1217. func (c *Client) Reindex() *ReindexService {
  1218. return NewReindexService(c)
  1219. }
  1220. // TermVectors returns information and statistics on terms in the fields
  1221. // of a particular document.
  1222. func (c *Client) TermVectors(index, typ string) *TermvectorsService {
  1223. builder := NewTermvectorsService(c)
  1224. builder = builder.Index(index).Type(typ)
  1225. return builder
  1226. }
  1227. // MultiTermVectors returns information and statistics on terms in the fields
  1228. // of multiple documents.
  1229. func (c *Client) MultiTermVectors() *MultiTermvectorService {
  1230. return NewMultiTermvectorService(c)
  1231. }
  1232. // -- Search APIs --
  1233. // Search is the entry point for searches.
  1234. func (c *Client) Search(indices ...string) *SearchService {
  1235. return NewSearchService(c).Index(indices...)
  1236. }
  1237. // Suggest returns a service to return suggestions.
  1238. func (c *Client) Suggest(indices ...string) *SuggestService {
  1239. return NewSuggestService(c).Index(indices...)
  1240. }
  1241. // MultiSearch is the entry point for multi searches.
  1242. func (c *Client) MultiSearch() *MultiSearchService {
  1243. return NewMultiSearchService(c)
  1244. }
  1245. // Count documents.
  1246. func (c *Client) Count(indices ...string) *CountService {
  1247. return NewCountService(c).Index(indices...)
  1248. }
  1249. // Explain computes a score explanation for a query and a specific document.
  1250. func (c *Client) Explain(index, typ, id string) *ExplainService {
  1251. return NewExplainService(c).Index(index).Type(typ).Id(id)
  1252. }
  1253. // TODO Search Template
  1254. // TODO Search Shards API
  1255. // TODO Search Exists API
  1256. // TODO Validate API
  1257. // FieldStats returns statistical information about fields in indices.
  1258. func (c *Client) FieldStats(indices ...string) *FieldStatsService {
  1259. return NewFieldStatsService(c).Index(indices...)
  1260. }
  1261. // Exists checks if a document exists.
  1262. func (c *Client) Exists() *ExistsService {
  1263. return NewExistsService(c)
  1264. }
  1265. // Scroll through documents. Use this to efficiently scroll through results
  1266. // while returning the results to a client.
  1267. func (c *Client) Scroll(indices ...string) *ScrollService {
  1268. return NewScrollService(c).Index(indices...)
  1269. }
  1270. // ClearScroll can be used to clear search contexts manually.
  1271. func (c *Client) ClearScroll(scrollIds ...string) *ClearScrollService {
  1272. return NewClearScrollService(c).ScrollId(scrollIds...)
  1273. }
  1274. // -- Indices APIs --
  1275. // CreateIndex returns a service to create a new index.
  1276. func (c *Client) CreateIndex(name string) *IndicesCreateService {
  1277. return NewIndicesCreateService(c).Index(name)
  1278. }
  1279. // DeleteIndex returns a service to delete an index.
  1280. func (c *Client) DeleteIndex(indices ...string) *IndicesDeleteService {
  1281. return NewIndicesDeleteService(c).Index(indices)
  1282. }
  1283. // IndexExists allows to check if an index exists.
  1284. func (c *Client) IndexExists(indices ...string) *IndicesExistsService {
  1285. return NewIndicesExistsService(c).Index(indices)
  1286. }
  1287. // ShrinkIndex returns a service to shrink one index into another.
  1288. func (c *Client) ShrinkIndex(source, target string) *IndicesShrinkService {
  1289. return NewIndicesShrinkService(c).Source(source).Target(target)
  1290. }
  1291. // RolloverIndex rolls an alias over to a new index when the existing index
  1292. // is considered to be too large or too old.
  1293. func (c *Client) RolloverIndex(alias string) *IndicesRolloverService {
  1294. return NewIndicesRolloverService(c).Alias(alias)
  1295. }
  1296. // TypeExists allows to check if one or more types exist in one or more indices.
  1297. func (c *Client) TypeExists() *IndicesExistsTypeService {
  1298. return NewIndicesExistsTypeService(c)
  1299. }
  1300. // IndexStats provides statistics on different operations happining
  1301. // in one or more indices.
  1302. func (c *Client) IndexStats(indices ...string) *IndicesStatsService {
  1303. return NewIndicesStatsService(c).Index(indices...)
  1304. }
  1305. // OpenIndex opens an index.
  1306. func (c *Client) OpenIndex(name string) *IndicesOpenService {
  1307. return NewIndicesOpenService(c).Index(name)
  1308. }
  1309. // CloseIndex closes an index.
  1310. func (c *Client) CloseIndex(name string) *IndicesCloseService {
  1311. return NewIndicesCloseService(c).Index(name)
  1312. }
  1313. // IndexGet retrieves information about one or more indices.
  1314. // IndexGet is only available for Elasticsearch 1.4 or later.
  1315. func (c *Client) IndexGet(indices ...string) *IndicesGetService {
  1316. return NewIndicesGetService(c).Index(indices...)
  1317. }
  1318. // IndexGetSettings retrieves settings of all, one or more indices.
  1319. func (c *Client) IndexGetSettings(indices ...string) *IndicesGetSettingsService {
  1320. return NewIndicesGetSettingsService(c).Index(indices...)
  1321. }
  1322. // IndexPutSettings sets settings for all, one or more indices.
  1323. func (c *Client) IndexPutSettings(indices ...string) *IndicesPutSettingsService {
  1324. return NewIndicesPutSettingsService(c).Index(indices...)
  1325. }
  1326. // IndexAnalyze performs the analysis process on a text and returns the
  1327. // token breakdown of the text.
  1328. func (c *Client) IndexAnalyze() *IndicesAnalyzeService {
  1329. return NewIndicesAnalyzeService(c)
  1330. }
  1331. // Forcemerge optimizes one or more indices.
  1332. // It replaces the deprecated Optimize API.
  1333. func (c *Client) Forcemerge(indices ...string) *IndicesForcemergeService {
  1334. return NewIndicesForcemergeService(c).Index(indices...)
  1335. }
  1336. // Refresh asks Elasticsearch to refresh one or more indices.
  1337. func (c *Client) Refresh(indices ...string) *RefreshService {
  1338. return NewRefreshService(c).Index(indices...)
  1339. }
  1340. // Flush asks Elasticsearch to free memory from the index and
  1341. // flush data to disk.
  1342. func (c *Client) Flush(indices ...string) *IndicesFlushService {
  1343. return NewIndicesFlushService(c).Index(indices...)
  1344. }
  1345. // Alias enables the caller to add and/or remove aliases.
  1346. func (c *Client) Alias() *AliasService {
  1347. return NewAliasService(c)
  1348. }
  1349. // Aliases returns aliases by index name(s).
  1350. func (c *Client) Aliases() *AliasesService {
  1351. return NewAliasesService(c)
  1352. }
  1353. // GetTemplate gets a search template.
  1354. // Use IndexXXXTemplate funcs to manage index templates.
  1355. func (c *Client) GetTemplate() *GetTemplateService {
  1356. return NewGetTemplateService(c)
  1357. }
  1358. // PutTemplate creates or updates a search template.
  1359. // Use IndexXXXTemplate funcs to manage index templates.
  1360. func (c *Client) PutTemplate() *PutTemplateService {
  1361. return NewPutTemplateService(c)
  1362. }
  1363. // DeleteTemplate deletes a search template.
  1364. // Use IndexXXXTemplate funcs to manage index templates.
  1365. func (c *Client) DeleteTemplate() *DeleteTemplateService {
  1366. return NewDeleteTemplateService(c)
  1367. }
  1368. // IndexGetTemplate gets an index template.
  1369. // Use XXXTemplate funcs to manage search templates.
  1370. func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService {
  1371. return NewIndicesGetTemplateService(c).Name(names...)
  1372. }
  1373. // IndexTemplateExists gets check if an index template exists.
  1374. // Use XXXTemplate funcs to manage search templates.
  1375. func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService {
  1376. return NewIndicesExistsTemplateService(c).Name(name)
  1377. }
  1378. // IndexPutTemplate creates or updates an index template.
  1379. // Use XXXTemplate funcs to manage search templates.
  1380. func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService {
  1381. return NewIndicesPutTemplateService(c).Name(name)
  1382. }
  1383. // IndexDeleteTemplate deletes an index template.
  1384. // Use XXXTemplate funcs to manage search templates.
  1385. func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService {
  1386. return NewIndicesDeleteTemplateService(c).Name(name)
  1387. }
  1388. // GetMapping gets a mapping.
  1389. func (c *Client) GetMapping() *IndicesGetMappingService {
  1390. return NewIndicesGetMappingService(c)
  1391. }
  1392. // PutMapping registers a mapping.
  1393. func (c *Client) PutMapping() *IndicesPutMappingService {
  1394. return NewIndicesPutMappingService(c)
  1395. }
  1396. // GetFieldMapping gets mapping for fields.
  1397. func (c *Client) GetFieldMapping() *IndicesGetFieldMappingService {
  1398. return NewIndicesGetFieldMappingService(c)
  1399. }
  1400. // -- cat APIs --
  1401. // TODO cat aliases
  1402. // TODO cat allocation
  1403. // TODO cat count
  1404. // TODO cat fielddata
  1405. // TODO cat health
  1406. // TODO cat indices
  1407. // TODO cat master
  1408. // TODO cat nodes
  1409. // TODO cat pending tasks
  1410. // TODO cat plugins
  1411. // TODO cat recovery
  1412. // TODO cat thread pool
  1413. // TODO cat shards
  1414. // TODO cat segments
  1415. // -- Ingest APIs --
  1416. // IngestPutPipeline adds pipelines and updates existing pipelines in
  1417. // the cluster.
  1418. func (c *Client) IngestPutPipeline(id string) *IngestPutPipelineService {
  1419. return NewIngestPutPipelineService(c).Id(id)
  1420. }
  1421. // IngestGetPipeline returns pipelines based on ID.
  1422. func (c *Client) IngestGetPipeline(ids ...string) *IngestGetPipelineService {
  1423. return NewIngestGetPipelineService(c).Id(ids...)
  1424. }
  1425. // IngestDeletePipeline deletes a pipeline by ID.
  1426. func (c *Client) IngestDeletePipeline(id string) *IngestDeletePipelineService {
  1427. return NewIngestDeletePipelineService(c).Id(id)
  1428. }
  1429. // IngestSimulatePipeline executes a specific pipeline against the set of
  1430. // documents provided in the body of the request.
  1431. func (c *Client) IngestSimulatePipeline() *IngestSimulatePipelineService {
  1432. return NewIngestSimulatePipelineService(c)
  1433. }
  1434. // -- Cluster APIs --
  1435. // ClusterHealth retrieves the health of the cluster.
  1436. func (c *Client) ClusterHealth() *ClusterHealthService {
  1437. return NewClusterHealthService(c)
  1438. }
  1439. // ClusterState retrieves the state of the cluster.
  1440. func (c *Client) ClusterState() *ClusterStateService {
  1441. return NewClusterStateService(c)
  1442. }
  1443. // ClusterStats retrieves cluster statistics.
  1444. func (c *Client) ClusterStats() *ClusterStatsService {
  1445. return NewClusterStatsService(c)
  1446. }
  1447. // NodesInfo retrieves one or more or all of the cluster nodes information.
  1448. func (c *Client) NodesInfo() *NodesInfoService {
  1449. return NewNodesInfoService(c)
  1450. }
  1451. // NodesStats retrieves one or more or all of the cluster nodes statistics.
  1452. func (c *Client) NodesStats() *NodesStatsService {
  1453. return NewNodesStatsService(c)
  1454. }
  1455. // TasksCancel cancels tasks running on the specified nodes.
  1456. func (c *Client) TasksCancel() *TasksCancelService {
  1457. return NewTasksCancelService(c)
  1458. }
  1459. // TasksList retrieves the list of tasks running on the specified nodes.
  1460. func (c *Client) TasksList() *TasksListService {
  1461. return NewTasksListService(c)
  1462. }
  1463. // TasksGetTask retrieves a task running on the cluster.
  1464. func (c *Client) TasksGetTask() *TasksGetTaskService {
  1465. return NewTasksGetTaskService(c)
  1466. }
  1467. // TODO Pending cluster tasks
  1468. // TODO Cluster Reroute
  1469. // TODO Cluster Update Settings
  1470. // TODO Nodes Stats
  1471. // TODO Nodes hot_threads
  1472. // -- Snapshot and Restore --
  1473. // TODO Snapshot Delete
  1474. // TODO Snapshot Get
  1475. // TODO Snapshot Restore
  1476. // TODO Snapshot Status
  1477. // SnapshotCreate creates a snapshot.
  1478. func (c *Client) SnapshotCreate(repository string, snapshot string) *SnapshotCreateService {
  1479. return NewSnapshotCreateService(c).Repository(repository).Snapshot(snapshot)
  1480. }
  1481. // SnapshotCreateRepository creates or updates a snapshot repository.
  1482. func (c *Client) SnapshotCreateRepository(repository string) *SnapshotCreateRepositoryService {
  1483. return NewSnapshotCreateRepositoryService(c).Repository(repository)
  1484. }
  1485. // SnapshotDeleteRepository deletes a snapshot repository.
  1486. func (c *Client) SnapshotDeleteRepository(repositories ...string) *SnapshotDeleteRepositoryService {
  1487. return NewSnapshotDeleteRepositoryService(c).Repository(repositories...)
  1488. }
  1489. // SnapshotGetRepository gets a snapshot repository.
  1490. func (c *Client) SnapshotGetRepository(repositories ...string) *SnapshotGetRepositoryService {
  1491. return NewSnapshotGetRepositoryService(c).Repository(repositories...)
  1492. }
  1493. // SnapshotVerifyRepository verifies a snapshot repository.
  1494. func (c *Client) SnapshotVerifyRepository(repository string) *SnapshotVerifyRepositoryService {
  1495. return NewSnapshotVerifyRepositoryService(c).Repository(repository)
  1496. }
  1497. // -- Helpers and shortcuts --
  1498. // ElasticsearchVersion returns the version number of Elasticsearch
  1499. // running on the given URL.
  1500. func (c *Client) ElasticsearchVersion(url string) (string, error) {
  1501. res, _, err := c.Ping(url).Do(context.Background())
  1502. if err != nil {
  1503. return "", err
  1504. }
  1505. return res.Version.Number, nil
  1506. }
  1507. // IndexNames returns the names of all indices in the cluster.
  1508. func (c *Client) IndexNames() ([]string, error) {
  1509. res, err := c.IndexGetSettings().Index("_all").Do(context.Background())
  1510. if err != nil {
  1511. return nil, err
  1512. }
  1513. var names []string
  1514. for name := range res {
  1515. names = append(names, name)
  1516. }
  1517. return names, nil
  1518. }
  1519. // Ping checks if a given node in a cluster exists and (optionally)
  1520. // returns some basic information about the Elasticsearch server,
  1521. // e.g. the Elasticsearch version number.
  1522. //
  1523. // Notice that you need to specify a URL here explicitly.
  1524. func (c *Client) Ping(url string) *PingService {
  1525. return NewPingService(c).URL(url)
  1526. }
  1527. // WaitForStatus waits for the cluster to have the given status.
  1528. // This is a shortcut method for the ClusterHealth service.
  1529. //
  1530. // WaitForStatus waits for the specified timeout, e.g. "10s".
  1531. // If the cluster will have the given state within the timeout, nil is returned.
  1532. // If the request timed out, ErrTimeout is returned.
  1533. func (c *Client) WaitForStatus(status string, timeout string) error {
  1534. health, err := c.ClusterHealth().WaitForStatus(status).Timeout(timeout).Do(context.Background())
  1535. if err != nil {
  1536. return err
  1537. }
  1538. if health.TimedOut {
  1539. return ErrTimeout
  1540. }
  1541. return nil
  1542. }
  1543. // WaitForGreenStatus waits for the cluster to have the "green" status.
  1544. // See WaitForStatus for more details.
  1545. func (c *Client) WaitForGreenStatus(timeout string) error {
  1546. return c.WaitForStatus("green", timeout)
  1547. }
  1548. // WaitForYellowStatus waits for the cluster to have the "yellow" status.
  1549. // See WaitForStatus for more details.
  1550. func (c *Client) WaitForYellowStatus(timeout string) error {
  1551. return c.WaitForStatus("yellow", timeout)
  1552. }
  1553. // IsConnError unwraps the given error value and checks if it is equal to
  1554. // elastic.ErrNoClient.
  1555. func IsConnErr(err error) bool {
  1556. return errors.Cause(err) == ErrNoClient
  1557. }