main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "app.yhyue.com/moapp/jybase/es"
  15. "github.com/dgraph-io/badger/v4"
  16. "github.com/gogf/gf/v2/util/gconv"
  17. esv7 "github.com/olivere/elastic/v7"
  18. "gopkg.in/yaml.v3"
  19. )
  // By default the command reads ./config.yaml; pass -f to point it at a different configuration file.
  21. type (
  22. cfg struct {
  23. Ses *els
  24. Des *els
  25. Gtid string
  26. Lteid string
  27. Mode int //同步模式
  28. Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值
  29. Sync []*syobj
  30. Synctest bool //是否空跑
  31. }
  32. syobj struct {
  33. Freq int64
  34. Before int64
  35. Scope int64
  36. Lastmode int //0默认取时间 1取最大id 2取最大id和时间的最小值
  37. Force bool //强制同步,不对比
  38. }
  39. sypici struct { //pici模式时需要传递时间对象
  40. Gt int64
  41. Lte int64
  42. }
  43. els struct {
  44. Addr string
  45. Sync string
  46. Index string
  47. Stype string
  48. Boolsql string
  49. Fields []string
  50. User string
  51. Pwd string
  52. Size int
  53. ES *es.EsV7
  54. currentTask int
  55. taskLock sync.Mutex
  56. }
  57. )
  58. var (
  59. Cfg = &cfg{}
  60. //统计sql
  61. sql_count = `{"query":{"range":{"id":{"gt":"%s","lte":"%s"}}}}`
  62. //取最大id sql
  63. sql_last = `{"query":{"match_all":{}},"sort":[{"id":"desc"}],"_source":["id"],"size":1}`
  64. sql_picilast = `{"query":{"match_all":{}},"sort":[{"pici":"desc"}],"_source":["pici"],"size":1}`
  65. sql_piciid = `{"query": {"range": {"pici":{"gt":%d,"lte":%d}}},"sort": [{"id": "asc"}],"_source": ["id"],"size": 1}`
  66. layout = "2006-01-02 15:04:05"
  67. //本在数据库
  68. db = &bdb{DB()}
  69. conf = "./config.yaml"
  70. )
  71. // 加载配置文件
  72. func LoadConf(file string, cfg any) {
  73. bs, err := ioutil.ReadFile(file)
  74. if err != nil {
  75. log.Fatal("loadcfg", err.Error())
  76. }
  77. err = yaml.Unmarshal(bs, cfg)
  78. if err != nil {
  79. log.Fatal("loadcfg json", err.Error())
  80. }
  81. }
  82. func inits() {
  83. LoadConf(conf, &Cfg)
  84. if len(Cfg.Gtid) == 19 {
  85. Cfg.Gtid = Parse2id(Cfg.Gtid)
  86. }
  87. if len(Cfg.Lteid) == 19 {
  88. Cfg.Lteid = Parse2id(Cfg.Lteid)
  89. }
  90. Cfg.Ses.ES = es.NewEs("v7", Cfg.Ses.Addr, Cfg.Ses.Size, Cfg.Ses.User, Cfg.Ses.Pwd).(*es.EsV7)
  91. Cfg.Des.ES = es.NewEs("v7", Cfg.Des.Addr, Cfg.Des.Size, Cfg.Des.User, Cfg.Des.Pwd).(*es.EsV7)
  92. }
  93. func main() {
  94. flag.StringVar(&conf, "f", "./config.yaml", "配置文件")
  95. flag.Parse()
  96. inits()
  97. // 6509c5800000000000000000 650ak6g000000000000000zv 95790 9562
  98. // 6509c5800000000000000000 650ak6g00000000000000001 95790 9562
  99. // 6509c5800000000000000000 650ak6g00000000000000000 95790 9562
  100. // log.Println(GetMid("6509c5800000000000000000", "650ak6g000000000000000zv"))
  101. // log.Println(GetMid("6509c5800000000000000000", "650ak6g00000000000000000"))
  102. // int64Value, _ := strconv.ParseInt("6509c580", 16, 64)
  103. // int64Value2, _ := strconv.ParseInt("6509c580", 16, 64)
  104. // v := (int64Value + int64Value2) / 2
  105. // log.Println(int64Value, int64Value2, fmt.Sprintf("%x", v))
  106. if len(Cfg.Ses.Boolsql) > 0 {
  107. sql := esv7.NewBoolQuery().Filter(esv7.NewRawStringQuery(Cfg.Ses.Boolsql))
  108. sqls, err := sql.Source()
  109. by, _ := json.Marshal(sqls)
  110. log.Println("限定查询范围:", string(by), err)
  111. }
  112. if len(Cfg.Des.Fields) > 0 {
  113. log.Println("限定同步字段", Cfg.Des.Fields)
  114. }
  115. log.Println("准备执行任务...")
  116. time.Sleep(1 * time.Second)
  117. switch Cfg.Mode {
  118. case 1, 2: //单向取时间段
  119. func() {
  120. go GetTask(Cfg.Des, "atask")
  121. gid, eid, piciobj := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Des, Cfg.Ses, Cfg.Lastmode)
  122. BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask", Cfg.Lastmode, piciobj)
  123. log.Println("atask", "over...")
  124. }()
  125. if Cfg.Mode == 2 {
  126. func() {
  127. go GetTask(Cfg.Ses, "btask")
  128. gid, eid, piciobj := GetIds(Cfg.Gtid, Cfg.Lteid, Cfg.Ses, Cfg.Des, Cfg.Lastmode)
  129. BinarySearch(Cfg.Des, Cfg.Ses, gid, eid, "btask", Cfg.Lastmode, piciobj)
  130. log.Println("btask", "over...")
  131. }()
  132. }
  133. case 3, 4: //单向,定时同步多少秒的数据
  134. go GetTask(Cfg.Des, "atask")
  135. if Cfg.Mode == 4 {
  136. go GetTask(Cfg.Ses, "btask")
  137. }
  138. for _, obj := range Cfg.Sync {
  139. go func(obj *syobj) {
  140. for {
  141. now := time.Now().Unix()
  142. //下次执行时间
  143. nt := now + obj.Freq
  144. no := now - obj.Before //从几分钟前开始
  145. gtid := TimestampToId(no - obj.Scope)
  146. lteid := TimestampToId(no)
  147. gid, eid, piciobj := GetIds(gtid, lteid, Cfg.Des, Cfg.Ses, obj.Lastmode)
  148. if obj.Force {
  149. if !Cfg.Synctest {
  150. Reindex(Cfg.Ses, Cfg.Des, gid, eid, "atask", obj.Lastmode, piciobj)
  151. }
  152. } else {
  153. BinarySearch(Cfg.Ses, Cfg.Des, gid, eid, "atask", obj.Lastmode, piciobj)
  154. }
  155. log.Println("atask", "over...", "freq", obj.Freq)
  156. //-----------------
  157. if Cfg.Mode == 4 {
  158. gid1, eid1, piciobj := GetIds(gtid, lteid, Cfg.Ses, Cfg.Des, obj.Lastmode)
  159. if obj.Force {
  160. if !Cfg.Synctest {
  161. Reindex(Cfg.Des, Cfg.Ses, gid1, eid1, "atask", obj.Lastmode, piciobj)
  162. }
  163. } else {
  164. BinarySearch(Cfg.Des, Cfg.Ses, gid1, eid1, "atask", obj.Lastmode, piciobj)
  165. }
  166. log.Println("btask", "over...", "freq", obj.Freq)
  167. }
  168. last := nt - time.Now().Unix()
  169. if last > 0 {
  170. time.Sleep(time.Second * time.Duration(last))
  171. }
  172. }
  173. }(obj)
  174. time.Sleep(25 * time.Second)
  175. }
  176. }
  177. time.Sleep(99999 * time.Hour)
  178. }
  179. // 获取统计sql
  180. func GetCountSql(source *els, target *els, sid, eid string, lastmode int, piciobj *sypici) any {
  181. count := esv7.NewBoolQuery()
  182. if lastmode >= 3 { //使用pici
  183. count.Filter(esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte))
  184. } else {
  185. count.Filter(esv7.NewRangeQuery("id").Gt(sid).Lte(eid))
  186. }
  187. if len(source.Boolsql) > 0 {
  188. count.Filter(esv7.NewRawStringQuery(source.Boolsql))
  189. }
  190. PrintSql(count, "count")
  191. return count
  192. }
  193. func PrintSql(query esv7.Query, note string) {
  194. v, _ := query.Source()
  195. bs, _ := json.Marshal(v)
  196. log.Println("query:", note, string(bs))
  197. }
  198. // 二分法查找执行任务
  199. func BinarySearch(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) {
  200. countSql := GetCountSql(source, target, sid, eid, lastmode, piciobj)
  201. scount := source.ES.Count(source.Index, source.Stype, countSql)
  202. dcount := target.ES.Count(target.Index, target.Stype, countSql)
  203. log.Println("compare:", lastmode, key, sid, eid, scount, dcount)
  204. if scount > 0 && scount != dcount && !(scount < dcount && dcount < 10000) {
  205. if lastmode > 2 { //pici模式
  206. if dcount > scount { //目标集群比源集群量多,不继续
  207. return
  208. }
  209. mid := (piciobj.Gt + piciobj.Lte) / 2
  210. if dcount == 0 || piciobj.Gt == piciobj.Lte || piciobj.Gt == mid || (dcount < scount && (float64(scount-dcount)/float64(scount)) > 0.98) {
  211. log.Println("sync-pici", !Cfg.Synctest, key, piciobj.Gt, piciobj.Lte, mid, scount, dcount)
  212. if !Cfg.Synctest {
  213. Reindex(source, target, sid, eid, key, lastmode, piciobj)
  214. }
  215. } else {
  216. BinarySearch(source, target, sid, "", key, lastmode, &sypici{piciobj.Gt, mid})
  217. BinarySearch(source, target, "", eid, key, lastmode, &sypici{mid, piciobj.Lte})
  218. }
  219. } else {
  220. if dcount-scount == 1 { //目标集群比源集群量多1条,不继续
  221. return
  222. }
  223. //满足条件则开启同步
  224. isid, _ := strconv.ParseInt(sid[:8], 16, 64)
  225. ieid, _ := strconv.ParseInt(eid[:8], 16, 64)
  226. mid := TimestampToId((isid + ieid) / 2)
  227. //6509dbd90000000000000000 6509dbda0000000000000000
  228. if dcount == 0 || isid == ieid || sid == mid || (dcount < scount && (float64(scount-dcount)/float64(scount)) > 0.98) {
  229. log.Println("sync", !Cfg.Synctest, key, sid, eid, scount, dcount)
  230. if !Cfg.Synctest {
  231. Reindex(source, target, sid, eid, key, lastmode, piciobj)
  232. }
  233. } else {
  234. BinarySearch(source, target, sid, mid, key, lastmode, piciobj)
  235. BinarySearch(source, target, mid, eid, key, lastmode, piciobj)
  236. }
  237. }
  238. }
  239. }
  240. // 定时获取任务列表并清除队列,只为修改currentTask值
  241. func GetTask(target *els, key string) {
  242. conn := target.ES.GetEsConn()
  243. defer target.ES.DestoryEsConn(conn)
  244. tasks := conn.TasksList()
  245. tr, _ := tasks.Do(context.Background())
  246. target.taskLock.Lock()
  247. v := db.Get(key)
  248. if tr != nil && len(v) > 0 {
  249. var vm map[string]interface{}
  250. json.Unmarshal([]byte(v), &vm)
  251. if vm != nil {
  252. var newmap = map[string]bool{}
  253. for k1, _ := range vm {
  254. vs := strings.Split(k1, ":")
  255. if len(vs) == 2 {
  256. tn := tr.Nodes[vs[0]]
  257. if tn != nil {
  258. ti := tn.Tasks[k1]
  259. if ti != nil {
  260. newmap[k1] = true
  261. }
  262. }
  263. }
  264. }
  265. target.currentTask = len(newmap)
  266. by, _ := json.Marshal(newmap)
  267. if target.currentTask > 0 {
  268. log.Println(key, vm, target.currentTask)
  269. }
  270. db.Set(key, string(by))
  271. }
  272. }
  273. target.taskLock.Unlock()
  274. //WQTx3SgKTiy_kR0Zk25hzg WQTx3SgKTiy_kR0Zk25hzg:1176185 1176185 WQTx3SgKTiy_kR0Zk25hzg
  275. time.AfterFunc(time.Second*1, func() {
  276. GetTask(target, key)
  277. })
  278. }
  279. // 获取同步的id范围
  280. func GetIds(Gtid, Lteid string, target *els, source *els, lastmode int) (gid, eid string, piciobj *sypici) {
  281. switch lastmode {
  282. case 0:
  283. gid = Gtid
  284. eid = Lteid
  285. case 1:
  286. gid = GetLastId(target)
  287. eid = Lteid
  288. if eid < gid {
  289. eid = gid
  290. }
  291. case 2:
  292. id := GetLastId(target)
  293. if id < Gtid {
  294. gid = id
  295. } else {
  296. gid = Gtid
  297. }
  298. eid = Lteid
  299. if eid < gid {
  300. eid = gid
  301. }
  302. //pici模式,只需要pici即可,只有定时模式适用!!!---删除---取源端的最大id和最小id
  303. case 3, 4:
  304. // lastmode > 2 {
  305. piciMin := int64(0)
  306. piciMax, _ := strconv.ParseInt(Lteid[0:8], 16, 64)
  307. if lastmode == 3 { //取target的最大pici
  308. piciMin = GetPiciLast(target)
  309. } else if lastmode == 4 { //取传递的最小pici
  310. piciMin, _ = strconv.ParseInt(Gtid[0:8], 16, 64)
  311. }
  312. piciobj = &sypici{piciMin, piciMax}
  313. // } else {
  314. // gid = Gtid
  315. // // //到源端去查找最小id,这个id有可能也没同步,所有用gt会漏掉这一条数据
  316. // res := source.ES.Get(source.Index, source.Stype, fmt.Sprintf(sql_piciid, piciMin, piciMax))
  317. // if res != nil && len(*res) == 1 {
  318. // gid, _ = (*res)[0]["id"].(string)
  319. // //id减1
  320. // gid = IdSub1(gid)
  321. // }
  322. // //最近10天内
  323. // //gid = TimestampToId(piciMin - 864000)
  324. // //此处如果还用原来的lteid,在同一个时间戳内有数据会被漏掉,所以要用大的id段,最当前时间的id段
  325. // // eid = Lteid
  326. // eid = TimestampToId(time.Now().Unix())
  327. // }
  328. }
  329. return
  330. }
  // IdSub1 returns the hexadecimal id id1 decremented by one, keeping its
  // length.
  //
  // Trailing zero digits borrow from the next digit and become 'f'. An id made
  // only of zeros wraps around to all 'f'. The digit that is decremented is
  // written in lower case; untouched digits keep their case. An id containing
  // a non-hexadecimal character in the scanned tail is returned unchanged.
  func IdSub1(id1 string) string {
  	const hexDigits = "0123456789abcdef"
  	digits := []byte(id1)
  	for i := len(digits) - 1; i >= 0; i-- {
  		n, err := strconv.ParseUint(string(digits[i]), 16, 8)
  		if err != nil {
  			return id1
  		}
  		if n > 0 {
  			digits[i] = hexDigits[n-1]
  			return string(digits)
  		}
  		// 0 - 1 with a borrow is f in base 16.
  		digits[i] = 'f'
  	}
  	return string(digits)
  }
  354. // 获取最大id,无数据时返回默认的一个id
  355. func GetLastId(target *els) (id string) {
  356. res := target.ES.Get(target.Index, target.Stype, sql_last)
  357. if res != nil && len(*res) == 1 {
  358. id, _ = (*res)[0]["id"].(string)
  359. }
  360. if len(id) == 0 {
  361. id = "410d207be17a7c80fbde6709"
  362. }
  363. log.Println("----id---", id)
  364. return
  365. }
  366. // 默认取3天前
  367. func GetPiciLast(target *els) (pici int64) {
  368. res := target.ES.Get(target.Index, target.Stype, sql_picilast)
  369. if res != nil && len(*res) == 1 {
  370. pici = gconv.Int64((*res)[0]["pici"])
  371. }
  372. if pici == 0 {
  373. pici = time.Now().Unix() - 3*86400
  374. }
  375. log.Println("----pici---", pici)
  376. return
  377. }
  378. // 使用redindex模式
  379. func Reindex(source *els, target *els, sid, eid, key string, lastmode int, piciobj *sypici) {
  380. for {
  381. if target.currentTask < 5 {
  382. break
  383. }
  384. time.Sleep(50 * time.Millisecond)
  385. }
  386. conn := target.ES.GetEsConn()
  387. defer target.ES.DestoryEsConn(conn)
  388. //创建reindex对象
  389. rs := esv7.NewReindexSource()
  390. rs.Index(source.Index)
  391. //不在同一集群时,要增加remote,同时确保目标端能访问这个配置
  392. if target.Addr != source.Addr {
  393. ri := esv7.NewReindexRemoteInfo()
  394. addr := source.Addr
  395. if source.Sync != "" {
  396. addr = source.Sync
  397. }
  398. ri.Host(addr).Username(source.User).Password(source.Pwd)
  399. rs.RemoteInfo(ri)
  400. }
  401. //生成目标端查询sql
  402. find := esv7.NewBoolQuery()
  403. querys := []esv7.Query{}
  404. if !(lastmode > 2 && sid == "" && eid == "") { //当是pici模式时,不再传递id段,只适用于定时模式
  405. querys = append(querys, esv7.NewRangeQuery("id").Gt(sid).Lte(eid))
  406. }
  407. //自定义了查询,增加
  408. if len(source.Boolsql) > 0 {
  409. rawsql := esv7.NewRawStringQuery(source.Boolsql)
  410. _, err := rawsql.Source()
  411. if err == nil {
  412. querys = append(querys, rawsql)
  413. } else {
  414. log.Println("sql转换出错", err, source.Boolsql)
  415. os.Exit(1)
  416. }
  417. }
  418. if lastmode >= 3 && piciobj != nil {
  419. querys = append(querys, esv7.NewRangeQuery("pici").Gt(piciobj.Gt).Lte(piciobj.Lte))
  420. }
  421. find.Filter(querys...)
  422. rs.Query(find)
  423. PrintSql(rs, "reindex")
  424. //限定了查询字段
  425. if len(target.Fields) > 0 {
  426. rs = rs.FetchSourceIncludeExclude(target.Fields, []string{})
  427. }
  428. ds := esv7.NewReindexDestination()
  429. ds.Index(target.Index)
  430. reindex := conn.Reindex().Source(rs).Destination(ds).Conflicts("proceed")
  431. res, err := reindex.DoAsync(context.Background())
  432. if err != nil {
  433. log.Println("reindex err", err)
  434. } else {
  435. target.currentTask++
  436. //放入数据库
  437. target.taskLock.Lock()
  438. v := db.Get(key)
  439. vm := map[string]interface{}{}
  440. if len(v) > 0 {
  441. json.Unmarshal([]byte(v), &vm)
  442. }
  443. vm[res.TaskId] = true
  444. by, _ := json.Marshal(vm)
  445. db.Set(key, string(by))
  446. target.taskLock.Unlock()
  447. log.Println("reindex start:", res.TaskId, sid, eid)
  448. }
  449. }
  450. // 日期转id
  451. func Parse2id(v string) string {
  452. t, err := time.ParseInLocation(layout, v, time.Local)
  453. if err != nil {
  454. log.Println("时间格式错误", v)
  455. } else {
  456. return TimestampToId(t.Unix())
  457. }
  458. return ""
  459. }
  // TimestampToId returns the id that marks the start of the given Unix second:
  // the timestamp as 8 hexadecimal digits followed by 16 zeros.
  //
  // The timestamp is zero-padded so that every non-negative timestamp that fits
  // in 8 hex digits gives a 24-character id. Callers compare ids as strings and
  // slice the first 8 characters, both of which need the fixed width.
  func TimestampToId(timestamp int64) string {
  	return fmt.Sprintf("%08x0000000000000000", timestamp)
  }
  464. // 使用本地数据库
  465. func DB() *badger.DB {
  466. opts := badger.DefaultOptions("./data")
  467. opts.ValueLogFileSize = 1 << 24
  468. opts.ValueLogMaxEntries = 10
  469. db, err := badger.Open(opts)
  470. if err != nil {
  471. log.Fatal(err)
  472. }
  473. return db
  474. }
  // bdb wraps a badger database handle; the Set, Del and Get helpers are
  // defined on it.
  type bdb struct {
  	db *badger.DB
  }
  479. // 保存
  480. func (db *bdb) Set(k, v string) {
  481. err := db.db.Update(func(txn *badger.Txn) error {
  482. err := txn.Set([]byte(k), []byte(v))
  483. return err
  484. })
  485. if err != nil {
  486. log.Println("保存出错:", err)
  487. }
  488. }
  489. // 删除
  490. func (db *bdb) Del(k string) {
  491. err := db.db.Update(func(txn *badger.Txn) error {
  492. return txn.Delete([]byte(k))
  493. })
  494. if err != nil {
  495. log.Println("删除error", err)
  496. }
  497. }
  498. // 获取
  499. func (db *bdb) Get(k string) string {
  500. v := ""
  501. err := db.db.View(func(txn *badger.Txn) error {
  502. item, err := txn.Get([]byte(k))
  503. if err != nil {
  504. return err
  505. }
  506. err = item.Value(func(val []byte) error {
  507. v = string(val)
  508. return nil
  509. })
  510. if err != nil {
  511. return err
  512. }
  513. return nil
  514. })
  515. if err != nil {
  516. log.Println("获取出错", err)
  517. }
  518. return v
  519. }