123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package main
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "log"
- "net/http"
- "time"
- "github.com/olivere/elastic/v7"
- )
// Connection settings and index names used by this program.
const (
	// elasticsearchURL is the base URL for both the elastic client and the
	// raw Watcher HTTP calls.
	elasticsearchURL = "http://192.168.3.241:9206"

	// watcherID is the ID under which the watch is registered and executed.
	watcherID = "watcher_bidding_final_changes"
)

// Target indices that receive the synchronized documents.
const (
	targetIndex1 = "bidding_year"
	targetIndex2 = "bidding_free"
	targetIndex3 = "bidding_temp"
)
- func main() {
- client, err := elastic.NewClient(elastic.SetURL(elasticsearchURL))
- if err != nil {
- log.Fatalf("Error creating Elasticsearch client: %v", err)
- }
- // 设置 Watcher
- watcherBody := fmt.Sprintf(`
- {
- "trigger": {
- "schedule": {
- "interval": "10s"
- }
- },
- "input": {
- "search": {
- "request": {
- "indices": ["bidding_final"],
- "body": {
- "query": {
- "range": {
- "comeintime": {
- "gte": "now-7d/d"
- }
- }
- }
- }
- }
- }
- },
- "condition": {
- "always": {}
- },
- "actions": [
- {
- "transform": {
- "script": {
- "source": "ctx.payload.hits.hits[0]._source"
- }
- },
- "index": {
- "index": "%s",
- "doc_id": "{{ctx.payload.hits.hits[0]._id}}"
- }
- },
- {
- "transform": {
- "script": {
- "source": "ctx.payload.hits.hits[0]._source"
- }
- },
- "index": {
- "index": "%s",
- "doc_id": "{{ctx.payload.hits.hits[0]._id}}"
- }
- },
- {
- "transform": {
- "script": {
- "source": "ctx.payload.hits.hits[0]._source"
- }
- },
- "index": {
- "index": "%s",
- "doc_id": "{{ctx.payload.hits.hits[0]._id}}"
- }
- }
- ]
- }`, targetIndex1, targetIndex2, targetIndex3)
- // 创建 Watcher
- resp, err := http.Post(elasticsearchURL+"/_watcher/watch/"+watcherID,
- "application/json", bytes.NewBuffer([]byte(watcherBody)))
- if err != nil {
- log.Fatalf("Error creating watcher: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- log.Fatalf("Error creating watcher. Status code: %d", resp.StatusCode)
- }
- fmt.Println("Watcher created successfully.")
- // 定时检查 Watcher 的执行结果
- for {
- // 等待一段时间
- time.Sleep(5 * time.Second)
- // 获取 Watcher 的执行状态
- resp, err := http.Get(elasticsearchURL + "/_watcher/watch/" + watcherID + "/_execute")
- if err != nil {
- log.Printf("Error getting watcher execution status: %v", err)
- continue
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- log.Printf("Error getting watcher execution status. Status code: %d", resp.StatusCode)
- continue
- }
- // 解析 Watcher 执行结果
- var result map[string]interface{}
- err = json.NewDecoder(resp.Body).Decode(&result)
- if err != nil {
- log.Printf("Error decoding watcher execution result: %v", err)
- continue
- }
- // 检查触发条件是否满足
- if result["triggered"].(bool) {
- fmt.Printf("Watcher %s executed\n", watcherID)
- // 获取变更的文档
- var doc map[string]interface{}
- err = json.Unmarshal([]byte(result["input"].(map[string]interface{})["search"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{})[0].(map[string]interface{})["_source"].(string)), &doc)
- if err != nil {
- log.Printf("Error decoding changed document: %v", err)
- continue
- }
- // 获取文档数量
- docCount := int(result["input"].(map[string]interface{})["search"].(map[string]interface{})["hits"].(map[string]interface{})["total"].(float64))
- // 判断操作类型
- switch {
- case docCount == 0:
- fmt.Println("Document deleted")
- // 执行同步删除操作
- err = syncDeleteDocument(client, targetIndex1, doc)
- if err != nil {
- log.Printf("Error syncing delete to %s: %v", targetIndex1, err)
- }
- err = syncDeleteDocument(client, targetIndex2, doc)
- if err != nil {
- log.Printf("Error syncing delete to %s: %v", targetIndex2, err)
- }
- err = syncDeleteDocument(client, targetIndex3, doc)
- if err != nil {
- log.Printf("Error syncing delete to %s: %v", targetIndex3, err)
- }
- case docCount == 1:
- fmt.Println("Document updated")
- // 执行同步更新操作
- err = syncUpdateDocument(client, targetIndex1, doc)
- if err != nil {
- log.Printf("Error syncing update to %s: %v", targetIndex1, err)
- }
- err = syncUpdateDocument(client, targetIndex2, doc)
- if err != nil {
- log.Printf("Error syncing update to %s: %v", targetIndex2, err)
- }
- err = syncUpdateDocument(client, targetIndex3, doc)
- if err != nil {
- log.Printf("Error syncing update to %s: %v", targetIndex3, err)
- }
- }
- }
- }
- }
- // 同步删除文档到目标索引
- func syncDeleteDocument(client *elastic.Client, targetIndex string, doc map[string]interface{}) error {
- _, err := client.Delete().Index(targetIndex).Id(doc["id"].(string)).Do(context.Background())
- return err
- }
- // 同步更新文档到目标索引
- func syncUpdateDocument(client *elastic.Client, targetIndex string, doc map[string]interface{}) error {
- _, err := client.Index().Index(targetIndex).Id(doc["id"].(string)).BodyJson(doc).Do(context.Background())
- return err
- }
|