123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547 |
- package main
- import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "io/ioutil"
- "log"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "app.yhyue.com/moapp/jybase/es"
- "github.com/dgraph-io/badger/v4"
- "github.com/gogf/gf/v2/util/gconv"
- esv7 "github.com/olivere/elastic/v7"
- "gopkg.in/yaml.v3"
- )
- //命令默认是config.yaml,可以传递 -f 指定配置文件
- type (
- cfg struct {
- Ses *els
- Des *els
- Gtid string
- Lteid string
- Mode int //同步模式
- Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值
- Sync []*syobj
- Synctest bool //是否空跑
- }
- syobj struct {
- Freq int64
- Before int64
- Scope int64
- Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值
- Force bool //强制同步,不对比
- }
- sypici struct { //pici模式时需要传递时间对象
- Gt int64
- Lte int64
- }
- els struct {
- Addr string
- Sync string
- Index string
- Stype string
- Boolsql string
- Fields []string
- User string
- Pwd string
- Size int
- ES *es.EsV7
- currentTask int
- taskLock sync.Mutex
- }
- )
- var (
- Cfg = &cfg{}
- //统计sql
- sql_count = `{"query":{"range":{"id":{"gt":"%s","lte":"%s"}}}}`
- //取最大id sql
- sql_last = `{"query":{"match_all":{}},"sort":[{"id":"desc"}],"_source":["id"],"size":1}`
- sql_picilast = `{"query":{"match_all":{}},"sort":[{"pici":"desc"}],"_source":["pici"],"size":1}`
- sql_piciid = `{"query": {"range": {"pici":{"gt":%d,"lte":%d}}},"sort": [{"id": "asc"}],"_source": ["id"],"size": 1}`
- layout = "2006-01-02 15:04:05"
- //本在数据库
- db = &bdb{DB()}
- conf = "./config.yaml"
- )
// LoadConf reads the YAML file at path file and decodes it into cfg, which
// must be a pointer. Any read or decode error terminates the process.
func LoadConf(file string, cfg any) {
	// NOTE: ioutil.ReadFile is deprecated in favour of os.ReadFile (same
	// behaviour); it is kept because this is the file's only io/ioutil use.
	bs, err := ioutil.ReadFile(file)
	if err != nil {
		// Fatalf with an explicit separator: log.Fatal(a, b) with two string
		// operands would print them glued together.
		log.Fatalf("loadcfg %s: %v", file, err)
	}
	if err := yaml.Unmarshal(bs, cfg); err != nil {
		log.Fatalf("loadcfg yaml %s: %v", file, err)
	}
}
// inits loads the config file named by conf into Cfg, converts Gtid/Lteid
// given as "2006-01-02 15:04:05" times into ids, and creates the Elasticsearch
// clients of both clusters. The process exits when the config lacks either
// the ses (source) or des (target) section.
func inits() {
	LoadConf(conf, Cfg)
	if Cfg.Ses == nil || Cfg.Des == nil {
		log.Fatal("loadcfg: config must define both ses (source) and des (target)")
	}
	// A value exactly as long as layout is a time, not an id.
	if len(Cfg.Gtid) == len(layout) {
		Cfg.Gtid = Parse2id(Cfg.Gtid)
	}
	if len(Cfg.Lteid) == len(layout) {
		Cfg.Lteid = Parse2id(Cfg.Lteid)
	}
	Cfg.Ses.ES = es.NewEs("v7", Cfg.Ses.Addr, Cfg.Ses.Size, Cfg.Ses.User, Cfg.Ses.Pwd).(*es.EsV7)
	Cfg.Des.ES = es.NewEs("v7", Cfg.Des.Addr, Cfg.Des.Size, Cfg.Des.User, Cfg.Des.Pwd).(*es.EsV7)
}
// main parses the -f flag (config file path), loads the configuration and
// runs the sync selected by Cfg.Mode. Ses is the source and Des the target of
// the forward direction:
//
//	1: one pass Ses -> Des over the configured range
//	2: like 1, followed by one pass Des -> Ses
//	3: repeating scheduled passes Ses -> Des, one goroutine per Cfg.Sync entry
//	4: like 3, each schedule also running Des -> Ses after the forward pass
//
// Reindex tasks of the forward direction are recorded under the local-db key
// "atask" and tracked by GetTask against Des; the reverse direction uses
// "btask", tracked against Ses. After starting the work main sleeps (99999h)
// so the background goroutines keep running.
func main() {
	flag.StringVar(&conf, "f", "./config.yaml", "配置文件")
	flag.Parse()
	inits()
	if len(Cfg.Ses.Boolsql) > 0 {
		sql := esv7.NewBoolQuery().Filter(esv7.NewRawStringQuery(Cfg.Ses.Boolsql))
		sqls, err := sql.Source()
		by, _ := json.Marshal(sqls)
		log.Println("限定查询范围:", string(by), err)
	}
	if len(Cfg.Des.Fields) > 0 {
		log.Println("限定同步字段", Cfg.Des.Fields)
	}
	log.Println("准备执行任务...")
	time.Sleep(1 * time.Second)
	switch Cfg.Mode {
	case 1, 2: // one-shot over the configured range
		go GetTask(Cfg.Des, "atask")
		gid, eid, piciobj := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Des, Cfg.Ses, Cfg.Lastmode)
		BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask", Cfg.Lastmode, piciobj)
		log.Println("atask", "over...")
		if Cfg.Mode == 2 {
			go GetTask(Cfg.Ses, "btask")
			gid, eid, piciobj = GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Ses, Cfg.Des, Cfg.Lastmode)
			BinarySearch(Cfg.Des, Cfg.Ses, gid, eid, "btask", Cfg.Lastmode, piciobj)
			log.Println("btask", "over...")
		}
	case 3, 4: // scheduled sync of a sliding time window
		go GetTask(Cfg.Des, "atask")
		if Cfg.Mode == 4 {
			go GetTask(Cfg.Ses, "btask")
		}
		for _, obj := range Cfg.Sync {
			go runScheduled(obj)
			// Stagger the schedules so they do not all hit the clusters at once.
			time.Sleep(25 * time.Second)
		}
	default:
		log.Println("unknown mode:", Cfg.Mode)
	}
	time.Sleep(99999 * time.Hour)
}

