client_test.go 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "log"
  12. "net"
  13. "net/http"
  14. "reflect"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "testing"
  19. "time"
  20. "github.com/fortytw2/leaktest"
  21. )
  22. func findConn(s string, slice ...*conn) (int, bool) {
  23. for i, t := range slice {
  24. if s == t.URL() {
  25. return i, true
  26. }
  27. }
  28. return -1, false
  29. }
  30. // -- NewClient --
  31. func TestClientDefaults(t *testing.T) {
  32. client, err := NewClient()
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. if client.healthcheckEnabled != true {
  37. t.Errorf("expected health checks to be enabled, got: %v", client.healthcheckEnabled)
  38. }
  39. if client.healthcheckTimeoutStartup != DefaultHealthcheckTimeoutStartup {
  40. t.Errorf("expected health checks timeout on startup = %v, got: %v", DefaultHealthcheckTimeoutStartup, client.healthcheckTimeoutStartup)
  41. }
  42. if client.healthcheckTimeout != DefaultHealthcheckTimeout {
  43. t.Errorf("expected health checks timeout = %v, got: %v", DefaultHealthcheckTimeout, client.healthcheckTimeout)
  44. }
  45. if client.healthcheckInterval != DefaultHealthcheckInterval {
  46. t.Errorf("expected health checks interval = %v, got: %v", DefaultHealthcheckInterval, client.healthcheckInterval)
  47. }
  48. if client.snifferEnabled != true {
  49. t.Errorf("expected sniffing to be enabled, got: %v", client.snifferEnabled)
  50. }
  51. if client.snifferTimeoutStartup != DefaultSnifferTimeoutStartup {
  52. t.Errorf("expected sniffer timeout on startup = %v, got: %v", DefaultSnifferTimeoutStartup, client.snifferTimeoutStartup)
  53. }
  54. if client.snifferTimeout != DefaultSnifferTimeout {
  55. t.Errorf("expected sniffer timeout = %v, got: %v", DefaultSnifferTimeout, client.snifferTimeout)
  56. }
  57. if client.snifferInterval != DefaultSnifferInterval {
  58. t.Errorf("expected sniffer interval = %v, got: %v", DefaultSnifferInterval, client.snifferInterval)
  59. }
  60. if client.basicAuth != false {
  61. t.Errorf("expected no basic auth; got: %v", client.basicAuth)
  62. }
  63. if client.basicAuthUsername != "" {
  64. t.Errorf("expected no basic auth username; got: %q", client.basicAuthUsername)
  65. }
  66. if client.basicAuthPassword != "" {
  67. t.Errorf("expected no basic auth password; got: %q", client.basicAuthUsername)
  68. }
  69. if client.sendGetBodyAs != "GET" {
  70. t.Errorf("expected sendGetBodyAs to be GET; got: %q", client.sendGetBodyAs)
  71. }
  72. }
  73. func TestClientWithoutURL(t *testing.T) {
  74. client, err := NewClient()
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. // Two things should happen here:
  79. // 1. The client starts sniffing the cluster on DefaultURL
  80. // 2. The sniffing process should find (at least) one node in the cluster, i.e. the DefaultURL
  81. if len(client.conns) == 0 {
  82. t.Fatalf("expected at least 1 node in the cluster, got: %d (%v)", len(client.conns), client.conns)
  83. }
  84. if !isTravis() {
  85. if _, found := findConn(DefaultURL, client.conns...); !found {
  86. t.Errorf("expected to find node with default URL of %s in %v", DefaultURL, client.conns)
  87. }
  88. }
  89. }
  90. func TestClientWithSingleURL(t *testing.T) {
  91. client, err := NewClient(SetURL("http://127.0.0.1:9200"))
  92. if err != nil {
  93. t.Fatal(err)
  94. }
  95. // Two things should happen here:
  96. // 1. The client starts sniffing the cluster on DefaultURL
  97. // 2. The sniffing process should find (at least) one node in the cluster, i.e. the DefaultURL
  98. if len(client.conns) == 0 {
  99. t.Fatalf("expected at least 1 node in the cluster, got: %d (%v)", len(client.conns), client.conns)
  100. }
  101. if !isTravis() {
  102. if _, found := findConn(DefaultURL, client.conns...); !found {
  103. t.Errorf("expected to find node with default URL of %s in %v", DefaultURL, client.conns)
  104. }
  105. }
  106. }
  107. func TestClientWithMultipleURLs(t *testing.T) {
  108. client, err := NewClient(SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. // The client should sniff both URLs, but only 127.0.0.1:9200 should return nodes.
  113. if len(client.conns) != 1 {
  114. t.Fatalf("expected exactly 1 node in the local cluster, got: %d (%v)", len(client.conns), client.conns)
  115. }
  116. if !isTravis() {
  117. if client.conns[0].URL() != DefaultURL {
  118. t.Errorf("expected to find node with default URL of %s in %v", DefaultURL, client.conns)
  119. }
  120. }
  121. }
  122. func TestClientWithBasicAuth(t *testing.T) {
  123. client, err := NewClient(SetBasicAuth("user", "secret"))
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. if client.basicAuth != true {
  128. t.Errorf("expected basic auth; got: %v", client.basicAuth)
  129. }
  130. if got, want := client.basicAuthUsername, "user"; got != want {
  131. t.Errorf("expected basic auth username %q; got: %q", want, got)
  132. }
  133. if got, want := client.basicAuthPassword, "secret"; got != want {
  134. t.Errorf("expected basic auth password %q; got: %q", want, got)
  135. }
  136. }
  137. func TestClientWithBasicAuthInUserInfo(t *testing.T) {
  138. client, err := NewClient(SetURL("http://user1:secret1@localhost:9200", "http://user2:secret2@localhost:9200"))
  139. if err != nil {
  140. t.Fatal(err)
  141. }
  142. if client.basicAuth != true {
  143. t.Errorf("expected basic auth; got: %v", client.basicAuth)
  144. }
  145. if got, want := client.basicAuthUsername, "user1"; got != want {
  146. t.Errorf("expected basic auth username %q; got: %q", want, got)
  147. }
  148. if got, want := client.basicAuthPassword, "secret1"; got != want {
  149. t.Errorf("expected basic auth password %q; got: %q", want, got)
  150. }
  151. }
  152. func TestClientSniffSuccess(t *testing.T) {
  153. client, err := NewClient(SetURL("http://127.0.0.1:19200", "http://127.0.0.1:9200"))
  154. if err != nil {
  155. t.Fatal(err)
  156. }
  157. // The client should sniff both URLs, but only 127.0.0.1:9200 should return nodes.
  158. if len(client.conns) != 1 {
  159. t.Fatalf("expected exactly 1 node in the local cluster, got: %d (%v)", len(client.conns), client.conns)
  160. }
  161. }
  162. func TestClientSniffFailure(t *testing.T) {
  163. _, err := NewClient(SetURL("http://127.0.0.1:19200", "http://127.0.0.1:19201"))
  164. if err == nil {
  165. t.Fatalf("expected cluster to fail with no nodes found")
  166. }
  167. }
  168. func TestClientSnifferCallback(t *testing.T) {
  169. var calls int
  170. cb := func(node *NodesInfoNode) bool {
  171. calls++
  172. return false
  173. }
  174. _, err := NewClient(
  175. SetURL("http://127.0.0.1:19200", "http://127.0.0.1:9200"),
  176. SetSnifferCallback(cb))
  177. if err == nil {
  178. t.Fatalf("expected cluster to fail with no nodes found")
  179. }
  180. if calls != 1 {
  181. t.Fatalf("expected 1 call to the sniffer callback, got %d", calls)
  182. }
  183. }
  184. func TestClientSniffDisabled(t *testing.T) {
  185. client, err := NewClient(SetSniff(false), SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  186. if err != nil {
  187. t.Fatal(err)
  188. }
  189. // The client should not sniff, so it should have two connections.
  190. if len(client.conns) != 2 {
  191. t.Fatalf("expected 2 nodes, got: %d (%v)", len(client.conns), client.conns)
  192. }
  193. // Make two requests, so that both connections are being used
  194. for i := 0; i < len(client.conns); i++ {
  195. client.Flush().Do(context.TODO())
  196. }
  197. // The first connection (127.0.0.1:9200) should now be okay.
  198. if i, found := findConn("http://127.0.0.1:9200", client.conns...); !found {
  199. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9200")
  200. } else {
  201. if conn := client.conns[i]; conn.IsDead() {
  202. t.Fatal("expected connection to be alive, but it is dead")
  203. }
  204. }
  205. // The second connection (127.0.0.1:9201) should now be marked as dead.
  206. if i, found := findConn("http://127.0.0.1:9201", client.conns...); !found {
  207. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9201")
  208. } else {
  209. if conn := client.conns[i]; !conn.IsDead() {
  210. t.Fatal("expected connection to be dead, but it is alive")
  211. }
  212. }
  213. }
  214. func TestClientWillMarkConnectionsAsAliveWhenAllAreDead(t *testing.T) {
  215. client, err := NewClient(SetURL("http://127.0.0.1:9201"),
  216. SetSniff(false), SetHealthcheck(false), SetMaxRetries(0))
  217. if err != nil {
  218. t.Fatal(err)
  219. }
  220. // We should have a connection.
  221. if len(client.conns) != 1 {
  222. t.Fatalf("expected 1 node, got: %d (%v)", len(client.conns), client.conns)
  223. }
  224. // Make a request, so that the connections is marked as dead.
  225. client.Flush().Do(context.TODO())
  226. // The connection should now be marked as dead.
  227. if i, found := findConn("http://127.0.0.1:9201", client.conns...); !found {
  228. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9201")
  229. } else {
  230. if conn := client.conns[i]; !conn.IsDead() {
  231. t.Fatalf("expected connection to be dead, got: %v", conn)
  232. }
  233. }
  234. // Now send another request and the connection should be marked as alive again.
  235. client.Flush().Do(context.TODO())
  236. if i, found := findConn("http://127.0.0.1:9201", client.conns...); !found {
  237. t.Fatalf("expected connection to %q to be found", "http://127.0.0.1:9201")
  238. } else {
  239. if conn := client.conns[i]; conn.IsDead() {
  240. t.Fatalf("expected connection to be alive, got: %v", conn)
  241. }
  242. }
  243. }
  244. func TestClientWithRequiredPlugins(t *testing.T) {
  245. _, err := NewClient(SetRequiredPlugins("no-such-plugin"))
  246. if err == nil {
  247. t.Fatal("expected error when creating client")
  248. }
  249. if got, want := err.Error(), "elastic: plugin no-such-plugin not found"; got != want {
  250. t.Fatalf("expected error %q; got: %q", want, got)
  251. }
  252. }
  253. func TestClientHealthcheckStartupTimeout(t *testing.T) {
  254. start := time.Now()
  255. _, err := NewClient(SetURL("http://localhost:9299"), SetHealthcheckTimeoutStartup(5*time.Second))
  256. duration := time.Now().Sub(start)
  257. if !IsConnErr(err) {
  258. t.Fatal(err)
  259. }
  260. if duration < 5*time.Second {
  261. t.Fatalf("expected a timeout in more than 5 seconds; got: %v", duration)
  262. }
  263. }
  264. func TestClientHealthcheckTimeoutLeak(t *testing.T) {
  265. // This test test checks if healthcheck requests are canceled
  266. // after timeout.
  267. // It contains couple of hacks which won't be needed once we
  268. // stop supporting Go1.7.
  269. // On Go1.7 it uses server side effects to monitor if connection
  270. // was closed,
  271. // and on Go 1.8+ we're additionally honestly monitoring routine
  272. // leaks via leaktest.
  273. mux := http.NewServeMux()
  274. var reqDoneMu sync.Mutex
  275. var reqDone bool
  276. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  277. cn, ok := w.(http.CloseNotifier)
  278. if !ok {
  279. t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
  280. }
  281. <-cn.CloseNotify()
  282. reqDoneMu.Lock()
  283. reqDone = true
  284. reqDoneMu.Unlock()
  285. })
  286. lis, err := net.Listen("tcp", "127.0.0.1:0")
  287. if err != nil {
  288. t.Fatalf("Couldn't setup listener: %v", err)
  289. }
  290. addr := lis.Addr().String()
  291. srv := &http.Server{
  292. Handler: mux,
  293. }
  294. go srv.Serve(lis)
  295. cli := &Client{
  296. c: &http.Client{},
  297. conns: []*conn{
  298. &conn{
  299. url: "http://" + addr + "/",
  300. },
  301. },
  302. }
  303. type closer interface {
  304. Shutdown(context.Context) error
  305. }
  306. // pre-Go1.8 Server can't Shutdown
  307. cl, isServerCloseable := (interface{}(srv)).(closer)
  308. // Since Go1.7 can't Shutdown() - there will be leak from server
  309. // Monitor leaks on Go 1.8+
  310. if isServerCloseable {
  311. defer leaktest.CheckTimeout(t, time.Second*10)()
  312. }
  313. cli.healthcheck(time.Millisecond*500, true)
  314. if isServerCloseable {
  315. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  316. defer cancel()
  317. cl.Shutdown(ctx)
  318. }
  319. <-time.After(time.Second)
  320. reqDoneMu.Lock()
  321. if !reqDone {
  322. reqDoneMu.Unlock()
  323. t.Fatal("Request wasn't canceled or stopped")
  324. }
  325. reqDoneMu.Unlock()
  326. }
  327. // -- NewSimpleClient --
  328. func TestSimpleClientDefaults(t *testing.T) {
  329. client, err := NewSimpleClient()
  330. if err != nil {
  331. t.Fatal(err)
  332. }
  333. if client.healthcheckEnabled != false {
  334. t.Errorf("expected health checks to be disabled, got: %v", client.healthcheckEnabled)
  335. }
  336. if client.healthcheckTimeoutStartup != off {
  337. t.Errorf("expected health checks timeout on startup = %v, got: %v", off, client.healthcheckTimeoutStartup)
  338. }
  339. if client.healthcheckTimeout != off {
  340. t.Errorf("expected health checks timeout = %v, got: %v", off, client.healthcheckTimeout)
  341. }
  342. if client.healthcheckInterval != off {
  343. t.Errorf("expected health checks interval = %v, got: %v", off, client.healthcheckInterval)
  344. }
  345. if client.snifferEnabled != false {
  346. t.Errorf("expected sniffing to be disabled, got: %v", client.snifferEnabled)
  347. }
  348. if client.snifferTimeoutStartup != off {
  349. t.Errorf("expected sniffer timeout on startup = %v, got: %v", off, client.snifferTimeoutStartup)
  350. }
  351. if client.snifferTimeout != off {
  352. t.Errorf("expected sniffer timeout = %v, got: %v", off, client.snifferTimeout)
  353. }
  354. if client.snifferInterval != off {
  355. t.Errorf("expected sniffer interval = %v, got: %v", off, client.snifferInterval)
  356. }
  357. if client.basicAuth != false {
  358. t.Errorf("expected no basic auth; got: %v", client.basicAuth)
  359. }
  360. if client.basicAuthUsername != "" {
  361. t.Errorf("expected no basic auth username; got: %q", client.basicAuthUsername)
  362. }
  363. if client.basicAuthPassword != "" {
  364. t.Errorf("expected no basic auth password; got: %q", client.basicAuthUsername)
  365. }
  366. if client.sendGetBodyAs != "GET" {
  367. t.Errorf("expected sendGetBodyAs to be GET; got: %q", client.sendGetBodyAs)
  368. }
  369. }
  370. // -- Start and stop --
  371. func TestClientStartAndStop(t *testing.T) {
  372. client, err := NewClient()
  373. if err != nil {
  374. t.Fatal(err)
  375. }
  376. running := client.IsRunning()
  377. if !running {
  378. t.Fatalf("expected background processes to run; got: %v", running)
  379. }
  380. // Stop
  381. client.Stop()
  382. running = client.IsRunning()
  383. if running {
  384. t.Fatalf("expected background processes to be stopped; got: %v", running)
  385. }
  386. // Stop again => no-op
  387. client.Stop()
  388. running = client.IsRunning()
  389. if running {
  390. t.Fatalf("expected background processes to be stopped; got: %v", running)
  391. }
  392. // Start
  393. client.Start()
  394. running = client.IsRunning()
  395. if !running {
  396. t.Fatalf("expected background processes to run; got: %v", running)
  397. }
  398. // Start again => no-op
  399. client.Start()
  400. running = client.IsRunning()
  401. if !running {
  402. t.Fatalf("expected background processes to run; got: %v", running)
  403. }
  404. }
  405. func TestClientStartAndStopWithSnifferAndHealthchecksDisabled(t *testing.T) {
  406. client, err := NewClient(SetSniff(false), SetHealthcheck(false))
  407. if err != nil {
  408. t.Fatal(err)
  409. }
  410. running := client.IsRunning()
  411. if !running {
  412. t.Fatalf("expected background processes to run; got: %v", running)
  413. }
  414. // Stop
  415. client.Stop()
  416. running = client.IsRunning()
  417. if running {
  418. t.Fatalf("expected background processes to be stopped; got: %v", running)
  419. }
  420. // Stop again => no-op
  421. client.Stop()
  422. running = client.IsRunning()
  423. if running {
  424. t.Fatalf("expected background processes to be stopped; got: %v", running)
  425. }
  426. // Start
  427. client.Start()
  428. running = client.IsRunning()
  429. if !running {
  430. t.Fatalf("expected background processes to run; got: %v", running)
  431. }
  432. // Start again => no-op
  433. client.Start()
  434. running = client.IsRunning()
  435. if !running {
  436. t.Fatalf("expected background processes to run; got: %v", running)
  437. }
  438. }
  439. // -- Sniffing --
  440. func TestClientSniffNode(t *testing.T) {
  441. client, err := NewClient()
  442. if err != nil {
  443. t.Fatal(err)
  444. }
  445. ch := make(chan []*conn)
  446. go func() { ch <- client.sniffNode(context.Background(), DefaultURL) }()
  447. select {
  448. case nodes := <-ch:
  449. if len(nodes) != 1 {
  450. t.Fatalf("expected %d nodes; got: %d", 1, len(nodes))
  451. }
  452. pattern := `http:\/\/[\d\.]+:9200`
  453. matched, err := regexp.MatchString(pattern, nodes[0].URL())
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. if !matched {
  458. t.Fatalf("expected node URL pattern %q; got: %q", pattern, nodes[0].URL())
  459. }
  460. case <-time.After(2 * time.Second):
  461. t.Fatal("expected no timeout in sniff node")
  462. break
  463. }
  464. }
  465. func TestClientSniffOnDefaultURL(t *testing.T) {
  466. client, _ := NewClient()
  467. if client == nil {
  468. t.Fatal("no client returned")
  469. }
  470. ch := make(chan error, 1)
  471. go func() {
  472. ch <- client.sniff(DefaultSnifferTimeoutStartup)
  473. }()
  474. select {
  475. case err := <-ch:
  476. if err != nil {
  477. t.Fatalf("expected sniff to succeed; got: %v", err)
  478. }
  479. if len(client.conns) != 1 {
  480. t.Fatalf("expected %d nodes; got: %d", 1, len(client.conns))
  481. }
  482. pattern := `http:\/\/[\d\.]+:9200`
  483. matched, err := regexp.MatchString(pattern, client.conns[0].URL())
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. if !matched {
  488. t.Fatalf("expected node URL pattern %q; got: %q", pattern, client.conns[0].URL())
  489. }
  490. case <-time.After(2 * time.Second):
  491. t.Fatal("expected no timeout in sniff")
  492. break
  493. }
  494. }
  495. func TestClientSniffTimeoutLeak(t *testing.T) {
  496. // This test test checks if sniff requests are canceled
  497. // after timeout.
  498. // It contains couple of hacks which won't be needed once we
  499. // stop supporting Go1.7.
  500. // On Go1.7 it uses server side effects to monitor if connection
  501. // was closed,
  502. // and on Go 1.8+ we're additionally honestly monitoring routine
  503. // leaks via leaktest.
  504. mux := http.NewServeMux()
  505. var reqDoneMu sync.Mutex
  506. var reqDone bool
  507. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  508. cn, ok := w.(http.CloseNotifier)
  509. if !ok {
  510. t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
  511. }
  512. <-cn.CloseNotify()
  513. reqDoneMu.Lock()
  514. reqDone = true
  515. reqDoneMu.Unlock()
  516. })
  517. lis, err := net.Listen("tcp", "127.0.0.1:0")
  518. if err != nil {
  519. t.Fatalf("Couldn't setup listener: %v", err)
  520. }
  521. addr := lis.Addr().String()
  522. srv := &http.Server{
  523. Handler: mux,
  524. }
  525. go srv.Serve(lis)
  526. cli := &Client{
  527. c: &http.Client{},
  528. conns: []*conn{
  529. &conn{
  530. url: "http://" + addr + "/",
  531. },
  532. },
  533. snifferEnabled: true,
  534. }
  535. type closer interface {
  536. Shutdown(context.Context) error
  537. }
  538. // pre-Go1.8 Server can't Shutdown
  539. cl, isServerCloseable := (interface{}(srv)).(closer)
  540. // Since Go1.7 can't Shutdown() - there will be leak from server
  541. // Monitor leaks on Go 1.8+
  542. if isServerCloseable {
  543. defer leaktest.CheckTimeout(t, time.Second*10)()
  544. }
  545. cli.sniff(time.Millisecond * 500)
  546. if isServerCloseable {
  547. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  548. defer cancel()
  549. cl.Shutdown(ctx)
  550. }
  551. <-time.After(time.Second)
  552. reqDoneMu.Lock()
  553. if !reqDone {
  554. reqDoneMu.Unlock()
  555. t.Fatal("Request wasn't canceled or stopped")
  556. }
  557. reqDoneMu.Unlock()
  558. }
  559. func TestClientExtractHostname(t *testing.T) {
  560. tests := []struct {
  561. Scheme string
  562. Address string
  563. Output string
  564. }{
  565. {
  566. Scheme: "http",
  567. Address: "",
  568. Output: "",
  569. },
  570. {
  571. Scheme: "https",
  572. Address: "abc",
  573. Output: "",
  574. },
  575. {
  576. Scheme: "http",
  577. Address: "127.0.0.1:19200",
  578. Output: "http://127.0.0.1:19200",
  579. },
  580. {
  581. Scheme: "https",
  582. Address: "127.0.0.1:9200",
  583. Output: "https://127.0.0.1:9200",
  584. },
  585. {
  586. Scheme: "http",
  587. Address: "myelk.local/10.1.0.24:9200",
  588. Output: "http://10.1.0.24:9200",
  589. },
  590. }
  591. client, err := NewClient(SetSniff(false), SetHealthcheck(false))
  592. if err != nil {
  593. t.Fatal(err)
  594. }
  595. for _, test := range tests {
  596. got := client.extractHostname(test.Scheme, test.Address)
  597. if want := test.Output; want != got {
  598. t.Errorf("expected %q; got: %q", want, got)
  599. }
  600. }
  601. }
  602. // -- Selector --
  603. func TestClientSelectConnHealthy(t *testing.T) {
  604. client, err := NewClient(
  605. SetSniff(false),
  606. SetHealthcheck(false),
  607. SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  608. if err != nil {
  609. t.Fatal(err)
  610. }
  611. // Both are healthy, so we should get both URLs in round-robin
  612. client.conns[0].MarkAsHealthy()
  613. client.conns[1].MarkAsHealthy()
  614. // #1: Return 1st
  615. c, err := client.next()
  616. if err != nil {
  617. t.Fatal(err)
  618. }
  619. if c.URL() != client.conns[0].URL() {
  620. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  621. }
  622. // #2: Return 2nd
  623. c, err = client.next()
  624. if err != nil {
  625. t.Fatal(err)
  626. }
  627. if c.URL() != client.conns[1].URL() {
  628. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  629. }
  630. // #3: Return 1st
  631. c, err = client.next()
  632. if err != nil {
  633. t.Fatal(err)
  634. }
  635. if c.URL() != client.conns[0].URL() {
  636. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  637. }
  638. }
  639. func TestClientSelectConnHealthyAndDead(t *testing.T) {
  640. client, err := NewClient(
  641. SetSniff(false),
  642. SetHealthcheck(false),
  643. SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  644. if err != nil {
  645. t.Fatal(err)
  646. }
  647. // 1st is healthy, second is dead
  648. client.conns[0].MarkAsHealthy()
  649. client.conns[1].MarkAsDead()
  650. // #1: Return 1st
  651. c, err := client.next()
  652. if err != nil {
  653. t.Fatal(err)
  654. }
  655. if c.URL() != client.conns[0].URL() {
  656. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  657. }
  658. // #2: Return 1st again
  659. c, err = client.next()
  660. if err != nil {
  661. t.Fatal(err)
  662. }
  663. if c.URL() != client.conns[0].URL() {
  664. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  665. }
  666. // #3: Return 1st again and again
  667. c, err = client.next()
  668. if err != nil {
  669. t.Fatal(err)
  670. }
  671. if c.URL() != client.conns[0].URL() {
  672. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[0].URL())
  673. }
  674. }
  675. func TestClientSelectConnDeadAndHealthy(t *testing.T) {
  676. client, err := NewClient(
  677. SetSniff(false),
  678. SetHealthcheck(false),
  679. SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  680. if err != nil {
  681. t.Fatal(err)
  682. }
  683. // 1st is dead, 2nd is healthy
  684. client.conns[0].MarkAsDead()
  685. client.conns[1].MarkAsHealthy()
  686. // #1: Return 2nd
  687. c, err := client.next()
  688. if err != nil {
  689. t.Fatal(err)
  690. }
  691. if c.URL() != client.conns[1].URL() {
  692. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  693. }
  694. // #2: Return 2nd again
  695. c, err = client.next()
  696. if err != nil {
  697. t.Fatal(err)
  698. }
  699. if c.URL() != client.conns[1].URL() {
  700. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  701. }
  702. // #3: Return 2nd again and again
  703. c, err = client.next()
  704. if err != nil {
  705. t.Fatal(err)
  706. }
  707. if c.URL() != client.conns[1].URL() {
  708. t.Fatalf("expected %s; got: %s", c.URL(), client.conns[1].URL())
  709. }
  710. }
  711. func TestClientSelectConnAllDead(t *testing.T) {
  712. client, err := NewClient(
  713. SetSniff(false),
  714. SetHealthcheck(false),
  715. SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"))
  716. if err != nil {
  717. t.Fatal(err)
  718. }
  719. // Both are dead
  720. client.conns[0].MarkAsDead()
  721. client.conns[1].MarkAsDead()
  722. // If all connections are dead, next should make them alive again, but
  723. // still return an error when it first finds out.
  724. c, err := client.next()
  725. if !IsConnErr(err) {
  726. t.Fatal(err)
  727. }
  728. if c != nil {
  729. t.Fatalf("expected no connection; got: %v", c)
  730. }
  731. // Return a connection
  732. c, err = client.next()
  733. if err != nil {
  734. t.Fatalf("expected no error; got: %v", err)
  735. }
  736. if c == nil {
  737. t.Fatalf("expected connection; got: %v", c)
  738. }
  739. // Return a connection
  740. c, err = client.next()
  741. if err != nil {
  742. t.Fatalf("expected no error; got: %v", err)
  743. }
  744. if c == nil {
  745. t.Fatalf("expected connection; got: %v", c)
  746. }
  747. }
  748. // -- ElasticsearchVersion --
  749. func TestElasticsearchVersion(t *testing.T) {
  750. client, err := NewClient()
  751. if err != nil {
  752. t.Fatal(err)
  753. }
  754. version, err := client.ElasticsearchVersion(DefaultURL)
  755. if err != nil {
  756. t.Fatal(err)
  757. }
  758. if version == "" {
  759. t.Errorf("expected a version number, got: %q", version)
  760. }
  761. }
  762. // -- IndexNames --
  763. func TestIndexNames(t *testing.T) {
  764. client := setupTestClientAndCreateIndex(t)
  765. names, err := client.IndexNames()
  766. if err != nil {
  767. t.Fatal(err)
  768. }
  769. if len(names) == 0 {
  770. t.Fatalf("expected some index names, got: %d", len(names))
  771. }
  772. var found bool
  773. for _, name := range names {
  774. if name == testIndexName {
  775. found = true
  776. break
  777. }
  778. }
  779. if !found {
  780. t.Fatalf("expected to find index %q; got: %v", testIndexName, found)
  781. }
  782. }
  783. // -- PerformRequest --
  784. func TestPerformRequest(t *testing.T) {
  785. client, err := NewClient()
  786. if err != nil {
  787. t.Fatal(err)
  788. }
  789. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  790. if err != nil {
  791. t.Fatal(err)
  792. }
  793. if res == nil {
  794. t.Fatal("expected response to be != nil")
  795. }
  796. ret := new(PingResult)
  797. if err := json.Unmarshal(res.Body, ret); err != nil {
  798. t.Fatalf("expected no error on decode; got: %v", err)
  799. }
  800. if ret.ClusterName == "" {
  801. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  802. }
  803. }
  804. func TestPerformRequestWithSimpleClient(t *testing.T) {
  805. client, err := NewSimpleClient()
  806. if err != nil {
  807. t.Fatal(err)
  808. }
  809. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  810. if err != nil {
  811. t.Fatal(err)
  812. }
  813. if res == nil {
  814. t.Fatal("expected response to be != nil")
  815. }
  816. ret := new(PingResult)
  817. if err := json.Unmarshal(res.Body, ret); err != nil {
  818. t.Fatalf("expected no error on decode; got: %v", err)
  819. }
  820. if ret.ClusterName == "" {
  821. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  822. }
  823. }
  824. func TestPerformRequestWithLogger(t *testing.T) {
  825. var w bytes.Buffer
  826. out := log.New(&w, "LOGGER ", log.LstdFlags)
  827. client, err := NewClient(SetInfoLog(out), SetSniff(false))
  828. if err != nil {
  829. t.Fatal(err)
  830. }
  831. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  832. if err != nil {
  833. t.Fatal(err)
  834. }
  835. if res == nil {
  836. t.Fatal("expected response to be != nil")
  837. }
  838. ret := new(PingResult)
  839. if err := json.Unmarshal(res.Body, ret); err != nil {
  840. t.Fatalf("expected no error on decode; got: %v", err)
  841. }
  842. if ret.ClusterName == "" {
  843. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  844. }
  845. got := w.String()
  846. pattern := `^LOGGER \d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} GET http://.*/ \[status:200, request:\d+\.\d{3}s\]\n`
  847. matched, err := regexp.MatchString(pattern, got)
  848. if err != nil {
  849. t.Fatalf("expected log line to match %q; got: %v", pattern, err)
  850. }
  851. if !matched {
  852. t.Errorf("expected log line to match %q; got: %v", pattern, got)
  853. }
  854. }
  855. func TestPerformRequestWithLoggerAndTracer(t *testing.T) {
  856. var lw bytes.Buffer
  857. lout := log.New(&lw, "LOGGER ", log.LstdFlags)
  858. var tw bytes.Buffer
  859. tout := log.New(&tw, "TRACER ", log.LstdFlags)
  860. client, err := NewClient(SetInfoLog(lout), SetTraceLog(tout), SetSniff(false))
  861. if err != nil {
  862. t.Fatal(err)
  863. }
  864. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  865. if err != nil {
  866. t.Fatal(err)
  867. }
  868. if res == nil {
  869. t.Fatal("expected response to be != nil")
  870. }
  871. ret := new(PingResult)
  872. if err := json.Unmarshal(res.Body, ret); err != nil {
  873. t.Fatalf("expected no error on decode; got: %v", err)
  874. }
  875. if ret.ClusterName == "" {
  876. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  877. }
  878. lgot := lw.String()
  879. if lgot == "" {
  880. t.Errorf("expected logger output; got: %q", lgot)
  881. }
  882. tgot := tw.String()
  883. if tgot == "" {
  884. t.Errorf("expected tracer output; got: %q", tgot)
  885. }
  886. }
  887. func TestPerformRequestWithTracerOnError(t *testing.T) {
  888. var tw bytes.Buffer
  889. tout := log.New(&tw, "TRACER ", log.LstdFlags)
  890. client, err := NewClient(SetTraceLog(tout), SetSniff(false))
  891. if err != nil {
  892. t.Fatal(err)
  893. }
  894. client.PerformRequest(context.TODO(), "GET", "/no-such-index", nil, nil)
  895. tgot := tw.String()
  896. if tgot == "" {
  897. t.Errorf("expected tracer output; got: %q", tgot)
  898. }
  899. }
  900. type customLogger struct {
  901. out bytes.Buffer
  902. }
  903. func (l *customLogger) Printf(format string, v ...interface{}) {
  904. l.out.WriteString(fmt.Sprintf(format, v...) + "\n")
  905. }
  906. func TestPerformRequestWithCustomLogger(t *testing.T) {
  907. logger := &customLogger{}
  908. client, err := NewClient(SetInfoLog(logger), SetSniff(false))
  909. if err != nil {
  910. t.Fatal(err)
  911. }
  912. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  913. if err != nil {
  914. t.Fatal(err)
  915. }
  916. if res == nil {
  917. t.Fatal("expected response to be != nil")
  918. }
  919. ret := new(PingResult)
  920. if err := json.Unmarshal(res.Body, ret); err != nil {
  921. t.Fatalf("expected no error on decode; got: %v", err)
  922. }
  923. if ret.ClusterName == "" {
  924. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  925. }
  926. got := logger.out.String()
  927. pattern := `^GET http://.*/ \[status:200, request:\d+\.\d{3}s\]\n`
  928. matched, err := regexp.MatchString(pattern, got)
  929. if err != nil {
  930. t.Fatalf("expected log line to match %q; got: %v", pattern, err)
  931. }
  932. if !matched {
  933. t.Errorf("expected log line to match %q; got: %v", pattern, got)
  934. }
  935. }
  936. // failingTransport will run a fail callback if it sees a given URL path prefix.
  937. type failingTransport struct {
  938. path string // path prefix to look for
  939. fail func(*http.Request) (*http.Response, error) // call when path prefix is found
  940. next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil)
  941. }
  942. // RoundTrip implements a failing transport.
  943. func (tr *failingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
  944. if strings.HasPrefix(r.URL.Path, tr.path) && tr.fail != nil {
  945. return tr.fail(r)
  946. }
  947. if tr.next != nil {
  948. return tr.next.RoundTrip(r)
  949. }
  950. return http.DefaultTransport.RoundTrip(r)
  951. }
  952. func TestPerformRequestRetryOnHttpError(t *testing.T) {
  953. var numFailedReqs int
  954. fail := func(r *http.Request) (*http.Response, error) {
  955. numFailedReqs += 1
  956. //return &http.Response{Request: r, StatusCode: 400}, nil
  957. return nil, errors.New("request failed")
  958. }
  959. // Run against a failing endpoint and see if PerformRequest
  960. // retries correctly.
  961. tr := &failingTransport{path: "/fail", fail: fail}
  962. httpClient := &http.Client{Transport: tr}
  963. client, err := NewClient(SetHttpClient(httpClient), SetMaxRetries(5), SetHealthcheck(false))
  964. if err != nil {
  965. t.Fatal(err)
  966. }
  967. res, err := client.PerformRequest(context.TODO(), "GET", "/fail", nil, nil)
  968. if err == nil {
  969. t.Fatal("expected error")
  970. }
  971. if res != nil {
  972. t.Fatal("expected no response")
  973. }
  974. // Connection should be marked as dead after it failed
  975. if numFailedReqs != 5 {
  976. t.Errorf("expected %d failed requests; got: %d", 5, numFailedReqs)
  977. }
  978. }
  979. func TestPerformRequestNoRetryOnValidButUnsuccessfulHttpStatus(t *testing.T) {
  980. var numFailedReqs int
  981. fail := func(r *http.Request) (*http.Response, error) {
  982. numFailedReqs += 1
  983. return &http.Response{Request: r, StatusCode: 500}, nil
  984. }
  985. // Run against a failing endpoint and see if PerformRequest
  986. // retries correctly.
  987. tr := &failingTransport{path: "/fail", fail: fail}
  988. httpClient := &http.Client{Transport: tr}
  989. client, err := NewClient(SetHttpClient(httpClient), SetMaxRetries(5), SetHealthcheck(false))
  990. if err != nil {
  991. t.Fatal(err)
  992. }
  993. res, err := client.PerformRequest(context.TODO(), "GET", "/fail", nil, nil)
  994. if err == nil {
  995. t.Fatal("expected error")
  996. }
  997. if res == nil {
  998. t.Fatal("expected response, got nil")
  999. }
  1000. if want, got := 500, res.StatusCode; want != got {
  1001. t.Fatalf("expected status code = %d, got %d", want, got)
  1002. }
  1003. // Retry should not have triggered additional requests because
  1004. if numFailedReqs != 1 {
  1005. t.Errorf("expected %d failed requests; got: %d", 1, numFailedReqs)
  1006. }
  1007. }
  1008. // failingBody will return an error when json.Marshal is called on it.
  1009. type failingBody struct{}
  1010. // MarshalJSON implements the json.Marshaler interface and always returns an error.
  1011. func (fb failingBody) MarshalJSON() ([]byte, error) {
  1012. return nil, errors.New("failing to marshal")
  1013. }
  1014. func TestPerformRequestWithSetBodyError(t *testing.T) {
  1015. client, err := NewClient()
  1016. if err != nil {
  1017. t.Fatal(err)
  1018. }
  1019. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, failingBody{})
  1020. if err == nil {
  1021. t.Fatal("expected error")
  1022. }
  1023. if res != nil {
  1024. t.Fatal("expected no response")
  1025. }
  1026. }
  1027. // sleepingTransport will sleep before doing a request.
  1028. type sleepingTransport struct {
  1029. timeout time.Duration
  1030. }
  1031. // RoundTrip implements a "sleepy" transport.
  1032. func (tr *sleepingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
  1033. time.Sleep(tr.timeout)
  1034. return http.DefaultTransport.RoundTrip(r)
  1035. }
  1036. func TestPerformRequestWithCancel(t *testing.T) {
  1037. tr := &sleepingTransport{timeout: 3 * time.Second}
  1038. httpClient := &http.Client{Transport: tr}
  1039. client, err := NewSimpleClient(SetHttpClient(httpClient), SetMaxRetries(0))
  1040. if err != nil {
  1041. t.Fatal(err)
  1042. }
  1043. type result struct {
  1044. res *Response
  1045. err error
  1046. }
  1047. ctx, cancel := context.WithCancel(context.Background())
  1048. resc := make(chan result, 1)
  1049. go func() {
  1050. res, err := client.PerformRequest(ctx, "GET", "/", nil, nil)
  1051. resc <- result{res: res, err: err}
  1052. }()
  1053. select {
  1054. case <-time.After(1 * time.Second):
  1055. cancel()
  1056. case res := <-resc:
  1057. t.Fatalf("expected response before cancel, got %v", res)
  1058. case <-ctx.Done():
  1059. t.Fatalf("expected no early termination, got ctx.Done(): %v", ctx.Err())
  1060. }
  1061. err = ctx.Err()
  1062. if err != context.Canceled {
  1063. t.Fatalf("expected error context.Canceled, got: %v", err)
  1064. }
  1065. }
  1066. func TestPerformRequestWithTimeout(t *testing.T) {
  1067. tr := &sleepingTransport{timeout: 3 * time.Second}
  1068. httpClient := &http.Client{Transport: tr}
  1069. client, err := NewSimpleClient(SetHttpClient(httpClient), SetMaxRetries(0))
  1070. if err != nil {
  1071. t.Fatal(err)
  1072. }
  1073. type result struct {
  1074. res *Response
  1075. err error
  1076. }
  1077. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  1078. defer cancel()
  1079. resc := make(chan result, 1)
  1080. go func() {
  1081. res, err := client.PerformRequest(ctx, "GET", "/", nil, nil)
  1082. resc <- result{res: res, err: err}
  1083. }()
  1084. select {
  1085. case res := <-resc:
  1086. t.Fatalf("expected timeout before response, got %v", res)
  1087. case <-ctx.Done():
  1088. err := ctx.Err()
  1089. if err != context.DeadlineExceeded {
  1090. t.Fatalf("expected error context.DeadlineExceeded, got: %v", err)
  1091. }
  1092. }
  1093. }
  1094. // -- Compression --
  1095. // Notice that the trace log does always print "Accept-Encoding: gzip"
  1096. // regardless of whether compression is enabled or not. This is because
  1097. // of the underlying "httputil.DumpRequestOut".
  1098. //
  1099. // Use a real HTTP proxy/recorder to convince yourself that
  1100. // "Accept-Encoding: gzip" is NOT sent when DisableCompression
  1101. // is set to true.
  1102. //
  1103. // See also:
  1104. // https://groups.google.com/forum/#!topic/golang-nuts/ms8QNCzew8Q
  1105. func TestPerformRequestWithCompressionEnabled(t *testing.T) {
  1106. testPerformRequestWithCompression(t, &http.Client{
  1107. Transport: &http.Transport{
  1108. DisableCompression: true,
  1109. },
  1110. })
  1111. }
  1112. func TestPerformRequestWithCompressionDisabled(t *testing.T) {
  1113. testPerformRequestWithCompression(t, &http.Client{
  1114. Transport: &http.Transport{
  1115. DisableCompression: false,
  1116. },
  1117. })
  1118. }
  1119. func testPerformRequestWithCompression(t *testing.T, hc *http.Client) {
  1120. client, err := NewClient(SetHttpClient(hc), SetSniff(false))
  1121. if err != nil {
  1122. t.Fatal(err)
  1123. }
  1124. res, err := client.PerformRequest(context.TODO(), "GET", "/", nil, nil)
  1125. if err != nil {
  1126. t.Fatal(err)
  1127. }
  1128. if res == nil {
  1129. t.Fatal("expected response to be != nil")
  1130. }
  1131. ret := new(PingResult)
  1132. if err := json.Unmarshal(res.Body, ret); err != nil {
  1133. t.Fatalf("expected no error on decode; got: %v", err)
  1134. }
  1135. if ret.ClusterName == "" {
  1136. t.Errorf("expected cluster name; got: %q", ret.ClusterName)
  1137. }
  1138. }