scan.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "net/url"
  10. "strings"
  11. "github.com/olivere/elastic/uritemplates"
  12. )
  13. const (
  14. defaultKeepAlive = "5m"
  15. )
  16. var (
  17. // End of stream (or scan)
  18. EOS = errors.New("EOS")
  19. // No ScrollId
  20. ErrNoScrollId = errors.New("no scrollId")
  21. )
  22. // ScanService manages a cursor through documents in Elasticsearch.
  23. type ScanService struct {
  24. client *Client
  25. indices []string
  26. types []string
  27. keepAlive string
  28. query Query
  29. size *int
  30. pretty bool
  31. }
  32. func NewScanService(client *Client) *ScanService {
  33. builder := &ScanService{
  34. client: client,
  35. query: NewMatchAllQuery(),
  36. }
  37. return builder
  38. }
  39. func (s *ScanService) Index(index string) *ScanService {
  40. if s.indices == nil {
  41. s.indices = make([]string, 0)
  42. }
  43. s.indices = append(s.indices, index)
  44. return s
  45. }
  46. func (s *ScanService) Indices(indices ...string) *ScanService {
  47. if s.indices == nil {
  48. s.indices = make([]string, 0)
  49. }
  50. s.indices = append(s.indices, indices...)
  51. return s
  52. }
  53. func (s *ScanService) Type(typ string) *ScanService {
  54. if s.types == nil {
  55. s.types = make([]string, 0)
  56. }
  57. s.types = append(s.types, typ)
  58. return s
  59. }
  60. func (s *ScanService) Types(types ...string) *ScanService {
  61. if s.types == nil {
  62. s.types = make([]string, 0)
  63. }
  64. s.types = append(s.types, types...)
  65. return s
  66. }
  67. // Scroll is an alias for KeepAlive, the time to keep
  68. // the cursor alive (e.g. "5m" for 5 minutes).
  69. func (s *ScanService) Scroll(keepAlive string) *ScanService {
  70. s.keepAlive = keepAlive
  71. return s
  72. }
  73. // KeepAlive sets the maximum time the cursor will be
  74. // available before expiration (e.g. "5m" for 5 minutes).
  75. func (s *ScanService) KeepAlive(keepAlive string) *ScanService {
  76. s.keepAlive = keepAlive
  77. return s
  78. }
  79. func (s *ScanService) Query(query Query) *ScanService {
  80. s.query = query
  81. return s
  82. }
  83. func (s *ScanService) Pretty(pretty bool) *ScanService {
  84. s.pretty = pretty
  85. return s
  86. }
  87. func (s *ScanService) Size(size int) *ScanService {
  88. s.size = &size
  89. return s
  90. }
  91. func (s *ScanService) Do() (*ScanCursor, error) {
  92. // Build url
  93. path := "/"
  94. // Indices part
  95. indexPart := make([]string, 0)
  96. for _, index := range s.indices {
  97. index, err := uritemplates.Expand("{index}", map[string]string{
  98. "index": index,
  99. })
  100. if err != nil {
  101. return nil, err
  102. }
  103. indexPart = append(indexPart, index)
  104. }
  105. if len(indexPart) > 0 {
  106. path += strings.Join(indexPart, ",")
  107. }
  108. // Types
  109. typesPart := make([]string, 0)
  110. for _, typ := range s.types {
  111. typ, err := uritemplates.Expand("{type}", map[string]string{
  112. "type": typ,
  113. })
  114. if err != nil {
  115. return nil, err
  116. }
  117. typesPart = append(typesPart, typ)
  118. }
  119. if len(typesPart) > 0 {
  120. path += "/" + strings.Join(typesPart, ",")
  121. }
  122. // Search
  123. path += "/_search"
  124. // Parameters
  125. params := make(url.Values)
  126. params.Set("search_type", "scan")
  127. if s.pretty {
  128. params.Set("pretty", fmt.Sprintf("%v", s.pretty))
  129. }
  130. if s.keepAlive != "" {
  131. params.Set("scroll", s.keepAlive)
  132. } else {
  133. params.Set("scroll", defaultKeepAlive)
  134. }
  135. if s.size != nil && *s.size > 0 {
  136. params.Set("size", fmt.Sprintf("%d", *s.size))
  137. }
  138. // Set body
  139. body := make(map[string]interface{})
  140. if s.query != nil {
  141. body["query"] = s.query.Source()
  142. }
  143. // Get response
  144. res, err := s.client.PerformRequest("POST", path, params, body)
  145. if err != nil {
  146. return nil, err
  147. }
  148. // Return result
  149. searchResult := new(SearchResult)
  150. if err := json.Unmarshal(res.Body, searchResult); err != nil {
  151. return nil, err
  152. }
  153. cursor := NewScanCursor(s.client, s.keepAlive, s.pretty, searchResult)
  154. return cursor, nil
  155. }
  156. // scanCursor represents a single page of results from
  157. // an Elasticsearch Scan operation.
  158. type ScanCursor struct {
  159. Results *SearchResult
  160. client *Client
  161. keepAlive string
  162. pretty bool
  163. currentPage int
  164. }
  165. // newScanCursor returns a new initialized instance
  166. // of scanCursor.
  167. func NewScanCursor(client *Client, keepAlive string, pretty bool, searchResult *SearchResult) *ScanCursor {
  168. return &ScanCursor{
  169. client: client,
  170. keepAlive: keepAlive,
  171. pretty: pretty,
  172. Results: searchResult,
  173. }
  174. }
  175. // TotalHits is a convenience method that returns the number
  176. // of hits the cursor will iterate through.
  177. func (c *ScanCursor) TotalHits() int64 {
  178. if c.Results.Hits == nil {
  179. return 0
  180. }
  181. return c.Results.Hits.TotalHits
  182. }
  183. // Next returns the next search result or nil when all
  184. // documents have been scanned.
  185. //
  186. // Usage:
  187. //
  188. // for {
  189. // res, err := cursor.Next()
  190. // if err == elastic.EOS {
  191. // // End of stream (or scan)
  192. // break
  193. // }
  194. // if err != nil {
  195. // // Handle error
  196. // }
  197. // // Work with res
  198. // }
  199. //
  200. func (c *ScanCursor) Next() (*SearchResult, error) {
  201. if c.currentPage > 0 {
  202. if c.Results.Hits == nil || len(c.Results.Hits.Hits) == 0 || c.Results.Hits.TotalHits == 0 {
  203. return nil, EOS
  204. }
  205. }
  206. if c.Results.ScrollId == "" {
  207. return nil, EOS
  208. }
  209. // Build url
  210. path := "/_search/scroll"
  211. // Parameters
  212. params := make(url.Values)
  213. if c.pretty {
  214. params.Set("pretty", fmt.Sprintf("%v", c.pretty))
  215. }
  216. if c.keepAlive != "" {
  217. params.Set("scroll", c.keepAlive)
  218. } else {
  219. params.Set("scroll", defaultKeepAlive)
  220. }
  221. // Set body
  222. body := c.Results.ScrollId
  223. // Get response
  224. res, err := c.client.PerformRequest("POST", path, params, body)
  225. if err != nil {
  226. return nil, err
  227. }
  228. // Return result
  229. if err := json.Unmarshal(res.Body, c.Results); err != nil {
  230. return nil, err
  231. }
  232. c.currentPage += 1
  233. return c.Results, nil
  234. }