rz 1 year ago
parent
commit
8ef198e7a0
4 changed files with 106 additions and 15 deletions
  1. 36 1
      README.md
  2. 6 4
      config.yaml
  3. 7 6
      config_n.yaml
  4. 57 4
      main.go

+ 36 - 1
README.md

@@ -1,3 +1,38 @@
 # synces
 # synces
 
 
-同步es,目前采用reindex模式,使用二分法快速查找同步,支持多种同步方式,单向同步、双向同步,定时单向,定时双向。
+同步es,目前采用reindex模式,使用二分法快速查找同步,支持多种同步方式,单向同步、双向同步,定时单向,定时双向。
+
+# 示例sql(Elasticsearch 查询 DSL;其中第二个 bool 子句即配置项 boolsql 的内容)
+  
+```
+{
+  "query": {
+    "bool": {
+      "filter": [
+        {
+          "range": {
+            "id": {
+              "gt": "6509c7d80000000000000000",
+              "lte": "650fbff80000000000000000"
+            }
+          }
+        },
+        {
+          "bool": {
+            "must_not": [
+              {
+                "terms": {
+                  "toptype": [
+                    "采购意向",
+                    "拟建"
+                  ]
+                }
+              }
+            ]
+          }
+        }
+      ]
+    }
+  }
+}
+```

+ 6 - 4
config.yaml

@@ -5,6 +5,8 @@ ses:
   sync: "http://192.168.0.229:45008"
   sync: "http://192.168.0.229:45008"
   index: "bidding_v1"
   index: "bidding_v1"
   stype: "bidding"
   stype: "bidding"
+  #可以增加sql,限定数据范围
+  #boolsql: '{"bool":{"must_not":[{"terms":{"toptype":["采购意向","拟建"]}}]}}'
   user: "es_all"
   user: "es_all"
   pwd: "TopJkO2E_d1x"
   pwd: "TopJkO2E_d1x"
   size: 3
   size: 3
@@ -13,15 +15,15 @@ des:
   index: "bidding_v1"
   index: "bidding_v1"
   stype: "bidding"
   stype: "bidding"
   #当需要指定同步字段时要使用,比如同步到未登录用户只需要个别字段,同步全部字段请注释此行
   #当需要指定同步字段时要使用,比如同步到未登录用户只需要个别字段,同步全部字段请注释此行
-  fields: ["are","autoid","bidamount","bidstatus","budget","buyer","buyerclass","city","comeintime","createtime","dataweight","detail","district","entidlist","href","id","multipackage","pici","projectcode","publishtime","s_subscopeclass","s_topscopeclass","site","subtype","title","topscopeclass","toptype"]
+  #fields: ["are","autoid","bidamount","bidstatus","budget","buyer","buyerclass","city","comeintime","createtime","dataweight","detail","district","entidlist","href","id","multipackage","pici","projectcode","publishtime","s_subscopeclass","s_topscopeclass","site","subtype","title","topscopeclass","toptype"]
   user: "jybid"
   user: "jybid"
   pwd: "Top2023_JEB01i@31"
   pwd: "Top2023_JEB01i@31"
   size: 3
   size: 3
 
 
 # id支持时间19位格式和id24位格式---------id支持两种模式,不能混合使用
 # id支持时间19位格式和id24位格式---------id支持两种模式,不能混合使用
 #gtid: "650ea35ce17a7c80fbe231ed"
 #gtid: "650ea35ce17a7c80fbe231ed"
-gtid: "2021-09-20 00:10:00"
-lteid: "2023-09-24 12:50:00"
+gtid: "2023-09-24 00:10:00"
+lteid: "2023-09-25 11:50:00"
 #lteid: "650eb13de2d7d34fa0415373"
 #lteid: "650eb13de2d7d34fa0415373"
 
 
 #同步模式mode  1是初始模式源到目标  2双向同步   3是定时源到目标模式  4双向定时
 #同步模式mode  1是初始模式源到目标  2双向同步   3是定时源到目标模式  4双向定时
@@ -29,7 +31,7 @@ mode: 1
 # 数据连续模式 0默认 1最大id  2比较
 # 数据连续模式 0默认 1最大id  2比较
 lastmode: 0
 lastmode: 0
 # 空跑输出差异结果,不同步数据,false时表示不空跑且同步数据
 # 空跑输出差异结果,不同步数据,false时表示不空跑且同步数据
