bulk_update_request.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright 2012-2015 Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "strings"
  9. )
  10. // Bulk request to update document in Elasticsearch.
  11. type BulkUpdateRequest struct {
  12. BulkableRequest
  13. index string
  14. typ string
  15. id string
  16. routing string
  17. parent string
  18. script string
  19. scriptType string
  20. scriptLang string
  21. scriptParams map[string]interface{}
  22. version int64 // default is MATCH_ANY
  23. versionType string // default is "internal"
  24. retryOnConflict *int
  25. refresh *bool
  26. upsert interface{}
  27. docAsUpsert *bool
  28. doc interface{}
  29. ttl int64
  30. timestamp string
  31. }
  32. func NewBulkUpdateRequest() *BulkUpdateRequest {
  33. return &BulkUpdateRequest{}
  34. }
  35. func (r *BulkUpdateRequest) Index(index string) *BulkUpdateRequest {
  36. r.index = index
  37. return r
  38. }
  39. func (r *BulkUpdateRequest) Type(typ string) *BulkUpdateRequest {
  40. r.typ = typ
  41. return r
  42. }
  43. func (r *BulkUpdateRequest) Id(id string) *BulkUpdateRequest {
  44. r.id = id
  45. return r
  46. }
  47. func (r *BulkUpdateRequest) Routing(routing string) *BulkUpdateRequest {
  48. r.routing = routing
  49. return r
  50. }
  51. func (r *BulkUpdateRequest) Parent(parent string) *BulkUpdateRequest {
  52. r.parent = parent
  53. return r
  54. }
  55. func (r *BulkUpdateRequest) Script(script string) *BulkUpdateRequest {
  56. r.script = script
  57. return r
  58. }
  59. func (r *BulkUpdateRequest) ScriptType(scriptType string) *BulkUpdateRequest {
  60. r.scriptType = scriptType
  61. return r
  62. }
  63. func (r *BulkUpdateRequest) ScriptLang(scriptLang string) *BulkUpdateRequest {
  64. r.scriptLang = scriptLang
  65. return r
  66. }
  67. func (r *BulkUpdateRequest) ScriptParams(params map[string]interface{}) *BulkUpdateRequest {
  68. r.scriptParams = params
  69. return r
  70. }
  71. func (r *BulkUpdateRequest) RetryOnConflict(retryOnConflict int) *BulkUpdateRequest {
  72. r.retryOnConflict = &retryOnConflict
  73. return r
  74. }
  75. func (r *BulkUpdateRequest) Version(version int64) *BulkUpdateRequest {
  76. r.version = version
  77. return r
  78. }
  79. // VersionType can be "internal" (default), "external", "external_gte",
  80. // "external_gt", or "force".
  81. func (r *BulkUpdateRequest) VersionType(versionType string) *BulkUpdateRequest {
  82. r.versionType = versionType
  83. return r
  84. }
  85. func (r *BulkUpdateRequest) Refresh(refresh bool) *BulkUpdateRequest {
  86. r.refresh = &refresh
  87. return r
  88. }
  89. func (r *BulkUpdateRequest) Doc(doc interface{}) *BulkUpdateRequest {
  90. r.doc = doc
  91. return r
  92. }
  93. func (r *BulkUpdateRequest) DocAsUpsert(docAsUpsert bool) *BulkUpdateRequest {
  94. r.docAsUpsert = &docAsUpsert
  95. return r
  96. }
  97. func (r *BulkUpdateRequest) Upsert(doc interface{}) *BulkUpdateRequest {
  98. r.upsert = doc
  99. return r
  100. }
  101. func (r *BulkUpdateRequest) Ttl(ttl int64) *BulkUpdateRequest {
  102. r.ttl = ttl
  103. return r
  104. }
  105. func (r *BulkUpdateRequest) Timestamp(timestamp string) *BulkUpdateRequest {
  106. r.timestamp = timestamp
  107. return r
  108. }
  109. func (r *BulkUpdateRequest) String() string {
  110. lines, err := r.Source()
  111. if err == nil {
  112. return strings.Join(lines, "\n")
  113. }
  114. return fmt.Sprintf("error: %v", err)
  115. }
  116. func (r *BulkUpdateRequest) getSourceAsString(data interface{}) (string, error) {
  117. switch t := data.(type) {
  118. default:
  119. body, err := json.Marshal(data)
  120. if err != nil {
  121. return "", err
  122. }
  123. return string(body), nil
  124. case json.RawMessage:
  125. return string(t), nil
  126. case *json.RawMessage:
  127. return string(*t), nil
  128. case string:
  129. return t, nil
  130. case *string:
  131. return *t, nil
  132. }
  133. }
  134. func (r BulkUpdateRequest) Source() ([]string, error) {
  135. // { "update" : { "_index" : "test", "_type" : "type1", "_id" : "1", ... } }
  136. // { "doc" : { "field1" : "value1", ... } }
  137. // or
  138. // { "update" : { "_index" : "test", "_type" : "type1", "_id" : "1", ... } }
  139. // { "script" : { ... } }
  140. lines := make([]string, 2)
  141. // "update" ...
  142. command := make(map[string]interface{})
  143. updateCommand := make(map[string]interface{})
  144. if r.index != "" {
  145. updateCommand["_index"] = r.index
  146. }
  147. if r.typ != "" {
  148. updateCommand["_type"] = r.typ
  149. }
  150. if r.id != "" {
  151. updateCommand["_id"] = r.id
  152. }
  153. if r.routing != "" {
  154. updateCommand["_routing"] = r.routing
  155. }
  156. if r.parent != "" {
  157. updateCommand["_parent"] = r.parent
  158. }
  159. if r.timestamp != "" {
  160. updateCommand["_timestamp"] = r.timestamp
  161. }
  162. if r.ttl > 0 {
  163. updateCommand["_ttl"] = r.ttl
  164. }
  165. if r.version > 0 {
  166. updateCommand["_version"] = r.version
  167. }
  168. if r.versionType != "" {
  169. updateCommand["_version_type"] = r.versionType
  170. }
  171. if r.refresh != nil {
  172. updateCommand["refresh"] = *r.refresh
  173. }
  174. if r.retryOnConflict != nil {
  175. updateCommand["_retry_on_conflict"] = *r.retryOnConflict
  176. }
  177. if r.upsert != nil {
  178. updateCommand["upsert"] = r.upsert
  179. }
  180. command["update"] = updateCommand
  181. line, err := json.Marshal(command)
  182. if err != nil {
  183. return nil, err
  184. }
  185. lines[0] = string(line)
  186. // 2nd line: {"doc" : { ... }} or {"script": {...}}
  187. source := make(map[string]interface{})
  188. if r.docAsUpsert != nil {
  189. source["doc_as_upsert"] = *r.docAsUpsert
  190. }
  191. if r.doc != nil {
  192. // {"doc":{...}}
  193. source["doc"] = r.doc
  194. } else if r.script != "" {
  195. // {"script":...}
  196. source["script"] = r.script
  197. if r.scriptLang != "" {
  198. source["lang"] = r.scriptLang
  199. }
  200. /*
  201. if r.scriptType != "" {
  202. source["script_type"] = r.scriptType
  203. }
  204. */
  205. if r.scriptParams != nil && len(r.scriptParams) > 0 {
  206. source["params"] = r.scriptParams
  207. }
  208. }
  209. lines[1], err = r.getSourceAsString(source)
  210. if err != nil {
  211. return nil, err
  212. }
  213. return lines, nil
  214. }