reindexer.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "encoding/json"
  7. "errors"
  8. )
  9. // Reindexer simplifies the process of reindexing an index. You typically
  10. // reindex a source index to a target index. However, you can also specify
  11. // a query that filters out documents from the source index before bulk
  12. // indexing them into the target index. The caller may also specify a
  13. // different client for the target, e.g. when copying indices from one
  14. // Elasticsearch cluster to another.
  15. //
  16. // Internally, the Reindex users a scan and scroll operation on the source
  17. // index and bulk indexing to push data into the target index.
  18. //
  19. // The caller is responsible for setting up and/or clearing the target index
  20. // before starting the reindex process.
  21. //
  22. // See http://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html
  23. // for more information about reindexing.
  24. type Reindexer struct {
  25. sourceClient, targetClient *Client
  26. sourceIndex, targetIndex string
  27. query Query
  28. bulkSize int
  29. scroll string
  30. progress ReindexerProgressFunc
  31. statsOnly bool
  32. }
  33. // ReindexerProgressFunc is a callback that can be used with Reindexer
  34. // to report progress while reindexing data.
  35. type ReindexerProgressFunc func(current, total int64)
  36. // ReindexerResponse is returned from the Do func in a Reindexer.
  37. // By default, it returns the number of succeeded and failed bulk operations.
  38. // To return details about all failed items, set StatsOnly to false in
  39. // Reindexer.
  40. type ReindexerResponse struct {
  41. Success int64
  42. Failed int64
  43. Errors []*BulkResponseItem
  44. }
  45. // NewReindexer returns a new Reindexer.
  46. func NewReindexer(client *Client, source, target string) *Reindexer {
  47. return &Reindexer{
  48. sourceClient: client,
  49. sourceIndex: source,
  50. targetIndex: target,
  51. statsOnly: true,
  52. }
  53. }
  54. // TargetClient specifies a different client for the target. This is
  55. // necessary when the target index is in a different Elasticsearch cluster.
  56. // By default, the source and target clients are the same.
  57. func (ix *Reindexer) TargetClient(c *Client) *Reindexer {
  58. ix.targetClient = c
  59. return ix
  60. }
  61. // Query specifies the query to apply to the source. It filters out those
  62. // documents to be indexed into target. A nil query does not filter out any
  63. // documents.
  64. func (ix *Reindexer) Query(q Query) *Reindexer {
  65. ix.query = q
  66. return ix
  67. }
  68. // BulkSize returns the number of documents to send to Elasticsearch per chunk.
  69. // The default is 500.
  70. func (ix *Reindexer) BulkSize(size int) *Reindexer {
  71. ix.bulkSize = size
  72. return ix
  73. }
  74. // Scroll specifies for how long the scroll operation on the source index
  75. // should be maintained. The default is 5m.
  76. func (ix *Reindexer) Scroll(timeout string) *Reindexer {
  77. ix.scroll = timeout
  78. return ix
  79. }
  80. // Progress indicates a callback that will be called while indexing.
  81. func (ix *Reindexer) Progress(f ReindexerProgressFunc) *Reindexer {
  82. ix.progress = f
  83. return ix
  84. }
  85. // StatsOnly indicates whether the Do method should return details e.g. about
  86. // the documents that failed while indexing. It is true by default, i.e. only
  87. // the number of documents that succeeded/failed are returned. Set to false
  88. // if you want all the details.
  89. func (ix *Reindexer) StatsOnly(statsOnly bool) *Reindexer {
  90. ix.statsOnly = statsOnly
  91. return ix
  92. }
  93. // Do starts the reindexing process.
  94. func (ix *Reindexer) Do() (*ReindexerResponse, error) {
  95. if ix.sourceClient == nil {
  96. return nil, errors.New("no source client")
  97. }
  98. if ix.sourceIndex == "" {
  99. return nil, errors.New("no source index")
  100. }
  101. if ix.targetIndex == "" {
  102. return nil, errors.New("no target index")
  103. }
  104. if ix.targetClient == nil {
  105. ix.targetClient = ix.sourceClient
  106. }
  107. if ix.bulkSize <= 0 {
  108. ix.bulkSize = 500
  109. }
  110. if ix.scroll == "" {
  111. ix.scroll = "5m"
  112. }
  113. // Count total to report progress (if necessary)
  114. var err error
  115. var current, total int64
  116. if ix.progress != nil {
  117. total, err = ix.count()
  118. if err != nil {
  119. return nil, err
  120. }
  121. }
  122. // Prepare scan and scroll to iterate through the source index
  123. scanner := ix.sourceClient.Scan(ix.sourceIndex).Scroll(ix.scroll)
  124. if ix.query != nil {
  125. scanner = scanner.Query(ix.query)
  126. }
  127. cursor, err := scanner.Do()
  128. bulk := ix.targetClient.Bulk().Index(ix.targetIndex)
  129. ret := &ReindexerResponse{
  130. Errors: make([]*BulkResponseItem, 0),
  131. }
  132. // Main loop iterates through the source index and bulk indexes into target.
  133. for {
  134. docs, err := cursor.Next()
  135. if err == EOS {
  136. break
  137. }
  138. if err != nil {
  139. return ret, err
  140. }
  141. if docs.TotalHits() > 0 {
  142. for _, hit := range docs.Hits.Hits {
  143. if ix.progress != nil {
  144. current++
  145. ix.progress(current, total)
  146. }
  147. // TODO(oe) Do we need to deserialize here?
  148. source := make(map[string]interface{})
  149. if err := json.Unmarshal(*hit.Source, &source); err != nil {
  150. return ret, err
  151. }
  152. // Enqueue and write into target index
  153. req := NewBulkIndexRequest().Index(ix.targetIndex).Type(hit.Type).Id(hit.Id).Doc(source)
  154. bulk.Add(req)
  155. if bulk.NumberOfActions() >= ix.bulkSize {
  156. bulk, err = ix.commit(bulk, ret)
  157. if err != nil {
  158. return ret, err
  159. }
  160. }
  161. }
  162. }
  163. }
  164. // Final flush
  165. if bulk.NumberOfActions() > 0 {
  166. bulk, err = ix.commit(bulk, ret)
  167. if err != nil {
  168. return ret, err
  169. }
  170. bulk = nil
  171. }
  172. return ret, nil
  173. }
  174. // count returns the number of documents in the source index.
  175. // The query is taken into account, if specified.
  176. func (ix *Reindexer) count() (int64, error) {
  177. service := ix.sourceClient.Count(ix.sourceIndex)
  178. if ix.query != nil {
  179. service = service.Query(ix.query)
  180. }
  181. return service.Do()
  182. }
  183. // commit commits a bulk, updates the stats, and returns a fresh bulk service.
  184. func (ix *Reindexer) commit(bulk *BulkService, ret *ReindexerResponse) (*BulkService, error) {
  185. bres, err := bulk.Do()
  186. if err != nil {
  187. return nil, err
  188. }
  189. ret.Success += int64(len(bres.Succeeded()))
  190. failed := bres.Failed()
  191. ret.Failed += int64(len(failed))
  192. if !ix.statsOnly {
  193. ret.Errors = append(ret.Errors, failed...)
  194. }
  195. bulk = ix.targetClient.Bulk().Index(ix.targetIndex)
  196. return bulk, nil
  197. }