-synctest: false 
+synctest: true 
 # 当为定时模式时,使用sync下的配置,支持多套定时方案,且lastmode使用下面的子配置
 # 当为定时模式时,使用sync下的配置,支持多套定时方案,且lastmode使用下面的子配置
 sync: 
 sync: 
   -
   -

+ 7 - 6
config_n.yaml

@@ -4,22 +4,23 @@ ses:
   stype: "bidding"
   stype: "bidding"
   user: "jybid"
   user: "jybid"
   pwd: "Top2023_JEB01i@31"
   pwd: "Top2023_JEB01i@31"
-  size: 1
+  size: 2
 des:
 des:
   addr: "http://127.0.0.1:19902"
   addr: "http://127.0.0.1:19902"
-  index: "bidding_year1"
+  index: "bidding_temp"
   stype: "bidding"
   stype: "bidding"
   user: "jybid"
   user: "jybid"
   pwd: "Top2023_JEB01i@31"
   pwd: "Top2023_JEB01i@31"
-  fields: ["are","autoid","bidamount","bidstatus","budget","buyer","buyerclass","city","comeintime","createtime","dataweight","detail","district","entidlist","href","id","multipackage","pici","projectcode","publishtime","s_subscopeclass","s_topscopeclass","site","subtype","title","topscopeclass","toptype"]
-  size: 1
+  #fields: ["are","autoid","bidamount","bidstatus","budget","buyer","buyerclass","city","comeintime","createtime","dataweight","detail","district","entidlist","href","id","multipackage","pici","projectcode","publishtime","s_subscopeclass","s_topscopeclass","site","subtype","title","topscopeclass","toptype"]
+  size: 2
 # id支持时间19位格式和id24位格式
 # id支持时间19位格式和id24位格式
-gtid: "2023-09-24 21:00:00"
-lteid: "2023-09-25 22:00:00"
+gtid: "2023-08-25 07:00:00"
+lteid: "2023-09-25 10:00:00"
 # 同步模式mode 1,2是初始模式  3,4定时
 # 同步模式mode 1,2是初始模式  3,4定时
 mode: 1
 mode: 1
 # 数据连续模式 0默认 1最大id  2比较
 # 数据连续模式 0默认 1最大id  2比较
 lastmode: 0
 lastmode: 0
+synctest: true 
 sync: 
 sync: 
   -
   -
     freq: 60
     freq: 60

+ 57 - 4
main.go

@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"fmt"
 	"io/ioutil"
 	"io/ioutil"
 	"log"
 	"log"
