reindex.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "context"
  7. "fmt"
  8. "net/url"
  9. )
  10. // ReindexService is a method to copy documents from one index to another.
  11. // It is documented at https://www.elastic.co/guide/en/elasticsearch/reference/5.0/docs-reindex.html.
  12. type ReindexService struct {
  13. client *Client
  14. pretty bool
  15. refresh string
  16. timeout string
  17. waitForActiveShards string
  18. waitForCompletion *bool
  19. requestsPerSecond *int
  20. body interface{}
  21. source *ReindexSource
  22. destination *ReindexDestination
  23. conflicts string
  24. size *int
  25. script *Script
  26. }
  27. // NewReindexService creates a new ReindexService.
  28. func NewReindexService(client *Client) *ReindexService {
  29. return &ReindexService{
  30. client: client,
  31. }
  32. }
  33. // WaitForActiveShards sets the number of shard copies that must be active before
  34. // proceeding with the reindex operation. Defaults to 1, meaning the primary shard only.
  35. // Set to `all` for all shard copies, otherwise set to any non-negative value less than or
  36. // equal to the total number of copies for the shard (number of replicas + 1).
  37. func (s *ReindexService) WaitForActiveShards(waitForActiveShards string) *ReindexService {
  38. s.waitForActiveShards = waitForActiveShards
  39. return s
  40. }
  41. // RequestsPerSecond specifies the throttle to set on this request in sub-requests per second.
  42. // -1 means set no throttle as does "unlimited" which is the only non-float this accepts.
  43. func (s *ReindexService) RequestsPerSecond(requestsPerSecond int) *ReindexService {
  44. s.requestsPerSecond = &requestsPerSecond
  45. return s
  46. }
  47. // Refresh indicates whether Elasticsearch should refresh the effected indexes
  48. // immediately.
  49. func (s *ReindexService) Refresh(refresh string) *ReindexService {
  50. s.refresh = refresh
  51. return s
  52. }
  53. // Timeout is the time each individual bulk request should wait for shards
  54. // that are unavailable.
  55. func (s *ReindexService) Timeout(timeout string) *ReindexService {
  56. s.timeout = timeout
  57. return s
  58. }
  59. // WaitForCompletion indicates whether Elasticsearch should block until the
  60. // reindex is complete.
  61. func (s *ReindexService) WaitForCompletion(waitForCompletion bool) *ReindexService {
  62. s.waitForCompletion = &waitForCompletion
  63. return s
  64. }
  65. // Pretty indicates that the JSON response be indented and human readable.
  66. func (s *ReindexService) Pretty(pretty bool) *ReindexService {
  67. s.pretty = pretty
  68. return s
  69. }
  70. // Source specifies the source of the reindexing process.
  71. func (s *ReindexService) Source(source *ReindexSource) *ReindexService {
  72. s.source = source
  73. return s
  74. }
  75. // SourceIndex specifies the source index of the reindexing process.
  76. func (s *ReindexService) SourceIndex(index string) *ReindexService {
  77. if s.source == nil {
  78. s.source = NewReindexSource()
  79. }
  80. s.source = s.source.Index(index)
  81. return s
  82. }
  83. // Destination specifies the destination of the reindexing process.
  84. func (s *ReindexService) Destination(destination *ReindexDestination) *ReindexService {
  85. s.destination = destination
  86. return s
  87. }
  88. // DestinationIndex specifies the destination index of the reindexing process.
  89. func (s *ReindexService) DestinationIndex(index string) *ReindexService {
  90. if s.destination == nil {
  91. s.destination = NewReindexDestination()
  92. }
  93. s.destination = s.destination.Index(index)
  94. return s
  95. }
  96. // DestinationIndexAndType specifies both the destination index and type
  97. // of the reindexing process.
  98. func (s *ReindexService) DestinationIndexAndType(index, typ string) *ReindexService {
  99. if s.destination == nil {
  100. s.destination = NewReindexDestination()
  101. }
  102. s.destination = s.destination.Index(index)
  103. s.destination = s.destination.Type(typ)
  104. return s
  105. }
  106. // Conflicts indicates what to do when the process detects version conflicts.
  107. // Possible values are "proceed" and "abort".
  108. func (s *ReindexService) Conflicts(conflicts string) *ReindexService {
  109. s.conflicts = conflicts
  110. return s
  111. }
  112. // AbortOnVersionConflict aborts the request on version conflicts.
  113. // It is an alias to setting Conflicts("abort").
  114. func (s *ReindexService) AbortOnVersionConflict() *ReindexService {
  115. s.conflicts = "abort"
  116. return s
  117. }
  118. // ProceedOnVersionConflict aborts the request on version conflicts.
  119. // It is an alias to setting Conflicts("proceed").
  120. func (s *ReindexService) ProceedOnVersionConflict() *ReindexService {
  121. s.conflicts = "proceed"
  122. return s
  123. }
  124. // Size sets an upper limit for the number of processed documents.
  125. func (s *ReindexService) Size(size int) *ReindexService {
  126. s.size = &size
  127. return s
  128. }
  129. // Script allows for modification of the documents as they are reindexed
  130. // from source to destination.
  131. func (s *ReindexService) Script(script *Script) *ReindexService {
  132. s.script = script
  133. return s
  134. }
  135. // Body specifies the body of the request to send to Elasticsearch.
  136. // It overrides settings specified with other setters, e.g. Query.
  137. func (s *ReindexService) Body(body interface{}) *ReindexService {
  138. s.body = body
  139. return s
  140. }
  141. // buildURL builds the URL for the operation.
  142. func (s *ReindexService) buildURL() (string, url.Values, error) {
  143. // Build URL path
  144. path := "/_reindex"
  145. // Add query string parameters
  146. params := url.Values{}
  147. if s.pretty {
  148. params.Set("pretty", "1")
  149. }
  150. if s.refresh != "" {
  151. params.Set("refresh", s.refresh)
  152. }
  153. if s.timeout != "" {
  154. params.Set("timeout", s.timeout)
  155. }
  156. if s.requestsPerSecond != nil {
  157. params.Set("requests_per_second", fmt.Sprintf("%v", *s.requestsPerSecond))
  158. }
  159. if s.waitForActiveShards != "" {
  160. params.Set("wait_for_active_shards", s.waitForActiveShards)
  161. }
  162. if s.waitForCompletion != nil {
  163. params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
  164. }
  165. return path, params, nil
  166. }
  167. // Validate checks if the operation is valid.
  168. func (s *ReindexService) Validate() error {
  169. var invalid []string
  170. if s.body != nil {
  171. return nil
  172. }
  173. if s.source == nil {
  174. invalid = append(invalid, "Source")
  175. } else {
  176. if len(s.source.indices) == 0 {
  177. invalid = append(invalid, "Source.Index")
  178. }
  179. }
  180. if s.destination == nil {
  181. invalid = append(invalid, "Destination")
  182. }
  183. if len(invalid) > 0 {
  184. return fmt.Errorf("missing required fields: %v", invalid)
  185. }
  186. return nil
  187. }
  188. // getBody returns the body part of the document request.
  189. func (s *ReindexService) getBody() (interface{}, error) {
  190. if s.body != nil {
  191. return s.body, nil
  192. }
  193. body := make(map[string]interface{})
  194. if s.conflicts != "" {
  195. body["conflicts"] = s.conflicts
  196. }
  197. if s.size != nil {
  198. body["size"] = *s.size
  199. }
  200. if s.script != nil {
  201. out, err := s.script.Source()
  202. if err != nil {
  203. return nil, err
  204. }
  205. body["script"] = out
  206. }
  207. src, err := s.source.Source()
  208. if err != nil {
  209. return nil, err
  210. }
  211. body["source"] = src
  212. dst, err := s.destination.Source()
  213. if err != nil {
  214. return nil, err
  215. }
  216. body["dest"] = dst
  217. return body, nil
  218. }
  219. // Do executes the operation.
  220. func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error) {
  221. // Check pre-conditions
  222. if err := s.Validate(); err != nil {
  223. return nil, err
  224. }
  225. // Get URL for request
  226. path, params, err := s.buildURL()
  227. if err != nil {
  228. return nil, err
  229. }
  230. // Setup HTTP request body
  231. body, err := s.getBody()
  232. if err != nil {
  233. return nil, err
  234. }
  235. // Get HTTP response
  236. res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
  237. if err != nil {
  238. return nil, err
  239. }
  240. // Return operation response
  241. ret := new(BulkIndexByScrollResponse)
  242. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  243. return nil, err
  244. }
  245. return ret, nil
  246. }
  247. // DoAsync executes the reindexing operation asynchronously by starting a new task.
  248. // Callers need to use the Task Management API to watch the outcome of the reindexing
  249. // operation.
  250. func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error) {
  251. // Check pre-conditions
  252. if err := s.Validate(); err != nil {
  253. return nil, err
  254. }
  255. // DoAsync only makes sense with WaitForCompletion set to true
  256. if s.waitForCompletion != nil && *s.waitForCompletion {
  257. return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
  258. }
  259. f := false
  260. s.waitForCompletion = &f
  261. // Get URL for request
  262. path, params, err := s.buildURL()
  263. if err != nil {
  264. return nil, err
  265. }
  266. // Setup HTTP request body
  267. body, err := s.getBody()
  268. if err != nil {
  269. return nil, err
  270. }
  271. // Get HTTP response
  272. res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
  273. if err != nil {
  274. return nil, err
  275. }
  276. // Return operation response
  277. ret := new(StartTaskResult)
  278. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  279. return nil, err
  280. }
  281. return ret, nil
  282. }
  283. // -- Source of Reindex --
  284. // ReindexSource specifies the source of a Reindex process.
  285. type ReindexSource struct {
  286. searchType string // default in ES is "query_then_fetch"
  287. indices []string
  288. types []string
  289. routing *string
  290. preference *string
  291. requestCache *bool
  292. scroll string
  293. query Query
  294. sorts []SortInfo
  295. sorters []Sorter
  296. searchSource *SearchSource
  297. remoteInfo *ReindexRemoteInfo
  298. }
  299. // NewReindexSource creates a new ReindexSource.
  300. func NewReindexSource() *ReindexSource {
  301. return &ReindexSource{}
  302. }
  303. // SearchType is the search operation type. Possible values are
  304. // "query_then_fetch" and "dfs_query_then_fetch".
  305. func (r *ReindexSource) SearchType(searchType string) *ReindexSource {
  306. r.searchType = searchType
  307. return r
  308. }
  309. func (r *ReindexSource) SearchTypeDfsQueryThenFetch() *ReindexSource {
  310. return r.SearchType("dfs_query_then_fetch")
  311. }
  312. func (r *ReindexSource) SearchTypeQueryThenFetch() *ReindexSource {
  313. return r.SearchType("query_then_fetch")
  314. }
  315. func (r *ReindexSource) Index(indices ...string) *ReindexSource {
  316. r.indices = append(r.indices, indices...)
  317. return r
  318. }
  319. func (r *ReindexSource) Type(types ...string) *ReindexSource {
  320. r.types = append(r.types, types...)
  321. return r
  322. }
  323. func (r *ReindexSource) Preference(preference string) *ReindexSource {
  324. r.preference = &preference
  325. return r
  326. }
  327. func (r *ReindexSource) RequestCache(requestCache bool) *ReindexSource {
  328. r.requestCache = &requestCache
  329. return r
  330. }
  331. func (r *ReindexSource) Scroll(scroll string) *ReindexSource {
  332. r.scroll = scroll
  333. return r
  334. }
  335. func (r *ReindexSource) Query(query Query) *ReindexSource {
  336. r.query = query
  337. return r
  338. }
  339. // Sort adds a sort order.
  340. func (s *ReindexSource) Sort(field string, ascending bool) *ReindexSource {
  341. s.sorts = append(s.sorts, SortInfo{Field: field, Ascending: ascending})
  342. return s
  343. }
  344. // SortWithInfo adds a sort order.
  345. func (s *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource {
  346. s.sorts = append(s.sorts, info)
  347. return s
  348. }
  349. // SortBy adds a sort order.
  350. func (s *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource {
  351. s.sorters = append(s.sorters, sorter...)
  352. return s
  353. }
  354. // RemoteInfo sets up reindexing from a remote cluster.
  355. func (s *ReindexSource) RemoteInfo(ri *ReindexRemoteInfo) *ReindexSource {
  356. s.remoteInfo = ri
  357. return s
  358. }
  359. // Source returns a serializable JSON request for the request.
  360. func (r *ReindexSource) Source() (interface{}, error) {
  361. source := make(map[string]interface{})
  362. if r.query != nil {
  363. src, err := r.query.Source()
  364. if err != nil {
  365. return nil, err
  366. }
  367. source["query"] = src
  368. } else if r.searchSource != nil {
  369. src, err := r.searchSource.Source()
  370. if err != nil {
  371. return nil, err
  372. }
  373. source["source"] = src
  374. }
  375. if r.searchType != "" {
  376. source["search_type"] = r.searchType
  377. }
  378. switch len(r.indices) {
  379. case 0:
  380. case 1:
  381. source["index"] = r.indices[0]
  382. default:
  383. source["index"] = r.indices
  384. }
  385. switch len(r.types) {
  386. case 0:
  387. case 1:
  388. source["type"] = r.types[0]
  389. default:
  390. source["type"] = r.types
  391. }
  392. if r.preference != nil && *r.preference != "" {
  393. source["preference"] = *r.preference
  394. }
  395. if r.requestCache != nil {
  396. source["request_cache"] = fmt.Sprintf("%v", *r.requestCache)
  397. }
  398. if r.scroll != "" {
  399. source["scroll"] = r.scroll
  400. }
  401. if r.remoteInfo != nil {
  402. src, err := r.remoteInfo.Source()
  403. if err != nil {
  404. return nil, err
  405. }
  406. source["remote"] = src
  407. }
  408. if len(r.sorters) > 0 {
  409. var sortarr []interface{}
  410. for _, sorter := range r.sorters {
  411. src, err := sorter.Source()
  412. if err != nil {
  413. return nil, err
  414. }
  415. sortarr = append(sortarr, src)
  416. }
  417. source["sort"] = sortarr
  418. } else if len(r.sorts) > 0 {
  419. var sortarr []interface{}
  420. for _, sort := range r.sorts {
  421. src, err := sort.Source()
  422. if err != nil {
  423. return nil, err
  424. }
  425. sortarr = append(sortarr, src)
  426. }
  427. source["sort"] = sortarr
  428. }
  429. return source, nil
  430. }
  431. // ReindexRemoteInfo contains information for reindexing from a remote cluster.
  432. type ReindexRemoteInfo struct {
  433. host string
  434. username string
  435. password string
  436. socketTimeout string // e.g. "1m" or "30s"
  437. connectTimeout string // e.g. "1m" or "30s"
  438. }
  439. // NewReindexRemoteInfo creates a new ReindexRemoteInfo.
  440. func NewReindexRemoteInfo() *ReindexRemoteInfo {
  441. return &ReindexRemoteInfo{}
  442. }
  443. // Host sets the host information of the remote cluster.
  444. // It must be of the form "http(s)://<hostname>:<port>"
  445. func (ri *ReindexRemoteInfo) Host(host string) *ReindexRemoteInfo {
  446. ri.host = host
  447. return ri
  448. }
  449. // Username sets the username to authenticate with the remote cluster.
  450. func (ri *ReindexRemoteInfo) Username(username string) *ReindexRemoteInfo {
  451. ri.username = username
  452. return ri
  453. }
  454. // Password sets the password to authenticate with the remote cluster.
  455. func (ri *ReindexRemoteInfo) Password(password string) *ReindexRemoteInfo {
  456. ri.password = password
  457. return ri
  458. }
  459. // SocketTimeout sets the socket timeout to connect with the remote cluster.
  460. // Use ES compatible values like e.g. "30s" or "1m".
  461. func (ri *ReindexRemoteInfo) SocketTimeout(timeout string) *ReindexRemoteInfo {
  462. ri.socketTimeout = timeout
  463. return ri
  464. }
  465. // ConnectTimeout sets the connection timeout to connect with the remote cluster.
  466. // Use ES compatible values like e.g. "30s" or "1m".
  467. func (ri *ReindexRemoteInfo) ConnectTimeout(timeout string) *ReindexRemoteInfo {
  468. ri.connectTimeout = timeout
  469. return ri
  470. }
  471. // Source returns the serializable JSON data for the request.
  472. func (ri *ReindexRemoteInfo) Source() (interface{}, error) {
  473. res := make(map[string]interface{})
  474. res["host"] = ri.host
  475. if len(ri.username) > 0 {
  476. res["username"] = ri.username
  477. }
  478. if len(ri.password) > 0 {
  479. res["password"] = ri.password
  480. }
  481. if len(ri.socketTimeout) > 0 {
  482. res["socket_timeout"] = ri.socketTimeout
  483. }
  484. if len(ri.connectTimeout) > 0 {
  485. res["connect_timeout"] = ri.connectTimeout
  486. }
  487. return res, nil
  488. }
  489. // -- Destination of Reindex --
  490. // ReindexDestination is the destination of a Reindex API call.
  491. // It is basically the meta data of a BulkIndexRequest.
  492. //
  493. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-reindex.html
  494. // fsourcer details.
  495. type ReindexDestination struct {
  496. index string
  497. typ string
  498. routing string
  499. parent string
  500. opType string
  501. version int64 // default is MATCH_ANY
  502. versionType string // default is "internal"
  503. }
  504. // NewReindexDestination returns a new ReindexDestination.
  505. func NewReindexDestination() *ReindexDestination {
  506. return &ReindexDestination{}
  507. }
  508. // Index specifies name of the Elasticsearch index to use as the destination
  509. // of a reindexing process.
  510. func (r *ReindexDestination) Index(index string) *ReindexDestination {
  511. r.index = index
  512. return r
  513. }
  514. // Type specifies the Elasticsearch type to use for reindexing.
  515. func (r *ReindexDestination) Type(typ string) *ReindexDestination {
  516. r.typ = typ
  517. return r
  518. }
  519. // Routing specifies a routing value for the reindexing request.
  520. // It can be "keep", "discard", or start with "=". The latter specifies
  521. // the routing on the bulk request.
  522. func (r *ReindexDestination) Routing(routing string) *ReindexDestination {
  523. r.routing = routing
  524. return r
  525. }
  526. // Keep sets the routing on the bulk request sent for each match to the routing
  527. // of the match (the default).
  528. func (r *ReindexDestination) Keep() *ReindexDestination {
  529. r.routing = "keep"
  530. return r
  531. }
  532. // Discard sets the routing on the bulk request sent for each match to null.
  533. func (r *ReindexDestination) Discard() *ReindexDestination {
  534. r.routing = "discard"
  535. return r
  536. }
  537. // Parent specifies the identifier of the parent document (if available).
  538. func (r *ReindexDestination) Parent(parent string) *ReindexDestination {
  539. r.parent = parent
  540. return r
  541. }
  542. // OpType specifies if this request should follow create-only or upsert
  543. // behavior. This follows the OpType of the standard document index API.
  544. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-index_.html#operation-type
  545. // for details.
  546. func (r *ReindexDestination) OpType(opType string) *ReindexDestination {
  547. r.opType = opType
  548. return r
  549. }
  550. // Version indicates the version of the document as part of an optimistic
  551. // concurrency model.
  552. func (r *ReindexDestination) Version(version int64) *ReindexDestination {
  553. r.version = version
  554. return r
  555. }
  556. // VersionType specifies how versions are created.
  557. func (r *ReindexDestination) VersionType(versionType string) *ReindexDestination {
  558. r.versionType = versionType
  559. return r
  560. }
  561. // Source returns a serializable JSON request for the request.
  562. func (r *ReindexDestination) Source() (interface{}, error) {
  563. source := make(map[string]interface{})
  564. if r.index != "" {
  565. source["index"] = r.index
  566. }
  567. if r.typ != "" {
  568. source["type"] = r.typ
  569. }
  570. if r.routing != "" {
  571. source["routing"] = r.routing
  572. }
  573. if r.opType != "" {
  574. source["op_type"] = r.opType
  575. }
  576. if r.parent != "" {
  577. source["parent"] = r.parent
  578. }
  579. if r.version > 0 {
  580. source["version"] = r.version
  581. }
  582. if r.versionType != "" {
  583. source["version_type"] = r.versionType
  584. }
  585. return source, nil
  586. }