|
@@ -15,6 +15,7 @@ import (
|
|
|
|
|
|
"app.yhyue.com/moapp/jybase/es"
|
|
|
"github.com/dgraph-io/badger/v4"
|
|
|
+ "github.com/gogf/gf/v2/util/gconv"
|
|
|
esv7 "github.com/olivere/elastic/v7"
|
|
|
"gopkg.in/yaml.v3"
|
|
|
)
|
|
@@ -36,7 +37,12 @@ type (
|
|
|
Freq int64
|
|
|
Before int64
|
|
|
Scope int64
|
|
|
- Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值
|
|
|
+ Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值
|
|
|
+ Force bool //强制同步,不对比
|
|
|
+ }
|
|
|
+ sypici struct { //pici模式时需要传递时间对象
|
|
|
+ Gt int64
|
|
|
+ Lte int64
|
|
|
}
|
|
|
els struct {
|
|
|
Addr string
|
|
@@ -59,8 +65,10 @@ var (
|
|
|
//统计sql
|
|
|
sql_count = `{"query":{"range":{"id":{"gt":"%s","lte":"%s"}}}}`
|
|
|
//取最大id sql
|
|
|
- sql_last = `{"query":{"match_all":{}},"sort":[{"id":"desc"}],"_source":["id"],"size":1}`
|
|
|
- layout = "2006-01-02 15:04:05"
|
|
|
+ sql_last = `{"query":{"match_all":{}},"sort":[{"id":"desc"}],"_source":["id"],"size":1}`
|
|
|
+ sql_picilast = `{"query":{"match_all":{}},"sort":[{"pici":"desc"}],"_source":["pici"],"size":1}`
|
|
|
+ sql_piciid = `{"query": {"range": {"pici":{"gt":%d,"lte":%d}}},"sort": [{"id": "asc"}],"_source": ["id"],"size": 1}`
|
|
|
+ layout = "2006-01-02 15:04:05"
|
|
|
//本在数据库
|
|
|
db = &bdb{DB()}
|
|
|
conf = "./config.yaml"
|
|
@@ -120,15 +128,15 @@ func main() {
|
|
|
case 1, 2: //单向取时间段
|
|
|
func() {
|
|
|
go GetTask(Cfg.Des, "atask")
|
|
|
- gid, eid := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Des, Cfg.Lastmode)
|
|
|
- BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask")
|
|
|
+ gid, eid, piciobj := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Des, Cfg.Ses, Cfg.Lastmode)
|
|
|
+ BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask", Cfg.Lastmode, piciobj)
|
|
|
log.Println("atask", "over...")
|
|
|
}()
|
|
|
if Cfg.Mode == 2 {
|
|
|
func() {
|
|
|
go GetTask(Cfg.Ses, "btask")
|
|
|
- gid, eid := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Ses, Cfg.Lastmode)
|
|
|
- BinarySearch(Cfg.Des, Cfg.Ses, gid, eid, "btask")
|
|
|
+ gid, eid, piciobj := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Ses, Cfg.Des, Cfg.Lastmode)
|
|
|
+ BinarySearch(Cfg.Des, Cfg.Ses, gid, eid, "btask", Cfg.Lastmode, piciobj)
|
|
|
log.Println("btask", "over...")
|
|
|
}()
|
|
|
}
|
|
@@ -146,13 +154,25 @@ func main() {
|
|
|
no := now - obj.Before //从几分钟前开始
|
|
|
gtid := TimestampToId(no - obj.Scope)
|
|
|
lteid := TimestampToId(no)
|
|
|
- gid, eid := GetIds(gtid, lteid, Cfg.Des, obj.Lastmode)
|
|
|
- BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask")
|
|
|
+ gid, eid, piciobj := GetIds(gtid, lteid, Cfg.Des, Cfg.Ses, obj.Lastmode)
|
|
|
+ if obj.Force {
|
|
|
+ if !Cfg.Synctest {
|
|
|
+ Reindex(Cfg.Ses, Cfg.Des, gid, eid, "atask", obj.Lastmode, piciobj)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask", obj.Lastmode, piciobj)
|
|
|
+ }
|
|
|
log.Println("atask", "over...", "freq", obj.Freq)
|
|
|
//-----------------
|
|
|
if Cfg.Mode == 4 {
|
|
|
- gid1, eid1 := GetIds(gtid, lteid, Cfg.Ses, obj.Lastmode)
|
|
|
- BinarySearch(Cfg.Des, Cfg.Ses, gid1, eid1, "atask")
|
|
|
+ gid1, eid1, piciobj := GetIds(gtid, lteid, Cfg.Ses, Cfg.Des, obj.Lastmode)
|
|
|
+ if obj.Force {
|
|
|
+ if !Cfg.Synctest {
|
|
|
+ Reindex(Cfg.Des, Cfg.Ses, gid1, eid1, "atask", obj.Lastmode, piciobj)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ BinarySearch(Cfg.Des, Cfg.Ses, gid1, eid1, "atask", obj.Lastmode, piciobj)
|
|
|
+ }
|
|
|
log.Println("btask", "over...", "freq", obj.Freq)
|
|
|
}
|
|
|
last := nt - time.Now().Unix()
|
|
@@ -161,63 +181,77 @@ func main() {
|
|
|
}
|
|
|
}
|
|
|
}(obj)
|
|
|
+ time.Sleep(25 * time.Second)
|
|
|
}
|
|
|
}
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
}
|
|
|
|
|
|
// 获取统计sql
|
|
|
-func GetCountSql(source *els, target *els, sid, eid string) any {
|
|
|
+func GetCountSql(source *els, target *els, sid, eid string, lastmode int, piciobj *sypici) any {
|
|
|
+ count := esv7.NewBoolQuery()
|
|
|
+ if lastmode >= 3 { //使用pici
|
|
|
+ count.Filter(esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte))
|
|
|
+ } else {
|
|
|
+ count.Filter(esv7.NewRangeQuery("id").Gt(sid).Lte(eid))
|
|
|
+ }
|
|
|
if len(source.Boolsql) > 0 {
|
|
|
- rawsql := esv7.NewRawStringQuery(source.Boolsql)
|
|
|
- _, err := rawsql.Source()
|
|
|
- if err == nil {
|
|
|
- count := esv7.NewBoolQuery()
|
|
|
- count.Filter(esv7.NewRangeQuery("id").Gt(sid).Lte(eid), rawsql)
|
|
|
- return count
|
|
|
- } else {
|
|
|
- log.Println("sql转换出错", err, source.Boolsql)
|
|
|
- os.Exit(1)
|
|
|
- }
|
|
|
+ count.Filter(esv7.NewRawStringQuery(source.Boolsql))
|
|
|
}
|
|
|
- return fmt.Sprintf(sql_count, sid, eid)
|
|
|
+ PrintSql(count, "count")
|
|
|
+ return count
|
|
|
+}
|
|
|
|
|
|
+func PrintSql(query esv7.Query, note string) {
|
|
|
+ v, _ := query.Source()
|
|
|
+ bs, _ := json.Marshal(v)
|
|
|
+ log.Println("query:", note, string(bs))
|
|
|
}
|
|
|
|
|
|
// 二分法查找执行任务
|
|
|
-func BinarySearch(source *els, target *els, sid, eid, key string) {
|
|
|
- // scount := source.ES.Count(source.Index, source.Stype, fmt.Sprintf(sql_count, sid, eid))
|
|
|
- // dcount := target.ES.Count(target.Index, target.Stype, fmt.Sprintf(sql_count, sid, eid))
|
|
|
- scount := source.ES.Count(source.Index, source.Stype, GetCountSql(source, target, sid, eid))
|
|
|
- dcount := target.ES.Count(target.Index, target.Stype, GetCountSql(source, target, sid, eid))
|
|
|
- log.Println("compare:", key, sid, eid, scount, dcount)
|
|
|
+func BinarySearch(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) {
|
|
|
+ countSql := GetCountSql(source, target, sid, eid, lastmode, piciobj)
|
|
|
+ scount := source.ES.Count(source.Index, source.Stype, countSql)
|
|
|
+ dcount := target.ES.Count(target.Index, target.Stype, countSql)
|
|
|
+ log.Println("compare:", lastmode, key, sid, eid, scount, dcount)
|
|
|
if scount > 0 && scount != dcount && !(scount < dcount && dcount < 10000) {
|
|
|
- if dcount-scount == 1 { //目标集群比源集群量多1条,不继续
|
|
|
- return
|
|
|
- }
|
|
|
- //满足条件则开启同步
|
|
|
- isid, _ := strconv.ParseInt(sid[:8], 16, 64)
|
|
|
- ieid, _ := strconv.ParseInt(eid[:8], 16, 64)
|
|
|
- mid := TimestampToId((isid + ieid) / 2)
|
|
|
- //6509dbd90000000000000000 6509dbda0000000000000000
|
|
|
- if dcount == 0 || isid == ieid || sid == mid || (dcount < scount && (float64(scount-dcount)/float64(scount)) > 0.98) {
|
|
|
- log.Println("sync", !Cfg.Synctest, key, sid, eid, scount, dcount)
|
|
|
- if !Cfg.Synctest {
|
|
|
- Reindex(source, target, sid, eid, key)
|
|
|
+ if lastmode > 2 { //pici模式
|
|
|
+ if dcount > scount { //目标集群比源集群量多,不继续
|
|
|
+ return
|
|
|
+ }
|
|
|
+ mid := (piciobj.Gt + piciobj.Lte) / 2
|
|
|
+ if dcount == 0 || piciobj.Gt == piciobj.Lte || piciobj.Gt == mid || (dcount < scount && (float64(scount-dcount)/float64(scount)) > 0.98) {
|
|
|
+ log.Println("sync-pici", !Cfg.Synctest, key, piciobj.Gt, piciobj.Lte, mid, scount, dcount)
|
|
|
+ if !Cfg.Synctest {
|
|
|
+ Reindex(source, target, sid, eid, key, lastmode, piciobj)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ BinarySearch(source, target, sid, "", key, lastmode, &sypici{piciobj.Gt, mid})
|
|
|
+ BinarySearch(source, target, "", eid, key, lastmode, &sypici{mid, piciobj.Lte})
|
|
|
}
|
|
|
+
|
|
|
} else {
|
|
|
- BinarySearch(source, target, sid, mid, key)
|
|
|
- BinarySearch(source, target, mid, eid, key)
|
|
|
+ if dcount-scount == 1 { //目标集群比源集群量多1条,不继续
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //满足条件则开启同步
|
|
|
+ isid, _ := strconv.ParseInt(sid[:8], 16, 64)
|
|
|
+ ieid, _ := strconv.ParseInt(eid[:8], 16, 64)
|
|
|
+ mid := TimestampToId((isid + ieid) / 2)
|
|
|
+ //6509dbd90000000000000000 6509dbda0000000000000000
|
|
|
+ if dcount == 0 || isid == ieid || sid == mid || (dcount < scount && (float64(scount-dcount)/float64(scount)) > 0.98) {
|
|
|
+ log.Println("sync", !Cfg.Synctest, key, sid, eid, scount, dcount)
|
|
|
+ if !Cfg.Synctest {
|
|
|
+ Reindex(source, target, sid, eid, key, lastmode, piciobj)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ BinarySearch(source, target, sid, mid, key, lastmode, piciobj)
|
|
|
+ BinarySearch(source, target, mid, eid, key, lastmode, piciobj)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// var (
|
|
|
-// taskLock = &sync.Mutex{}
|
|
|
-// //key = "tasks"
|
|
|
-// currentTask = 0
|
|
|
-// )
|
|
|
-
|
|
|
// 定时获取任务列表并清除队列,只为修改currentTask值
|
|
|
func GetTask(target *els, key string) {
|
|
|
conn := target.ES.GetEsConn()
|
|
@@ -259,8 +293,8 @@ func GetTask(target *els, key string) {
|
|
|
}
|
|
|
|
|
|
// 获取同步的id范围
|
|
|
-func GetIds(Gtid, Lteid string, target *els, mode int) (gid, eid string) {
|
|
|
- switch mode {
|
|
|
+func GetIds(Gtid, Lteid string, target *els, source *els, lastmode int) (gid, eid string, piciobj *sypici) {
|
|
|
+ switch lastmode {
|
|
|
case 0:
|
|
|
gid = Gtid
|
|
|
eid = Lteid
|
|
@@ -281,10 +315,60 @@ func GetIds(Gtid, Lteid string, target *els, mode int) (gid, eid string) {
|
|
|
if eid < gid {
|
|
|
eid = gid
|
|
|
}
|
|
|
+ //pici模式,只需要pici即可,只有定时模式适用!!!---删除---取源端的最大id和最小id
|
|
|
+ case 3, 4:
|
|
|
+ // lastmode > 2 {
|
|
|
+ piciMin := int64(0)
|
|
|
+ piciMax, _ := strconv.ParseInt(Lteid[0:8], 16, 64)
|
|
|
+ if lastmode == 3 { //取target的最大pici
|
|
|
+ piciMin = GetPiciLast(target)
|
|
|
+ } else if lastmode == 4 { //取传递的最小pici
|
|
|
+ piciMin, _ = strconv.ParseInt(Gtid[0:8], 16, 64)
|
|
|
+ }
|
|
|
+ piciobj = &sypici{piciMin, piciMax}
|
|
|
+ // } else {
|
|
|
+ // gid = Gtid
|
|
|
+ // // //到源端去查找最小id,这个id有可能也没同步,所有用gt会漏掉这一条数据
|
|
|
+ // res := source.ES.Get(source.Index, source.Stype, fmt.Sprintf(sql_piciid, piciMin, piciMax))
|
|
|
+ // if res != nil && len(*res) == 1 {
|
|
|
+ // gid, _ = (*res)[0]["id"].(string)
|
|
|
+ // //id减1
|
|
|
+ // gid = IdSub1(gid)
|
|
|
+ // }
|
|
|
+ // //最近10天内
|
|
|
+ // //gid = TimestampToId(piciMin - 864000)
|
|
|
+ // //此处如果还用原来的lteid,在同一个时间戳内有数据会被漏掉,所以要用大的id段,最当前时间的id段
|
|
|
+ // // eid = Lteid
|
|
|
+ // eid = TimestampToId(time.Now().Unix())
|
|
|
+ // }
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
|
|
|
// IdSub1 returns the hexadecimal id string that is exactly one less than id1.
//
// The subtraction borrows through trailing zero digits, which become 'f'.
// The decremented digit is written in lower case; all other digits are kept as
// given. If id1 is empty, consists only of zeros, or contains a character that
// is not a hex digit in a position the subtraction has to touch, id1 is
// returned unchanged.
func IdSub1(id1 string) string {
	digits := []byte(id1)
	for k := len(digits) - 1; k >= 0; k-- {
		n, err := strconv.ParseInt(string(digits[k]), 16, 64)
		if err != nil {
			return id1
		}
		if n > 0 {
			// Borrow satisfied: lower this digit and keep everything to its left.
			return string(digits[:k]) + fmt.Sprintf("%x", n-1) + string(digits[k+1:])
		}
		// 0 - 1 in base 16 leaves f and borrows from the next digit.
		digits[k] = 'f'
	}
	// Every digit was zero (or the string was empty): there is no smaller id.
	return id1
}
|
|
|
+
|
|
|
// 获取最大id,无数据时返回默认的一个id
|
|
|
func GetLastId(target *els) (id string) {
|
|
|
res := target.ES.Get(target.Index, target.Stype, sql_last)
|
|
@@ -296,11 +380,23 @@ func GetLastId(target *els) (id string) {
|
|
|
}
|
|
|
log.Println("----id---", id)
|
|
|
return
|
|
|
+}
|
|
|
|
|
|
+// 默认取3天前
|
|
|
+func GetPiciLast(target *els) (pici int64) {
|
|
|
+ res := target.ES.Get(target.Index, target.Stype, sql_picilast)
|
|
|
+ if res != nil && len(*res) == 1 {
|
|
|
+ pici = gconv.Int64((*res)[0]["pici"])
|
|
|
+ }
|
|
|
+ if pici == 0 {
|
|
|
+ pici = time.Now().Unix() - 3*86400
|
|
|
+ }
|
|
|
+ log.Println("----pici---", pici)
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
// 使用reindex模式
|
|
|
-func Reindex(source *els, target *els, sid, eid, key string) {
|
|
|
+func Reindex(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) {
|
|
|
for {
|
|
|
if target.currentTask < 5 {
|
|
|
break
|
|
@@ -324,7 +420,10 @@ func Reindex(source *els, target *els, sid, eid, key string) {
|
|
|
}
|
|
|
//生成目标端查询sql
|
|
|
find := esv7.NewBoolQuery()
|
|
|
- querys := []esv7.Query{esv7.NewRangeQuery("id").Gt(sid).Lte(eid)}
|
|
|
+ querys := []esv7.Query{}
|
|
|
+ if !(lastmode > 2 && sid == "" && eid == "") { //当是pici模式时,不再传递id段,只适用于定时模式
|
|
|
+ querys = append(querys, esv7.NewRangeQuery("id").Gt(sid).Lte(eid))
|
|
|
+ }
|
|
|
//自定义了查询,增加
|
|
|
if len(source.Boolsql) > 0 {
|
|
|
rawsql := esv7.NewRawStringQuery(source.Boolsql)
|
|
@@ -336,10 +435,12 @@ func Reindex(source *els, target *els, sid, eid, key string) {
|
|
|
os.Exit(1)
|
|
|
}
|
|
|
}
|
|
|
+ if lastmode >= 3 && piciobj != nil {
|
|
|
+ querys = append(querys, esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte))
|
|
|
+ }
|
|
|
find.Filter(querys...)
|
|
|
rs.Query(find)
|
|
|
- // s, _ := rs.Source()
|
|
|
- // log.Println(s)
|
|
|
+ PrintSql(rs, "reindex")
|
|
|
//限定了查询字段
|
|
|
if len(target.Fields) > 0 {
|
|
|
rs = rs.FetchSourceIncludeExclude(target.Fields, []string{})
|