package main import ( "context" "encoding/json" "flag" "fmt" "io/ioutil" "log" "os" "strconv" "strings" "sync" "time" "app.yhyue.com/moapp/jybase/es" "github.com/dgraph-io/badger/v4" "github.com/gogf/gf/v2/util/gconv" esv7 "github.com/olivere/elastic/v7" "gopkg.in/yaml.v3" ) //命令默认是config.yaml,可以传递 -f 指定配置文件 type ( cfg struct { Ses *els Des *els Gtid string Lteid string Mode int //同步模式 Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值 Sync []*syobj Synctest bool //是否空跑 } syobj struct { Freq int64 Before int64 Scope int64 Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值 Force bool //强制同步,不对比 } sypici struct { //pici模式时需要传递时间对象 Gt int64 Lte int64 } els struct { Addr string Sync string Index string Stype string Boolsql string Fields []string User string Pwd string Size int ES *es.EsV7 currentTask int taskLock sync.Mutex } ) var ( Cfg = &cfg{} //统计sql sql_count = `{"query":{"range":{"id":{"gt":"%s","lte":"%s"}}}}` //取最大id sql sql_last = `{"query":{"match_all":{}},"sort":[{"id":"desc"}],"_source":["id"],"size":1}` sql_picilast = `{"query":{"match_all":{}},"sort":[{"pici":"desc"}],"_source":["pici"],"size":1}` sql_piciid = `{"query": {"range": {"pici":{"gt":%d,"lte":%d}}},"sort": [{"id": "asc"}],"_source": ["id"],"size": 1}` layout = "2006-01-02 15:04:05" //本在数据库 db = &bdb{DB()} conf = "./config.yaml" ) // 加载配置文件 func LoadConf(file string, cfg any) { bs, err := ioutil.ReadFile(file) if err != nil { log.Fatal("loadcfg", err.Error()) } err = yaml.Unmarshal(bs, cfg) if err != nil { log.Fatal("loadcfg json", err.Error()) } } func inits() { LoadConf(conf, &Cfg) if len(Cfg.Gtid) == 19 { Cfg.Gtid = Parse2id(Cfg.Gtid) } if len(Cfg.Lteid) == 19 { Cfg.Lteid = Parse2id(Cfg.Lteid) } Cfg.Ses.ES = es.NewEs("v7", Cfg.Ses.Addr, Cfg.Ses.Size, Cfg.Ses.User, Cfg.Ses.Pwd).(*es.EsV7) Cfg.Des.ES = es.NewEs("v7", Cfg.Des.Addr, Cfg.Des.Size, Cfg.Des.User, Cfg.Des.Pwd).(*es.EsV7) } func main() { flag.StringVar(&conf, "f", "./config.yaml", "配置文件") flag.Parse() inits() // 6509c5800000000000000000 650ak6g000000000000000zv 95790 9562 // 6509c5800000000000000000 650ak6g00000000000000001 95790 9562 // 6509c5800000000000000000 650ak6g00000000000000000 95790 9562 // log.Println(GetMid("6509c5800000000000000000", "650ak6g000000000000000zv")) // log.Println(GetMid("6509c5800000000000000000", "650ak6g00000000000000000")) // int64Value, _ := strconv.ParseInt("6509c580", 16, 64) // int64Value2, _ := strconv.ParseInt("6509c580", 16, 64) // v := (int64Value + int64Value2) / 2 // log.Println(int64Value, int64Value2, fmt.Sprintf("%x", v)) if len(Cfg.Ses.Boolsql) > 0 { sql := esv7.NewBoolQuery().Filter(esv7.NewRawStringQuery(Cfg.Ses.Boolsql)) sqls, err := sql.Source() by, _ := json.Marshal(sqls) log.Println("限定查询范围:", string(by), err) } if len(Cfg.Des.Fields) > 0 { log.Println("限定同步字段", Cfg.Des.Fields) } log.Println("准备执行任务...") time.Sleep(1 * time.Second) switch Cfg.Mode { case 1, 2: //单向取时间段 func() { go GetTask(Cfg.Des, "atask") gid, eid, piciobj := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Des, Cfg.Ses, Cfg.Lastmode) BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask", Cfg.Lastmode, piciobj) log.Println("atask", "over...") }() if Cfg.Mode == 2 { func() { go GetTask(Cfg.Ses, "btask") gid, eid, piciobj := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Ses, Cfg.Des, Cfg.Lastmode) BinarySearch(Cfg.Des, Cfg.Ses, gid, eid, "btask", Cfg.Lastmode, piciobj) log.Println("btask", "over...") }() } case 3, 4: //单向,定时同步多少秒的数据 go GetTask(Cfg.Des, "atask") if Cfg.Mode == 4 { go GetTask(Cfg.Ses, "btask") } for _, obj := range Cfg.Sync { go func(obj *syobj) { for { now := time.Now().Unix() //下次执行时间 nt := now + obj.Freq no := now - obj.Before //从几分钟前开始 gtid := TimestampToId(no - obj.Scope) lteid := TimestampToId(no) gid, eid, piciobj := GetIds(gtid, lteid, Cfg.Des, Cfg.Ses, obj.Lastmode) if obj.Force { if !Cfg.Synctest { Reindex(Cfg.Ses, Cfg.Des, gid, eid, "atask", obj.Lastmode, piciobj) } } else { BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask", obj.Lastmode, piciobj) } log.Println("atask", "over...", "freq", obj.Freq) //----------------- if Cfg.Mode == 4 { gid1, eid1, piciobj := GetIds(gtid, lteid, Cfg.Ses, Cfg.Des, obj.Lastmode) if obj.Force { if !Cfg.Synctest { Reindex(Cfg.Des, Cfg.Ses, gid1, eid1, "atask", obj.Lastmode, piciobj) } } else { BinarySearch(Cfg.Des, Cfg.Ses, gid1, eid1, "atask", obj.Lastmode, piciobj) } log.Println("btask", "over...", "freq", obj.Freq) } last := nt - time.Now().Unix() if last > 0 { time.Sleep(time.Second * time.Duration(last)) } } }(obj) time.Sleep(25 * time.Second) } } time.Sleep(99999 * time.Hour) } // 获取统计sql func GetCountSql(source *els, target *els, sid, eid string, lastmode int, piciobj *sypici) any { count := esv7.NewBoolQuery() if lastmode >= 3 { //使用pici count.Filter(esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte)) } else { count.Filter(esv7.NewRangeQuery("id").Gt(sid).Lte(eid)) } if len(source.Boolsql) > 0 { count.Filter(esv7.NewRawStringQuery(source.Boolsql)) } PrintSql(count, "count") return count } func PrintSql(query esv7.Query, note string) { v, _ := query.Source() bs, _ := json.Marshal(v) log.Println("query:", note, string(bs)) } // 二分法查找执行任务 func BinarySearch(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) { countSql := GetCountSql(source, target, sid, eid, lastmode, piciobj) scount := source.ES.Count(source.Index, source.Stype, countSql) dcount := target.ES.Count(target.Index, target.Stype, countSql) log.Println("compare:", lastmode, key, sid, eid, scount, dcount) if scount > 0 && scount != dcount && !(scount < dcount && dcount < 10000) { if lastmode > 2 { //pici模式 if dcount > scount { //目标集群比源集群量多,不继续 return } mid := (piciobj.Gt + piciobj.Lte) / 2 if dcount == 0 || piciobj.Gt == piciobj.Lte || piciobj.Gt == mid || (dcount < scount && (float64(scount-dcount)/float64(scount)) > 0.98) { log.Println("sync-pici", !Cfg.Synctest, key, piciobj.Gt, piciobj.Lte, mid, scount, dcount) if !Cfg.Synctest { Reindex(source, target, sid, eid, key, lastmode, piciobj) } } else { BinarySearch(source, target, sid, "", key, lastmode, &sypici{piciobj.Gt, mid}) BinarySearch(source, target, "", eid, key, lastmode, &sypici{mid, piciobj.Lte}) } } else { if dcount-scount == 1 { //目标集群比源集群量多1条,不继续 return } //满足条件则开启同步 isid, _ := strconv.ParseInt(sid[:8], 16, 64) ieid, _ := strconv.ParseInt(eid[:8], 16, 64) mid := TimestampToId((isid + ieid) / 2) //6509dbd90000000000000000 6509dbda0000000000000000 if dcount == 0 || isid == ieid || sid == mid || (dcount < scount && (float64(scount-dcount)/float64(scount)) > 0.98) { log.Println("sync", !Cfg.Synctest, key, sid, eid, scount, dcount) if !Cfg.Synctest { Reindex(source, target, sid, eid, key, lastmode, piciobj) } } else { BinarySearch(source, target, sid, mid, key, lastmode, piciobj) BinarySearch(source, target, mid, eid, key, lastmode, piciobj) } } } } // 定时获取任务列表并清除队列,只为修改currentTask值 func GetTask(target *els, key string) { conn := target.ES.GetEsConn() defer target.ES.DestoryEsConn(conn) tasks := conn.TasksList() tr, _ := tasks.Do(context.Background()) target.taskLock.Lock() v := db.Get(key) if tr != nil && len(v) > 0 { var vm map[string]interface{} json.Unmarshal([]byte(v), &vm) if vm != nil { var newmap = map[string]bool{} for k1, _ := range vm { vs := strings.Split(k1, ":") if len(vs) == 2 { tn := tr.Nodes[vs[0]] if tn != nil { ti := tn.Tasks[k1] if ti != nil { newmap[k1] = true } } } } target.currentTask = len(newmap) by, _ := json.Marshal(newmap) if target.currentTask > 0 { log.Println(key, vm, target.currentTask) } db.Set(key, string(by)) } } target.taskLock.Unlock() //WQTx3SgKTiy_kR0Zk25hzg WQTx3SgKTiy_kR0Zk25hzg:1176185 1176185 WQTx3SgKTiy_kR0Zk25hzg time.AfterFunc(time.Second*1, func() { GetTask(target, key) }) } // 获取同步的id范围 func GetIds(Gtid, Lteid string, target *els, source *els, lastmode int) (gid, eid string, piciobj *sypici) { switch lastmode { case 0: gid = Gtid eid = Lteid case 1: gid = GetLastId(target) eid = Lteid if eid < gid { eid = gid } case 2: id := GetLastId(target) if id < Gtid { gid = id } else { gid = Gtid } eid = Lteid if eid < gid { eid = gid } //pici模式,只需要pici即可,只有定时模式适用!!!---删除---取源端的最大id和最小id case 3, 4: // lastmode > 2 { piciMin := int64(0) piciMax, _ := strconv.ParseInt(Lteid[0:8], 16, 64) if lastmode == 3 { //取target的最大pici piciMin = GetPiciLast(target) } else if lastmode == 4 { //取传递的最小pici piciMin, _ = strconv.ParseInt(Gtid[0:8], 16, 64) } piciobj = &sypici{piciMin, piciMax} // } else { // gid = Gtid // // //到源端去查找最小id,这个id有可能也没同步,所有用gt会漏掉这一条数据 // res := source.ES.Get(source.Index, source.Stype, fmt.Sprintf(sql_piciid, piciMin, piciMax)) // if res != nil && len(*res) == 1 { // gid, _ = (*res)[0]["id"].(string) // //id减1 // gid = IdSub1(gid) // } // //最近10天内 // //gid = TimestampToId(piciMin - 864000) // //此处如果还用原来的lteid,在同一个时间戳内有数据会被漏掉,所以要用大的id段,最当前时间的id段 // // eid = Lteid // eid = TimestampToId(time.Now().Unix()) // } } return } // id减1 func IdSub1(id1 string) string { str := "" bf := false for k := len(id1) - 1; k >= 0; k-- { v := id1[k] if !bf { n, _ := strconv.ParseInt(string(v), 16, 64) if n > 0 { n-- str = fmt.Sprintf("%x", n) + str bf = true } else { str = "9" + str } } else { //str = string(v) + str str = string(id1[:k+1]) + str break } } return str } // 获取最大id,无数据时返回默认的一个id func GetLastId(target *els) (id string) { res := target.ES.Get(target.Index, target.Stype, sql_last) if res != nil && len(*res) == 1 { id, _ = (*res)[0]["id"].(string) } if len(id) == 0 { id = "410d207be17a7c80fbde6709" } log.Println("----id---", id) return } // 默认取3天前 func GetPiciLast(target *els) (pici int64) { res := target.ES.Get(target.Index, target.Stype, sql_picilast) if res != nil && len(*res) == 1 { pici = gconv.Int64((*res)[0]["pici"]) } if pici == 0 { pici = time.Now().Unix() - 3*86400 } log.Println("----pici---", pici) return } // 使用redindex模式 func Reindex(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) { for { if target.currentTask < 5 { break } time.Sleep(50 * time.Millisecond) } conn := target.ES.GetEsConn() defer target.ES.DestoryEsConn(conn) //创建reindex对象 rs := esv7.NewReindexSource() rs.Index(source.Index) //不在同一集群时,要增加remote,同时确保目标端能访问这个配置 if target.Addr != source.Addr { ri := esv7.NewReindexRemoteInfo() addr := source.Addr if source.Sync != "" { addr = source.Sync } ri.Host(addr).Username(source.User).Password(source.Pwd) rs.RemoteInfo(ri) } //生成目标端查询sql find := esv7.NewBoolQuery() querys := []esv7.Query{} if !(lastmode > 2 && sid == "" && eid == "") { //当是pici模式时,不再传递id段,只适用于定时模式 querys = append(querys, esv7.NewRangeQuery("id").Gt(sid).Lte(eid)) } //自定义了查询,增加 if len(source.Boolsql) > 0 { rawsql := esv7.NewRawStringQuery(source.Boolsql) _, err := rawsql.Source() if err == nil { querys = append(querys, rawsql) } else { log.Println("sql转换出错", err, source.Boolsql) os.Exit(1) } } if lastmode >= 3 && piciobj != nil { querys = append(querys, esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte)) } find.Filter(querys...) rs.Query(find) PrintSql(rs, "reindex") //限定了查询字段 if len(target.Fields) > 0 { rs = rs.FetchSourceIncludeExclude(target.Fields, []string{}) } ds := esv7.NewReindexDestination() ds.Index(target.Index) reindex := conn.Reindex().Source(rs).Destination(ds).Conflicts("proceed") res, err := reindex.DoAsync(context.Background()) if err != nil { log.Println("reindex err", err) } else { target.currentTask++ //放入数据库 target.taskLock.Lock() v := db.Get(key) vm := map[string]interface{}{} if len(v) > 0 { json.Unmarshal([]byte(v), &vm) } vm[res.TaskId] = true by, _ := json.Marshal(vm) db.Set(key, string(by)) target.taskLock.Unlock() log.Println("reindex start:", res.TaskId, sid, eid) } } // 日期转id func Parse2id(v string) string { t, err := time.ParseInLocation(layout, v, time.Local) if err != nil { log.Println("时间格式错误", v) } else { return TimestampToId(t.Unix()) } return "" } // 时间转id func TimestampToId(timestamp int64) string { return fmt.Sprintf("%x0000000000000000", timestamp) } // 使用本地数据库 func DB() *badger.DB { opts := badger.DefaultOptions("./data") opts.ValueLogFileSize = 1 << 24 opts.ValueLogMaxEntries = 10 db, err := badger.Open(opts) if err != nil { log.Fatal(err) } return db } // 数据库操作对象 type bdb struct { db *badger.DB } // 保存 func (db *bdb) Set(k, v string) { err := db.db.Update(func(txn *badger.Txn) error { err := txn.Set([]byte(k), []byte(v)) return err }) if err != nil { log.Println("保存出错:", err) } } // 删除 func (db *bdb) Del(k string) { err := db.db.Update(func(txn *badger.Txn) error { return txn.Delete([]byte(k)) }) if err != nil { log.Println("删除error", err) } } // 获取 func (db *bdb) Get(k string) string { v := "" err := db.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(k)) if err != nil { return err } err = item.Value(func(val []byte) error { v = string(val) return nil }) if err != nil { return err } return nil }) if err != nil { log.Println("获取出错", err) } return v }