|
@@ -20,9 +20,11 @@ var (
|
|
|
CurrentColl string //当前表名
|
|
|
//SkipCollName string
|
|
|
//sendMsg string
|
|
|
- //collCount int
|
|
|
-
|
|
|
- saveArr [][]map[string]interface{}
|
|
|
+ collCount int // 当前表数据量
|
|
|
+ insertCount int // insert数据
|
|
|
+ updateCount int // update数据
|
|
|
+ saveLog = make(map[string]interface{})
|
|
|
+ saveArr [][]map[string]interface{}
|
|
|
|
|
|
UdpClient udp.UdpClient
|
|
|
qyxyEsAddr *net.UDPAddr
|
|
@@ -86,6 +88,10 @@ func dealPath(path string) {
|
|
|
}
|
|
|
//std 程序只需要关注6个表
|
|
|
for _, c := range CollArr {
|
|
|
+ start := time.Now()
|
|
|
+ collCount = 0
|
|
|
+ insertCount = 0
|
|
|
+ updateCount = 0
|
|
|
CurrentColl = c
|
|
|
subPath := path + c + "/"
|
|
|
//判断文件夹存在
|
|
@@ -109,8 +115,20 @@ func dealPath(path string) {
|
|
|
MongoTool.UpSertBulk(Sysconfig["dbSave"].(string), tmps...)
|
|
|
saveArr = [][]map[string]interface{}{}
|
|
|
}
|
|
|
+
|
|
|
+ duration := time.Since(start)
|
|
|
+ result := map[string]interface{}{
|
|
|
+ "count": collCount,
|
|
|
+ "duration": duration.Minutes(), //运行时长
|
|
|
+ "insert": insertCount, //插入数量
|
|
|
+ "update": updateCount, //更新数量
|
|
|
+ }
|
|
|
+
|
|
|
+ saveLog[c] = result
|
|
|
}
|
|
|
|
|
|
+ MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
|
|
|
+
|
|
|
//执行完毕通知es 程序
|
|
|
data := map[string]interface{}{
|
|
|
"start": true,
|
|
@@ -123,41 +141,44 @@ func dealPath(path string) {
|
|
|
func dealinfo(path string) {
|
|
|
count := 0
|
|
|
file := path + "/split.json.gz"
|
|
|
- util.Debug("current date: ", file)
|
|
|
- // 打开本地gz格式压缩包
|
|
|
- fr, err := os.Open(file)
|
|
|
- if err != nil {
|
|
|
- panic(err)
|
|
|
- } else {
|
|
|
- println("open file success!")
|
|
|
- }
|
|
|
- // defer: 在函数退出时,执行关闭文件
|
|
|
- defer fr.Close()
|
|
|
- // 创建gzip文件读取对象
|
|
|
- gr, err := gzip.NewReader(fr)
|
|
|
- if err != nil {
|
|
|
- panic(err)
|
|
|
- }
|
|
|
- // defer: 在函数退出时,执行关闭gzip对象
|
|
|
- defer gr.Close()
|
|
|
-
|
|
|
- bfRd := bufio.NewReader(gr)
|
|
|
- for {
|
|
|
- line, err := bfRd.ReadBytes('\n')
|
|
|
+ util.Debug("current date: ", file)
|
|
|
+ _, err := os.Stat(file)
|
|
|
+ if err == nil {
|
|
|
+ // 打开本地gz格式压缩包
|
|
|
+ fr, err := os.Open(file)
|
|
|
if err != nil {
|
|
|
- if err == io.EOF {
|
|
|
- fmt.Println("read gzip data finish! ")
|
|
|
- break
|
|
|
- } else {
|
|
|
- fmt.Println("[read gzip data err]: ", err)
|
|
|
- }
|
|
|
+ panic(err)
|
|
|
+ } else {
|
|
|
+ println("open file success!")
|
|
|
}
|
|
|
- if len(line) > 0 {
|
|
|
- count = hookfn(line, count)
|
|
|
+ // defer: 在函数退出时,执行关闭文件
|
|
|
+ defer fr.Close()
|
|
|
+ // 创建gzip文件读取对象
|
|
|
+ gr, err := gzip.NewReader(fr)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
}
|
|
|
+ // defer: 在函数退出时,执行关闭gzip对象
|
|
|
+ defer gr.Close()
|
|
|
+
|
|
|
+ bfRd := bufio.NewReader(gr)
|
|
|
+ for {
|
|
|
+ line, err := bfRd.ReadBytes('\n')
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ fmt.Println("read gzip data finish! ")
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ fmt.Println("[read gzip data err]: ", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(line) > 0 {
|
|
|
+ count = hookfn(line, count)
|
|
|
+ }
|
|
|
|
|
|
- if count%1000 == 0 {
|
|
|
- util.Debug("current exc---", file, count)
|
|
|
+ if count%1000 == 0 {
|
|
|
+ util.Debug("current exc---", file, count)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -170,8 +191,16 @@ func hookfn(line []byte, count int) int {
|
|
|
util.Debug("err---", err)
|
|
|
}
|
|
|
count++
|
|
|
+ collCount++
|
|
|
|
|
|
if _, ok := tmp["company_id"]; ok {
|
|
|
+ //统计插入、更新数量
|
|
|
+ if util.ObjToString(tmp["_operation_type"]) == "insert" {
|
|
|
+ insertCount++
|
|
|
+ } else {
|
|
|
+ updateCount++
|
|
|
+ }
|
|
|
+
|
|
|
//针对数据表 不同处理
|
|
|
switch CurrentColl {
|
|
|
case "company_base":
|