cluster-test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package main
  5. import (
  6. "encoding/json"
  7. "errors"
  8. "flag"
  9. "fmt"
  10. "log"
  11. "math/rand"
  12. "os"
  13. "runtime"
  14. "strings"
  15. "sync/atomic"
  16. "time"
  17. "github.com/olivere/elastic"
  18. )
  19. type Tweet struct {
  20. User string `json:"user"`
  21. Message string `json:"message"`
  22. Retweets int `json:"retweets"`
  23. Image string `json:"image,omitempty"`
  24. Created time.Time `json:"created,omitempty"`
  25. Tags []string `json:"tags,omitempty"`
  26. Location string `json:"location,omitempty"`
  27. Suggest *elastic.SuggestField `json:"suggest_field,omitempty"`
  28. }
  29. var (
  30. nodes = flag.String("nodes", "", "comma-separated list of ES URLs (e.g. 'http://192.168.2.10:9200,http://192.168.2.11:9200')")
  31. n = flag.Int("n", 5, "number of goroutines that run searches")
  32. index = flag.String("index", "twitter", "name of ES index to use")
  33. errorlogfile = flag.String("errorlog", "", "error log file")
  34. infologfile = flag.String("infolog", "", "info log file")
  35. tracelogfile = flag.String("tracelog", "", "trace log file")
  36. retries = flag.Int("retries", elastic.DefaultMaxRetries, "number of retries")
  37. sniff = flag.Bool("sniff", elastic.DefaultSnifferEnabled, "enable or disable sniffer")
  38. sniffer = flag.Duration("sniffer", elastic.DefaultSnifferInterval, "sniffer interval")
  39. healthcheck = flag.Bool("healthcheck", elastic.DefaultHealthcheckEnabled, "enable or disable healthchecks")
  40. healthchecker = flag.Duration("healthchecker", elastic.DefaultHealthcheckInterval, "healthcheck interval")
  41. )
  42. func main() {
  43. flag.Parse()
  44. runtime.GOMAXPROCS(runtime.NumCPU())
  45. if *nodes == "" {
  46. log.Fatal("no nodes specified")
  47. }
  48. urls := strings.SplitN(*nodes, ",", -1)
  49. testcase, err := NewTestCase(*index, urls)
  50. if err != nil {
  51. log.Fatal(err)
  52. }
  53. testcase.SetErrorLogFile(*errorlogfile)
  54. testcase.SetInfoLogFile(*infologfile)
  55. testcase.SetTraceLogFile(*tracelogfile)
  56. testcase.SetMaxRetries(*retries)
  57. testcase.SetHealthcheck(*healthcheck)
  58. testcase.SetHealthcheckInterval(*healthchecker)
  59. testcase.SetSniff(*sniff)
  60. testcase.SetSnifferInterval(*sniffer)
  61. if err := testcase.Run(*n); err != nil {
  62. log.Fatal(err)
  63. }
  64. select {}
  65. }
  66. type RunInfo struct {
  67. Success bool
  68. }
  69. type TestCase struct {
  70. nodes []string
  71. client *elastic.Client
  72. runs int64
  73. failures int64
  74. runCh chan RunInfo
  75. index string
  76. errorlogfile string
  77. infologfile string
  78. tracelogfile string
  79. maxRetries int
  80. healthcheck bool
  81. healthcheckInterval time.Duration
  82. sniff bool
  83. snifferInterval time.Duration
  84. }
  85. func NewTestCase(index string, nodes []string) (*TestCase, error) {
  86. if index == "" {
  87. return nil, errors.New("no index name specified")
  88. }
  89. return &TestCase{
  90. index: index,
  91. nodes: nodes,
  92. runCh: make(chan RunInfo),
  93. }, nil
  94. }
  95. func (t *TestCase) SetIndex(name string) {
  96. t.index = name
  97. }
  98. func (t *TestCase) SetErrorLogFile(name string) {
  99. t.errorlogfile = name
  100. }
  101. func (t *TestCase) SetInfoLogFile(name string) {
  102. t.infologfile = name
  103. }
  104. func (t *TestCase) SetTraceLogFile(name string) {
  105. t.tracelogfile = name
  106. }
  107. func (t *TestCase) SetMaxRetries(n int) {
  108. t.maxRetries = n
  109. }
  110. func (t *TestCase) SetSniff(enabled bool) {
  111. t.sniff = enabled
  112. }
  113. func (t *TestCase) SetSnifferInterval(d time.Duration) {
  114. t.snifferInterval = d
  115. }
  116. func (t *TestCase) SetHealthcheck(enabled bool) {
  117. t.healthcheck = enabled
  118. }
  119. func (t *TestCase) SetHealthcheckInterval(d time.Duration) {
  120. t.healthcheckInterval = d
  121. }
  122. func (t *TestCase) Run(n int) error {
  123. if err := t.setup(); err != nil {
  124. return err
  125. }
  126. for i := 1; i < n; i++ {
  127. go t.search()
  128. }
  129. go t.monitor()
  130. return nil
  131. }
  132. func (t *TestCase) monitor() {
  133. print := func() {
  134. fmt.Printf("\033[32m%5d\033[0m; \033[31m%5d\033[0m: %s%s\r", t.runs, t.failures, t.client.String(), " ")
  135. }
  136. for {
  137. select {
  138. case run := <-t.runCh:
  139. atomic.AddInt64(&t.runs, 1)
  140. if !run.Success {
  141. atomic.AddInt64(&t.failures, 1)
  142. fmt.Println()
  143. }
  144. print()
  145. case <-time.After(5 * time.Second):
  146. // Print stats after some inactivity
  147. print()
  148. break
  149. }
  150. }
  151. }
  152. func (t *TestCase) setup() error {
  153. var errorlogger *log.Logger
  154. if t.errorlogfile != "" {
  155. f, err := os.OpenFile(t.errorlogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
  156. if err != nil {
  157. return err
  158. }
  159. errorlogger = log.New(f, "", log.Ltime|log.Lmicroseconds|log.Lshortfile)
  160. }
  161. var infologger *log.Logger
  162. if t.infologfile != "" {
  163. f, err := os.OpenFile(t.infologfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
  164. if err != nil {
  165. return err
  166. }
  167. infologger = log.New(f, "", log.LstdFlags)
  168. }
  169. // Trace request and response details like this
  170. var tracelogger *log.Logger
  171. if t.tracelogfile != "" {
  172. f, err := os.OpenFile(t.tracelogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
  173. if err != nil {
  174. return err
  175. }
  176. tracelogger = log.New(f, "", log.LstdFlags)
  177. }
  178. client, err := elastic.NewClient(
  179. elastic.SetURL(t.nodes...),
  180. elastic.SetErrorLog(errorlogger),
  181. elastic.SetInfoLog(infologger),
  182. elastic.SetTraceLog(tracelogger),
  183. elastic.SetMaxRetries(t.maxRetries),
  184. elastic.SetSniff(t.sniff),
  185. elastic.SetSnifferInterval(t.snifferInterval),
  186. elastic.SetHealthcheck(t.healthcheck),
  187. elastic.SetHealthcheckInterval(t.healthcheckInterval))
  188. if err != nil {
  189. // Handle error
  190. return err
  191. }
  192. t.client = client
  193. // Use the IndexExists service to check if a specified index exists.
  194. exists, err := t.client.IndexExists(t.index).Do()
  195. if err != nil {
  196. return err
  197. }
  198. if exists {
  199. deleteIndex, err := t.client.DeleteIndex(t.index).Do()
  200. if err != nil {
  201. return err
  202. }
  203. if !deleteIndex.Acknowledged {
  204. return errors.New("delete index not acknowledged")
  205. }
  206. }
  207. // Create a new index.
  208. createIndex, err := t.client.CreateIndex(t.index).Do()
  209. if err != nil {
  210. return err
  211. }
  212. if !createIndex.Acknowledged {
  213. return errors.New("create index not acknowledged")
  214. }
  215. // Index a tweet (using JSON serialization)
  216. tweet1 := Tweet{User: "olivere", Message: "Take Five", Retweets: 0}
  217. _, err = t.client.Index().
  218. Index(t.index).
  219. Type("tweet").
  220. Id("1").
  221. BodyJson(tweet1).
  222. Do()
  223. if err != nil {
  224. return err
  225. }
  226. // Index a second tweet (by string)
  227. tweet2 := `{"user" : "olivere", "message" : "It's a Raggy Waltz"}`
  228. _, err = t.client.Index().
  229. Index(t.index).
  230. Type("tweet").
  231. Id("2").
  232. BodyString(tweet2).
  233. Do()
  234. if err != nil {
  235. return err
  236. }
  237. // Flush to make sure the documents got written.
  238. _, err = t.client.Flush().Index(t.index).Do()
  239. if err != nil {
  240. return err
  241. }
  242. return nil
  243. }
  244. func (t *TestCase) search() {
  245. // Loop forever to check for connection issues
  246. for {
  247. // Get tweet with specified ID
  248. get1, err := t.client.Get().
  249. Index(t.index).
  250. Type("tweet").
  251. Id("1").
  252. Do()
  253. if err != nil {
  254. //failf("Get failed: %v", err)
  255. t.runCh <- RunInfo{Success: false}
  256. continue
  257. }
  258. if !get1.Found {
  259. //log.Printf("Document %s not found\n", "1")
  260. //fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
  261. t.runCh <- RunInfo{Success: false}
  262. continue
  263. }
  264. // Search with a term query
  265. termQuery := elastic.NewTermQuery("user", "olivere")
  266. searchResult, err := t.client.Search().
  267. Index(t.index). // search in index t.index
  268. Query(&termQuery). // specify the query
  269. Sort("user", true). // sort by "user" field, ascending
  270. From(0).Size(10). // take documents 0-9
  271. Pretty(true). // pretty print request and response JSON
  272. Do() // execute
  273. if err != nil {
  274. //failf("Search failed: %v\n", err)
  275. t.runCh <- RunInfo{Success: false}
  276. continue
  277. }
  278. // searchResult is of type SearchResult and returns hits, suggestions,
  279. // and all kinds of other information from Elasticsearch.
  280. //fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
  281. // Number of hits
  282. if searchResult.Hits != nil {
  283. //fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits)
  284. // Iterate through results
  285. for _, hit := range searchResult.Hits.Hits {
  286. // hit.Index contains the name of the index
  287. // Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}).
  288. var tweet Tweet
  289. err := json.Unmarshal(*hit.Source, &tweet)
  290. if err != nil {
  291. // Deserialization failed
  292. //failf("Deserialize failed: %v\n", err)
  293. t.runCh <- RunInfo{Success: false}
  294. continue
  295. }
  296. // Work with tweet
  297. //fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
  298. }
  299. } else {
  300. // No hits
  301. //fmt.Print("Found no tweets\n")
  302. }
  303. t.runCh <- RunInfo{Success: true}
  304. // Sleep some time
  305. time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
  306. }
  307. }