// runScheduled repeats one sync pass every obj.Freq seconds for the window of
// ids between now-Before-Scope and now-Before (seconds, see TimestampToId).
// In mode 4 each pass also syncs Des -> Ses, recorded under "btask" so that
// GetTask(Cfg.Ses, "btask") tracks those tasks against the cluster that runs them.
func runScheduled(obj *syobj) {
	for {
		now := time.Now().Unix()
		next := now + obj.Freq
		end := now - obj.Before
		gtid := TimestampToId(end - obj.Scope)
		lteid := TimestampToId(end)

		syncWindow(Cfg.Ses, Cfg.Des, gtid, lteid, "atask", obj)
		log.Println("atask", "over...", "freq", obj.Freq)
		if Cfg.Mode == 4 {
			syncWindow(Cfg.Des, Cfg.Ses, gtid, lteid, "btask", obj)
			log.Println("btask", "over...", "freq", obj.Freq)
		}

		if wait := next - time.Now().Unix(); wait > 0 {
			time.Sleep(time.Second * time.Duration(wait))
		}
	}
}

// syncWindow runs one pass source -> target over (gtid, lteid]: a direct
// Reindex when obj.Force is set (skipped under Cfg.Synctest), otherwise a
// count-comparing BinarySearch. Started tasks are recorded under key.
func syncWindow(source, target *els, gtid, lteid, key string, obj *syobj) {
	gid, eid, piciobj := GetIds(gtid, lteid, target, source, obj.Lastmode)
	if !obj.Force {
		BinarySearch(source, target, gid, eid, key, obj.Lastmode, piciobj)
		return
	}
	if !Cfg.Synctest {
		Reindex(source, target, gid, eid, key, obj.Lastmode, piciobj)
	}
}
- // 获取统计sql
- func GetCountSql(source *els, target *els, sid, eid string, lastmode int, piciobj *sypici) any {
- count := esv7.NewBoolQuery()
- if lastmode >= 3 { //使用pici
- count.Filter(esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte))
- } else {
- count.Filter(esv7.NewRangeQuery("id").Gt(sid).Lte(eid))
- }
- if len(source.Boolsql) > 0 {
- count.Filter(esv7.NewRawStringQuery(source.Boolsql))
- }
- PrintSql(count, "count")
- return count
- }
// PrintSql logs the JSON body of query, tagged with note. Errors from Source
// and json.Marshal are ignored.
func PrintSql(query esv7.Query, note string) {
	body, _ := query.Source()
	encoded, _ := json.Marshal(body)
	log.Println("query:", note, string(encoded))
}
// BinarySearch compares the number of matching documents in source and target
// for one range. When they differ, it either starts a reindex of the whole
// range or splits the range in two and recurses on each half.
//
// Id mode (lastmode <= 2): the range is ids in (sid, eid]. Ids are expected to
// begin with an 8-hex-digit timestamp (as produced by TimestampToId). The split
// point is the id of the midpoint of the two timestamps. A range whose ids are
// shorter than 8 characters is logged and skipped.
// Pici mode (lastmode > 2): the range is piciobj's (Gt, Lte]. sid and eid are
// only passed through to Reindex.
//
// A range is reindexed as a whole when the target has no documents in it, when
// it cannot be split further, or when the target is missing more than 98% of
// the source documents. key is the local-db key under which Reindex records
// started tasks. With Cfg.Synctest set, the decision is only logged.
func BinarySearch(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) {
	countSql := GetCountSql(source, target, sid, eid, lastmode, piciobj)
	scount := source.ES.Count(source.Index, source.Stype, countSql)
	dcount := target.ES.Count(target.Index, target.Stype, countSql)
	log.Println("compare:", lastmode, key, sid, eid, scount, dcount)
	// Considered in sync: the source is empty, the counts match, or the
	// target holds a few more documents than the source (below 10000).
	if scount <= 0 || scount == dcount || (scount < dcount && dcount < 10000) {
		return
	}
	// The target is missing more than 98% of the source documents.
	mostlyMissing := dcount < scount && float64(scount-dcount)/float64(scount) > 0.98

	if lastmode > 2 { // pici mode
		// The target already has more documents than the source: stop here.
		if dcount > scount {
			return
		}
		mid := (piciobj.Gt + piciobj.Lte) / 2
		if dcount == 0 || piciobj.Gt == piciobj.Lte || piciobj.Gt == mid || mostlyMissing {
			log.Println("sync-pici", !Cfg.Synctest, key, piciobj.Gt, piciobj.Lte, mid, scount, dcount)
			if !Cfg.Synctest {
				Reindex(source, target, sid, eid, key, lastmode, piciobj)
			}
			return
		}
		BinarySearch(source, target, sid, "", key, lastmode, &sypici{Gt: piciobj.Gt, Lte: mid})
		BinarySearch(source, target, "", eid, key, lastmode, &sypici{Gt: mid, Lte: piciobj.Lte})
		return
	}

	// The target has exactly one document more than the source: stop here.
	if dcount-scount == 1 {
		return
	}
	// Both ids must carry the 8-hex-digit timestamp prefix; a short id (for
	// example an empty gtid after a failed date parse) would panic on slicing.
	if len(sid) < 8 || len(eid) < 8 {
		log.Println("compare: invalid id range, skipped:", key, sid, eid)
		return
	}
	isid, _ := strconv.ParseInt(sid[:8], 16, 64)
	ieid, _ := strconv.ParseInt(eid[:8], 16, 64)
	mid := TimestampToId((isid + ieid) / 2)
	if dcount == 0 || isid == ieid || sid == mid || mostlyMissing {
		log.Println("sync", !Cfg.Synctest, key, sid, eid, scount, dcount)
		if !Cfg.Synctest {
			Reindex(source, target, sid, eid, key, lastmode, piciobj)
		}
		return
	}
	BinarySearch(source, target, sid, mid, key, lastmode, piciobj)
	BinarySearch(source, target, mid, eid, key, lastmode, piciobj)
}
// GetTask refreshes target.currentTask from the reindex task ids stored in the
// local db under key. A stored id ("node:number") is kept only while target's
// cluster still lists it as a running task, and the pruned set is written back.
// Nothing is updated when the task list cannot be fetched or nothing is stored.
// The function then schedules itself again one second later via
// time.AfterFunc, so a single call keeps polling for the life of the process.
func GetTask(target *els, key string) {
	conn := target.ES.GetEsConn()
	defer target.ES.DestoryEsConn(conn)
	taskList, _ := conn.TasksList().Do(context.Background())

	func() {
		target.taskLock.Lock()
		defer target.taskLock.Unlock()

		stored := db.Get(key)
		if taskList == nil || stored == "" {
			return
		}
		var tracked map[string]any
		json.Unmarshal([]byte(stored), &tracked)
		if tracked == nil {
			return
		}
		alive := map[string]bool{}
		for taskID := range tracked {
			// Expect exactly one ':' separating node id and task number.
			if strings.Count(taskID, ":") != 1 {
				continue
			}
			nodeID, _, _ := strings.Cut(taskID, ":")
			if node := taskList.Nodes[nodeID]; node != nil && node.Tasks[taskID] != nil {
				alive[taskID] = true
			}
		}
		target.currentTask = len(alive)
		encoded, _ := json.Marshal(alive)
		if target.currentTask > 0 {
			log.Println(key, tracked, target.currentTask)
		}
		db.Set(key, string(encoded))
	}()

	time.AfterFunc(time.Second*1, func() {
		GetTask(target, key)
	})
}
- // 获取同步的id范围
- func GetIds(Gtid, Lteid string, target *els, source *els, lastmode int) (gid, eid string, piciobj *sypici) {
- switch lastmode {
- case 0:
- gid = Gtid
- eid = Lteid
- case 1:
- gid = GetLastId(target)
- eid = Lteid
- if eid < gid {
- eid = gid
- }
- case 2:
- id := GetLastId(target)
- if id < Gtid {
- gid = id
- } else {
- gid = Gtid
- }
- eid = Lteid
- if eid < gid {
- eid = gid
- }
- //pici模式,只需要pici即可,只有定时模式适用!!!---删除---取源端的最大id和最小id
- case 3, 4:
- // lastmode > 2 {
- piciMin := int64(0)
- piciMax, _ := strconv.ParseInt(Lteid[0:8], 16, 64)
- if lastmode == 3 { //取target的最大pici
- piciMin = GetPiciLast(target)
- } else if lastmode == 4 { //取传递的最小pici
- piciMin, _ = strconv.ParseInt(Gtid[0:8], 16, 64)
- }
- piciobj = &sypici{piciMin, piciMax}
- // } else {
- // gid = Gtid
- // // //到源端去查找最小id,这个id有可能也没同步,所有用gt会漏掉这一条数据
- // res := source.ES.Get(source.Index, source.Stype, fmt.Sprintf(sql_piciid, piciMin, piciMax))
- // if res != nil && len(*res) == 1 {
- // gid, _ = (*res)[0]["id"].(string)
- // //id减1
- // gid = IdSub1(gid)
- // }
- // //最近10天内
- // //gid = TimestampToId(piciMin - 864000)
- // //此处如果还用原来的lteid,在同一个时间戳内有数据会被漏掉,所以要用大的id段,最当前时间的id段
- // // eid = Lteid
- // eid = TimestampToId(time.Now().Unix())
- // }
- }
- return
- }
// IdSub1 returns the hexadecimal id one less than id1, with the same length
// (e.g. "6509c580" followed by zeros becomes "6509c57f" followed by f's). A
// "gt" filter on the result therefore still matches id1 itself. Borrowing
// turns a 0 digit into 'f', as required in base 16. Non-hex characters are
// treated as 0, and changed digits are written in lowercase. When id1 is empty
// or all zeros there is no smaller id, and id1 is returned unchanged.
func IdSub1(id1 string) string {
	digits := []byte(id1)
	for k := len(digits) - 1; k >= 0; k-- {
		n, _ := strconv.ParseInt(string(digits[k]), 16, 64)
		if n > 0 {
			digits[k] = strconv.FormatInt(n-1, 16)[0]
			return string(digits)
		}
		// 0 - 1 in base 16: write 'f' and borrow from the next digit.
		digits[k] = 'f'
	}
	return id1
}
- // 获取最大id,无数据时返回默认的一个id
- func GetLastId(target *els) (id string) {
- res := target.ES.Get(target.Index, target.Stype, sql_last)
- if res != nil && len(*res) == 1 {
- id, _ = (*res)[0]["id"].(string)
- }
- if len(id) == 0 {
- id = "410d207be17a7c80fbde6709"
- }
- log.Println("----id---", id)
- return
- }
- // 默认取3天前
- func GetPiciLast(target *els) (pici int64) {
- res := target.ES.Get(target.Index, target.Stype, sql_picilast)
- if res != nil && len(*res) == 1 {
- pici = gconv.Int64((*res)[0]["pici"])
- }
- if pici == 0 {
- pici = time.Now().Unix() - 3*86400
- }
- log.Println("----pici---", pici)
- return
- }
- // 使用redindex模式
- func Reindex(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) {
- for {
- if target.currentTask < 5 {
- break
- }
- time.Sleep(50 * time.Millisecond)
- }
- conn := target.ES.GetEsConn()
- defer target.ES.DestoryEsConn(conn)
- //创建reindex对象
- rs := esv7.NewReindexSource()
- rs.Index(source.Index)
- //不在同一集群时,要增加remote,同时确保目标端能访问这个配置
- if target.Addr != source.Addr {
- ri := esv7.NewReindexRemoteInfo()
- addr := source.Addr
- if source.Sync != "" {
- addr = source.Sync
- }
- ri.Host(addr).Username(source.User).Password(source.Pwd)
- rs.RemoteInfo(ri)
- }
- //生成目标端查询sql
- find := esv7.NewBoolQuery()
- querys := []esv7.Query{}
- if !(lastmode > 2 && sid == "" && eid == "") { //当是pici模式时,不再传递id段,只适用于定时模式
- querys = append(querys, esv7.NewRangeQuery("id").Gt(sid).Lte(eid))
- }
- //自定义了查询,增加
- if len(source.Boolsql) > 0 {
- rawsql := esv7.NewRawStringQuery(source.Boolsql)
- _, err := rawsql.Source()
- if err == nil {
- querys = append(querys, rawsql)
- } else {
- log.Println("sql转换出错", err, source.Boolsql)
- os.Exit(1)
- }
- }
- if lastmode >= 3 && piciobj != nil {
- querys = append(querys, esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte))
- }
- find.Filter(querys...)
- rs.Query(find)
- PrintSql(rs, "reindex")
- //限定了查询字段
- if len(target.Fields) > 0 {
- rs = rs.FetchSourceIncludeExclude(target.Fields, []string{})
- }
- ds := esv7.NewReindexDestination()
- ds.Index(target.Index)
- reindex := conn.Reindex().Source(rs).Destination(ds).Conflicts("proceed")
- res, err := reindex.DoAsync(context.Background())
- if err != nil {
- log.Println("reindex err", err)
- } else {
- target.currentTask++
- //放入数据库
- target.taskLock.Lock()
- v := db.Get(key)
- vm := map[string]interface{}{}
- if len(v) > 0 {
- json.Unmarshal([]byte(v), &vm)
- }
- vm[res.TaskId] = true
- by, _ := json.Marshal(vm)
- db.Set(key, string(by))
- target.taskLock.Unlock()
- log.Println("reindex start:", res.TaskId, sid, eid)
- }
- }
// Parse2id converts a local time in layout format ("2006-01-02 15:04:05") into
// the lowest id carrying that second's timestamp prefix (see TimestampToId).
// On a parse error it logs the value and returns "".
func Parse2id(v string) string {
	t, err := time.ParseInLocation(layout, v, time.Local)
	if err != nil {
		log.Println("时间格式错误", v)
		return ""
	}
	return TimestampToId(t.Unix())
}
- // 时间转id
- func TimestampToId(timestamp int64) string {
- return fmt.Sprintf("%x0000000000000000", timestamp)
- }
// DB opens the local badger store in ./data, with ValueLogFileSize = 1<<24
// and ValueLogMaxEntries = 10. An open error terminates the process.
func DB() *badger.DB {
	opts := badger.DefaultOptions("./data").
		WithValueLogFileSize(1 << 24).
		WithValueLogMaxEntries(10)
	store, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	return store
}
- // 数据库操作对象
- type bdb struct {
- db *badger.DB
- }
- // 保存
- func (db *bdb) Set(k, v string) {
- err := db.db.Update(func(txn *badger.Txn) error {
- err := txn.Set([]byte(k), []byte(v))
- return err
- })
- if err != nil {
- log.Println("保存出错:", err)
- }
- }
- // 删除
- func (db *bdb) Del(k string) {
- err := db.db.Update(func(txn *badger.Txn) error {
- return txn.Delete([]byte(k))
- })
- if err != nil {
- log.Println("删除error", err)
- }
- }
- // 获取
- func (db *bdb) Get(k string) string {
- v := ""
- err := db.db.View(func(txn *badger.Txn) error {
- item, err := txn.Get([]byte(k))
- if err != nil {
- return err
- }
- err = item.Value(func(val []byte) error {
- v = string(val)
- return nil
- })
- if err != nil {
- return err
- }
- return nil
- })
- if err != nil {
- log.Println("获取出错", err)
- }
- return v
- }
|