+	"os"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
@@ -42,6 +43,7 @@ type (
 		Sync        string
 		Sync        string
 		Index       string
 		Index       string
 		Stype       string
 		Stype       string
+		Boolsql     string
 		Fields      []string
 		Fields      []string
 		User        string
 		User        string
 		Pwd         string
 		Pwd         string
@@ -102,6 +104,17 @@ func main() {
 	// v := (int64Value + int64Value2) / 2
 	// v := (int64Value + int64Value2) / 2
 
 
 	// log.Println(int64Value, int64Value2, fmt.Sprintf("%x", v))
 	// log.Println(int64Value, int64Value2, fmt.Sprintf("%x", v))
+	if len(Cfg.Ses.Boolsql) > 0 {
+		sql := esv7.NewBoolQuery().Filter(esv7.NewRawStringQuery(Cfg.Ses.Boolsql))
+		sqls, err := sql.Source()
+		by, _ := json.Marshal(sqls)
+		log.Println("限定查询范围:", string(by), err)
+	}
+	if len(Cfg.Des.Fields) > 0 {
+		log.Println("限定同步字段", Cfg.Des.Fields)
+	}
+	log.Println("准备执行任务...")
+	time.Sleep(1 * time.Second)
 
 
 	switch Cfg.Mode {
 	switch Cfg.Mode {
 	case 1, 2: //单向取时间段
 	case 1, 2: //单向取时间段
@@ -151,10 +164,30 @@ func main() {
 	time.Sleep(99999 * time.Hour)
 	time.Sleep(99999 * time.Hour)
 }
 }
 
 
+// GetCountSql returns the count query for ids in (sid, eid]: when source.Boolsql is set, a bool query filtering on the id range plus that raw query; otherwise the sql_count template formatted with sid and eid. target is not used here.
+func GetCountSql(source *els, target *els, sid, eid string) any {
+	if len(source.Boolsql) > 0 {
+		rawsql := esv7.NewRawStringQuery(source.Boolsql)
+		_, err := rawsql.Source() // result discarded; called only to check that Boolsql converts without error
+		if err == nil {
+			count := esv7.NewBoolQuery()
+			count.Filter(esv7.NewRangeQuery("id").Gt(sid).Lte(eid), rawsql) // id > sid AND id <= eid AND the custom query
+			return count
+		} else {
+			log.Println("sql转换出错", err, source.Boolsql)
+			os.Exit(1) // a Boolsql that fails conversion terminates the whole process
+		}
+	}
+	return fmt.Sprintf(sql_count, sid, eid) // no custom query configured: fall back to the sql_count template
+
+}
+
 // 二分法查找执行任务
 // 二分法查找执行任务
 func BinarySearch(source *els, target *els, sid, eid, key string) {
 func BinarySearch(source *els, target *els, sid, eid, key string) {
-	scount := source.ES.Count(source.Index, source.Stype, fmt.Sprintf(sql_count, sid, eid))
-	dcount := target.ES.Count(target.Index, target.Stype, fmt.Sprintf(sql_count, sid, eid))
+	// scount := source.ES.Count(source.Index, source.Stype, fmt.Sprintf(sql_count, sid, eid))
+	// dcount := target.ES.Count(target.Index, target.Stype, fmt.Sprintf(sql_count, sid, eid))
+	scount := source.ES.Count(source.Index, source.Stype, GetCountSql(source, target, sid, eid))
+	dcount := target.ES.Count(target.Index, target.Stype, GetCountSql(source, target, sid, eid))
 	log.Println("compare:", key, sid, eid, scount, dcount)
 	log.Println("compare:", key, sid, eid, scount, dcount)
 	if scount > 0 && scount != dcount && !(scount < dcount && dcount < 10000) {
 	if scount > 0 && scount != dcount && !(scount < dcount && dcount < 10000) {
 		if dcount-scount == 1 { //目标集群比源集群量多1条,不继续
 		if dcount-scount == 1 { //目标集群比源集群量多1条,不继续
@@ -274,8 +307,10 @@ func Reindex(source *els, target *els, sid, eid, key string) {
 	}
 	}
 	conn := target.ES.GetEsConn()
 	conn := target.ES.GetEsConn()
 	defer target.ES.DestoryEsConn(conn)
 	defer target.ES.DestoryEsConn(conn)
+	//创建reindex对象
 	rs := esv7.NewReindexSource()
 	rs := esv7.NewReindexSource()
 	rs.Index(source.Index)
 	rs.Index(source.Index)
+	//不在同一集群时,要增加remote,同时确保目标端能访问这个配置
 	if target.Addr != source.Addr {
 	if target.Addr != source.Addr {
 		ri := esv7.NewReindexRemoteInfo()
 		ri := esv7.NewReindexRemoteInfo()
 		addr := source.Addr
 		addr := source.Addr
@@ -285,11 +320,29 @@ func Reindex(source *els, target *els, sid, eid, key string) {
 		ri.Host(addr).Username(source.User).Password(source.Pwd)
 		ri.Host(addr).Username(source.User).Password(source.Pwd)
 		rs.RemoteInfo(ri)
 		rs.RemoteInfo(ri)
 	}
 	}
-
-	rs.Query(esv7.NewBoolQuery().Filter(esv7.NewRangeQuery("id").Gt(sid).Lte(eid)))
+	//生成目标端查询sql
+	find := esv7.NewBoolQuery()
+	querys := []esv7.Query{esv7.NewRangeQuery("id").Gt(sid).Lte(eid)}
+	//自定义了查询,增加
+	if len(source.Boolsql) > 0 {
+		rawsql := esv7.NewRawStringQuery(source.Boolsql)
+		_, err := rawsql.Source()
+		if err == nil {
+			querys = append(querys, rawsql)
+		} else {
+			log.Println("sql转换出错", err, source.Boolsql)
+			os.Exit(1)
+		}
+	}
+	find.Filter(querys...)
+	rs.Query(find)
+	// s, _ := rs.Source()
+	// log.Println(s)
+	//限定了查询字段
 	if len(target.Fields) > 0 {
 	if len(target.Fields) > 0 {
 		rs = rs.FetchSourceIncludeExclude(target.Fields, []string{})
 		rs = rs.FetchSourceIncludeExclude(target.Fields, []string{})
 	}
 	}
+
 	ds := esv7.NewReindexDestination()
 	ds := esv7.NewReindexDestination()
 	ds.Index(target.Index)
 	ds.Index(target.Index)
 	reindex := conn.Reindex().Source(rs).Destination(ds).Conflicts("proceed")
 	reindex := conn.Reindex().Source(rs).Destination(ds).Conflicts("proceed")