Browse Source

增加scylla迁移到mongo库

5 years ago
parent
commit
543bfb0732
2 changed files with 195 additions and 0 deletions
  1. 15 0
      src/moveToMgo/config.json
  2. 180 0
      src/moveToMgo/main.go

+ 15 - 0
src/moveToMgo/config.json

@@ -0,0 +1,15 @@
+{
+    "ca_hosts": [
+        "172.17.145.178"
+    ],
+    "ca_port": 9042,
+    "cron": "0 50 8 * * ?",
+    "id": "2020-03-30",
+	"savedays":3,
+    "mgo_addr": "127.0.0.1:27083",
+    "tables": [
+        "jy_search",
+        "jy_fapp|id,client,date,ip,mdescribe,openid,uname,url,refer",
+        "jy_flogs|id,client,date,ip,mdescribe,openid,uname,url,refer|/front/getLoginNum,/biddetail/normal/qr,/front/hasSign,/front/share/,/jylab/supsearch/getstatus,/front/ajaxPolling,/front/pcAllNotice"
+    ]
+}

+ 180 - 0
src/moveToMgo/main.go

@@ -0,0 +1,180 @@
+package main
+
+//定时迁移到207上的mongo库
+import (
+	ca "cassandra"
+	"fmt"
+	"jy/mongodb"
+	"log"
+	"math/big"
+	"net/http"
+	"os"
+	"qfw/util"
+	"strings"
+	"time"
+	//	"github.com/gocql/gocql"
+	"github.com/robfig/cron"
+)
+
+var config map[string]interface{}
+var job = &Job{}
+
+type Job struct {
+	Ca     *ca.SimpleCassandra
+	tables []string //表名,支持字段、过滤url
+	Mgo    *mongodb.MongodbSim
+	pool   chan bool //保存到mongo的线程池
+	last   int64     //上次发送邮件报警的时间
+	fail   bool      //任务失败标志
+}
+
+func init() {
+	util.ReadConfig(&config)
+	job.Ca = &ca.SimpleCassandra{}
+	job.Ca.InitSimpleCassandra(
+		"jianyu",
+		1,
+		util.IntAllDef(config["ca_port"], 9042),
+		util.ObjArrToStringArr(config["ca_hosts"].([]interface{})))
+	job.tables = util.ObjArrToStringArr(config["tables"].([]interface{}))
+	job.Mgo = mongodb.NewMgo(config["mgo_addr"].(string), "jianyu", 6)
+	job.pool = make(chan bool, 4)
+}
+
+//迁移
+func (j *Job) run() {
+	date := config["id"].(string)
+	//迁移3天前的数据
+	end_id, _ := config["end_id"].(string)
+	if end_id == "" {
+		end_id = time.Now().AddDate(0, 0, -util.IntAllDef(config["savedays"], 2)).Format(util.Date_Short_Layout)
+	}
+	ds, _ := time.Parse(util.Date_Short_Layout, date)
+	de, _ := time.Parse(util.Date_Short_Layout, end_id)
+L:
+	for ds.Before(de) {
+		date = ds.Format(util.Date_Short_Layout)
+		for _, k1 := range j.tables {
+			ks := strings.Split(k1, "|")
+			k := ks[0]
+			f := "*"
+			if len(ks) > 1 {
+				f = ks[1]
+			}
+			url_prefix := []string{}
+			if len(ks) > 2 {
+				url_prefix = strings.Split(ks[2], ",")
+			}
+			iter := j.Ca.CDB.Query(fmt.Sprintf("select %s from %s where id=?", f, k), date).Iter()
+			log.Println("start", k, date)
+			count := 0
+			save := []map[string]interface{}{}
+		S:
+			for {
+				row := map[string]interface{}{}
+				if !iter.MapScan(row) {
+					break
+				}
+				count++
+				d, _ := row["date"].(time.Time)
+				for k, v := range row {
+					if v1, ok := v.(*big.Int); ok {
+						row[k] = v1.Int64()
+					}
+				}
+				//对url的过滤
+				if len(url_prefix) > 0 && row["url"] != nil {
+					url, _ := row["url"].(string)
+					for _, u := range url_prefix {
+						if strings.HasPrefix(url, u) {
+							continue S
+						}
+					}
+				}
+				//tuid := row["tuid"].(gocql.UUID).String()
+				delete(row, "tuid")
+				row["date"] = d.Unix()
+				save = append(save, row)
+				if len(save) > 800 {
+					j.pool <- true
+					go func(s []map[string]interface{}) {
+						defer func() {
+							<-j.pool
+						}()
+						j.save(k, s)
+					}(save)
+					save = []map[string]interface{}{}
+				}
+				if count%10000 == 0 {
+					log.Println("current:", k, date, count)
+				}
+				if j.fail { //当失败时退出
+					break L
+				}
+			}
+			if len(save) > 0 {
+				j.save(k, save)
+			}
+			//等待任务结束
+			for i := 0; i < 3; i++ {
+				j.pool <- true
+			}
+			log.Println("over:", k, date, count)
+			//删除已迁移的scylladb数据
+			if count > 10000 && !j.fail {
+				j.Ca.CDB.Query("delete from "+k+" where id=?", date).Exec()
+			}
+			job.pool = make(chan bool, 3)
+		}
+		ds = ds.AddDate(0, 0, 1)
+	}
+	if !j.fail {
+		config["id"] = ds.Format(util.Date_Short_Layout)
+		util.WriteSysConfig(config)
+	}
+	//重置失败状态
+	j.fail = false
+}
+
+//保存到mongo+重试
+func (j *Job) save(k string, s []map[string]interface{}) {
+	maxN := 1
+	bsave := false
+	for !bsave {
+		bsave := j.Mgo.SaveBulk(k, s...)
+		if bsave {
+			break
+		} else {
+			log.Println("saveid ", s[0])
+			time.Sleep(time.Duration(20*maxN) * time.Second)
+			if maxN < 8 {
+				maxN++
+			} else {
+				log.Println("save faild!!! ")
+				j.send(k + ",迁移失败")
+				j.fail = true
+				break
+			}
+		}
+	}
+}
+
+func main() {
+	if len(os.Args) > 1 && os.Args[1] == "r" {
+		log.Println("启动时运行迁移")
+		job.run()
+	}
+	c := cron.New()
+	c.AddFunc(config["cron"].(string), job.run)
+	c.Start()
+	defer c.Stop()
+	select {}
+}
+
+//发送邮件通知
+func (j *Job) send(content string) {
+	if time.Now().Unix()-j.last > 7200 {
+		j.last = time.Now().Unix()
+		http.Get(fmt.Sprintf("http://10.171.112.160:19281/_send/_mail?to=%s&title=%s&body=%s", "renzheng@topnet.net.cn", "迁移日志失败", content))
+	}
+}