bulk_index_request.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "strings"
  9. )
  10. // Bulk request to add document to Elasticsearch.
  11. type BulkIndexRequest struct {
  12. BulkableRequest
  13. index string
  14. typ string
  15. id string
  16. opType string
  17. routing string
  18. parent string
  19. timestamp string
  20. ttl int64
  21. refresh *bool
  22. version int64 // default is MATCH_ANY
  23. versionType string // default is "internal"
  24. doc interface{}
  25. }
  26. func NewBulkIndexRequest() *BulkIndexRequest {
  27. return &BulkIndexRequest{
  28. opType: "index",
  29. }
  30. }
  31. func (r *BulkIndexRequest) Index(index string) *BulkIndexRequest {
  32. r.index = index
  33. return r
  34. }
  35. func (r *BulkIndexRequest) Type(typ string) *BulkIndexRequest {
  36. r.typ = typ
  37. return r
  38. }
  39. func (r *BulkIndexRequest) Id(id string) *BulkIndexRequest {
  40. r.id = id
  41. return r
  42. }
  43. func (r *BulkIndexRequest) OpType(opType string) *BulkIndexRequest {
  44. r.opType = opType
  45. return r
  46. }
  47. func (r *BulkIndexRequest) Routing(routing string) *BulkIndexRequest {
  48. r.routing = routing
  49. return r
  50. }
  51. func (r *BulkIndexRequest) Parent(parent string) *BulkIndexRequest {
  52. r.parent = parent
  53. return r
  54. }
  55. func (r *BulkIndexRequest) Timestamp(timestamp string) *BulkIndexRequest {
  56. r.timestamp = timestamp
  57. return r
  58. }
  59. func (r *BulkIndexRequest) Ttl(ttl int64) *BulkIndexRequest {
  60. r.ttl = ttl
  61. return r
  62. }
  63. func (r *BulkIndexRequest) Refresh(refresh bool) *BulkIndexRequest {
  64. r.refresh = &refresh
  65. return r
  66. }
  67. func (r *BulkIndexRequest) Version(version int64) *BulkIndexRequest {
  68. r.version = version
  69. return r
  70. }
  71. func (r *BulkIndexRequest) VersionType(versionType string) *BulkIndexRequest {
  72. r.versionType = versionType
  73. return r
  74. }
  75. func (r *BulkIndexRequest) Doc(doc interface{}) *BulkIndexRequest {
  76. r.doc = doc
  77. return r
  78. }
  79. func (r *BulkIndexRequest) String() string {
  80. lines, err := r.Source()
  81. if err == nil {
  82. return strings.Join(lines, "\n")
  83. }
  84. return fmt.Sprintf("error: %v", err)
  85. }
  86. func (r *BulkIndexRequest) Source() ([]string, error) {
  87. // { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
  88. // { "field1" : "value1" }
  89. lines := make([]string, 2)
  90. // "index" ...
  91. command := make(map[string]interface{})
  92. indexCommand := make(map[string]interface{})
  93. if r.index != "" {
  94. indexCommand["_index"] = r.index
  95. }
  96. if r.typ != "" {
  97. indexCommand["_type"] = r.typ
  98. }
  99. if r.id != "" {
  100. indexCommand["_id"] = r.id
  101. }
  102. if r.routing != "" {
  103. indexCommand["_routing"] = r.routing
  104. }
  105. if r.parent != "" {
  106. indexCommand["_parent"] = r.parent
  107. }
  108. if r.timestamp != "" {
  109. indexCommand["_timestamp"] = r.timestamp
  110. }
  111. if r.ttl > 0 {
  112. indexCommand["_ttl"] = r.ttl
  113. }
  114. if r.version > 0 {
  115. indexCommand["_version"] = r.version
  116. }
  117. if r.versionType != "" {
  118. indexCommand["_version_type"] = r.versionType
  119. }
  120. if r.refresh != nil {
  121. indexCommand["refresh"] = *r.refresh
  122. }
  123. command[r.opType] = indexCommand
  124. line, err := json.Marshal(command)
  125. if err != nil {
  126. return nil, err
  127. }
  128. lines[0] = string(line)
  129. // "field1" ...
  130. if r.doc != nil {
  131. switch t := r.doc.(type) {
  132. default:
  133. body, err := json.Marshal(r.doc)
  134. if err != nil {
  135. return nil, err
  136. }
  137. lines[1] = string(body)
  138. case json.RawMessage:
  139. lines[1] = string(t)
  140. case *json.RawMessage:
  141. lines[1] = string(*t)
  142. case string:
  143. lines[1] = t
  144. case *string:
  145. lines[1] = *t
  146. }
  147. } else {
  148. lines[1] = "{}"
  149. }
  150. return lines, nil
  151. }