main.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "time"
  10. "github.com/olivere/elastic/v7"
  11. )
  12. const (
  13. elasticsearchURL = "http://192.168.3.241:9206"
  14. watcherID = "watcher_bidding_final_changes"
  15. targetIndex1 = "bidding_year"
  16. targetIndex2 = "bidding_free"
  17. targetIndex3 = "bidding_temp"
  18. )
  19. func main() {
  20. client, err := elastic.NewClient(elastic.SetURL(elasticsearchURL))
  21. if err != nil {
  22. log.Fatalf("Error creating Elasticsearch client: %v", err)
  23. }
  24. // 设置 Watcher
  25. watcherBody := fmt.Sprintf(`
  26. {
  27. "trigger": {
  28. "schedule": {
  29. "interval": "10s"
  30. }
  31. },
  32. "input": {
  33. "search": {
  34. "request": {
  35. "indices": ["bidding_final"],
  36. "body": {
  37. "query": {
  38. "range": {
  39. "comeintime": {
  40. "gte": "now-7d/d"
  41. }
  42. }
  43. }
  44. }
  45. }
  46. }
  47. },
  48. "condition": {
  49. "always": {}
  50. },
  51. "actions": [
  52. {
  53. "transform": {
  54. "script": {
  55. "source": "ctx.payload.hits.hits[0]._source"
  56. }
  57. },
  58. "index": {
  59. "index": "%s",
  60. "doc_id": "{{ctx.payload.hits.hits[0]._id}}"
  61. }
  62. },
  63. {
  64. "transform": {
  65. "script": {
  66. "source": "ctx.payload.hits.hits[0]._source"
  67. }
  68. },
  69. "index": {
  70. "index": "%s",
  71. "doc_id": "{{ctx.payload.hits.hits[0]._id}}"
  72. }
  73. },
  74. {
  75. "transform": {
  76. "script": {
  77. "source": "ctx.payload.hits.hits[0]._source"
  78. }
  79. },
  80. "index": {
  81. "index": "%s",
  82. "doc_id": "{{ctx.payload.hits.hits[0]._id}}"
  83. }
  84. }
  85. ]
  86. }`, targetIndex1, targetIndex2, targetIndex3)
  87. // 创建 Watcher
  88. resp, err := http.Post(elasticsearchURL+"/_watcher/watch/"+watcherID,
  89. "application/json", bytes.NewBuffer([]byte(watcherBody)))
  90. if err != nil {
  91. log.Fatalf("Error creating watcher: %v", err)
  92. }
  93. defer resp.Body.Close()
  94. if resp.StatusCode != http.StatusOK {
  95. log.Fatalf("Error creating watcher. Status code: %d", resp.StatusCode)
  96. }
  97. fmt.Println("Watcher created successfully.")
  98. // 定时检查 Watcher 的执行结果
  99. for {
  100. // 等待一段时间
  101. time.Sleep(5 * time.Second)
  102. // 获取 Watcher 的执行状态
  103. resp, err := http.Get(elasticsearchURL + "/_watcher/watch/" + watcherID + "/_execute")
  104. if err != nil {
  105. log.Printf("Error getting watcher execution status: %v", err)
  106. continue
  107. }
  108. defer resp.Body.Close()
  109. if resp.StatusCode != http.StatusOK {
  110. log.Printf("Error getting watcher execution status. Status code: %d", resp.StatusCode)
  111. continue
  112. }
  113. // 解析 Watcher 执行结果
  114. var result map[string]interface{}
  115. err = json.NewDecoder(resp.Body).Decode(&result)
  116. if err != nil {
  117. log.Printf("Error decoding watcher execution result: %v", err)
  118. continue
  119. }
  120. // 检查触发条件是否满足
  121. if result["triggered"].(bool) {
  122. fmt.Printf("Watcher %s executed\n", watcherID)
  123. // 获取变更的文档
  124. var doc map[string]interface{}
  125. err = json.Unmarshal([]byte(result["input"].(map[string]interface{})["search"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{})[0].(map[string]interface{})["_source"].(string)), &doc)
  126. if err != nil {
  127. log.Printf("Error decoding changed document: %v", err)
  128. continue
  129. }
  130. // 获取文档数量
  131. docCount := int(result["input"].(map[string]interface{})["search"].(map[string]interface{})["hits"].(map[string]interface{})["total"].(float64))
  132. // 判断操作类型
  133. switch {
  134. case docCount == 0:
  135. fmt.Println("Document deleted")
  136. // 执行同步删除操作
  137. err = syncDeleteDocument(client, targetIndex1, doc)
  138. if err != nil {
  139. log.Printf("Error syncing delete to %s: %v", targetIndex1, err)
  140. }
  141. err = syncDeleteDocument(client, targetIndex2, doc)
  142. if err != nil {
  143. log.Printf("Error syncing delete to %s: %v", targetIndex2, err)
  144. }
  145. err = syncDeleteDocument(client, targetIndex3, doc)
  146. if err != nil {
  147. log.Printf("Error syncing delete to %s: %v", targetIndex3, err)
  148. }
  149. case docCount == 1:
  150. fmt.Println("Document updated")
  151. // 执行同步更新操作
  152. err = syncUpdateDocument(client, targetIndex1, doc)
  153. if err != nil {
  154. log.Printf("Error syncing update to %s: %v", targetIndex1, err)
  155. }
  156. err = syncUpdateDocument(client, targetIndex2, doc)
  157. if err != nil {
  158. log.Printf("Error syncing update to %s: %v", targetIndex2, err)
  159. }
  160. err = syncUpdateDocument(client, targetIndex3, doc)
  161. if err != nil {
  162. log.Printf("Error syncing update to %s: %v", targetIndex3, err)
  163. }
  164. }
  165. }
  166. }
  167. }
  168. // 同步删除文档到目标索引
  169. func syncDeleteDocument(client *elastic.Client, targetIndex string, doc map[string]interface{}) error {
  170. _, err := client.Delete().Index(targetIndex).Id(doc["id"].(string)).Do(context.Background())
  171. return err
  172. }
  173. // 同步更新文档到目标索引
  174. func syncUpdateDocument(client *elastic.Client, targetIndex string, doc map[string]interface{}) error {
  175. _, err := client.Index().Index(targetIndex).Id(doc["id"].(string)).BodyJson(doc).Do(context.Background())
  176. return err
  177. }