|
@@ -5,7 +5,7 @@ import (
|
|
"app.yhyue.com/data_processing/common_utils/udp"
|
|
"app.yhyue.com/data_processing/common_utils/udp"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
- "github.com/fsnotify/fsnotify"
|
|
|
|
|
|
+ "github.com/robfig/cron/v3"
|
|
"log"
|
|
"log"
|
|
"net"
|
|
"net"
|
|
"os"
|
|
"os"
|
|
@@ -29,8 +29,6 @@ var (
|
|
LastPath string //上传的最新文件夹
|
|
LastPath string //上传的最新文件夹
|
|
)
|
|
)
|
|
|
|
|
|
-//var option utils.Option
|
|
|
|
-
|
|
|
|
func init() {
|
|
func init() {
|
|
util.ReadConfig(&Sysconfig)
|
|
util.ReadConfig(&Sysconfig)
|
|
localPort := Sysconfig["local_port"].(string)
|
|
localPort := Sysconfig["local_port"].(string)
|
|
@@ -62,173 +60,104 @@ func init() {
|
|
}
|
|
}
|
|
|
|
|
|
func main() {
|
|
func main() {
|
|
- //监听目录
|
|
|
|
- watchPath := util.ObjToString(Sysconfig["watch_path"])
|
|
|
|
- pathinfo, _ := os.Stat(watchPath)
|
|
|
|
- if pathinfo.IsDir() {
|
|
|
|
- //先读取upload 所有文件夹
|
|
|
|
- dir := utils.Option{
|
|
|
|
- RootPath: watchPath,
|
|
|
|
- SubFlag: false,
|
|
|
|
- IgnorePath: []string{`.git`, `.svn`},
|
|
|
|
- IgnoreFile: []string{`.DS_Store`, `.gitignore`},
|
|
|
|
- }
|
|
|
|
- dirNodes, err := utils.Explorer(dir)
|
|
|
|
- if err != nil {
|
|
|
|
- util.Debug("utils.Explorer err =>", err)
|
|
|
|
- }
|
|
|
|
|
|
+ local, _ := time.LoadLocation("Asia/Shanghai")
|
|
|
|
+ cronServer := cron.New(cron.WithLocation(local), cron.WithSeconds())
|
|
|
|
|
|
- //获取三天内新建的文件夹,表示需要读取 的文件夹
|
|
|
|
- for _, v := range dirNodes.Children {
|
|
|
|
- if v.ModTime.AddDate(0, 0, 3).Sub(time.Now()) > 0 {
|
|
|
|
- LastPath = v.Path
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ //spec := "0 */1 * * * *" //秒,分,时,日,月,星期
|
|
|
|
+ spec := "0 57 09 * * 1"
|
|
|
|
+ cronServer.AddFunc(spec, dealWatch)
|
|
|
|
+ //c.AddFunc("0 9 * * 1", dealWatch)
|
|
|
|
|
|
- option.RootPath = LastPath
|
|
|
|
- rootNodes, err := utils.Explorer(option)
|
|
|
|
- if err != nil {
|
|
|
|
- util.Debug("utils.Explorer err =>", err)
|
|
|
|
- }
|
|
|
|
|
|
+ cronServer.Start()
|
|
|
|
|
|
- //1.判断数据表数量
|
|
|
|
- if len(rootNodes.Children) != pathTableSize {
|
|
|
|
- log.Printf("%s 目录下文件夹数量为:%s", rootNodes.Path, len(rootNodes.Children))
|
|
|
|
- } else {
|
|
|
|
- for _, v := range rootNodes.Children {
|
|
|
|
- fmt.Println(v.Name)
|
|
|
|
- if len(v.Children) != tableDateSize {
|
|
|
|
- log.Printf("%s 目录下文件夹数量为:%s", v.Path, len(v.Children))
|
|
|
|
- } else {
|
|
|
|
- for _, vv := range v.Children {
|
|
|
|
- for _, file := range vv.Children {
|
|
|
|
- if file.Name == "split.json.gz" {
|
|
|
|
- dateSlice = append(dateSlice, true)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if len(dateSlice) == tableDateSize {
|
|
|
|
- tableSlice = append(tableSlice, true)
|
|
|
|
- }
|
|
|
|
- dateSlice = make([]bool, 0)
|
|
|
|
|
|
+ select {}
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func test() {
|
|
|
|
+ fmt.Println("test -----")
|
|
|
|
+ log.Println("start dealWatch")
|
|
|
|
+ fmt.Println("test >>>>>>")
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//dealWatch 处理监听程序,获取最新的文件夹路径
|
|
|
|
+func dealWatch() {
|
|
|
|
+ log.Println("start dealWatch")
|
|
|
|
+ //手动配置的文件路径
|
|
|
|
+ manualPath := util.ObjToString(Sysconfig["manual_path"])
|
|
|
|
+ if manualPath == "" {
|
|
|
|
+ //监听目录
|
|
|
|
+ watchPath := util.ObjToString(Sysconfig["watch_path"])
|
|
|
|
+ pathinfo, _ := os.Stat(watchPath)
|
|
|
|
+ if pathinfo.IsDir() {
|
|
|
|
+ //先读取upload 所有文件夹
|
|
|
|
+ dir := utils.Option{
|
|
|
|
+ RootPath: watchPath,
|
|
|
|
+ SubFlag: false,
|
|
|
|
+ IgnorePath: []string{`.git`, `.svn`},
|
|
|
|
+ IgnoreFile: []string{`.DS_Store`, `.gitignore`},
|
|
|
|
+ }
|
|
|
|
+ dirNodes, err := utils.Explorer(dir)
|
|
|
|
+ if err != nil {
|
|
|
|
+ util.Debug("utils.Explorer err =>", err)
|
|
}
|
|
}
|
|
|
|
|
|
- //满足条件,触发同步
|
|
|
|
- if len(tableSlice) == pathTableSize {
|
|
|
|
- data := map[string]interface{}{
|
|
|
|
- "start": true,
|
|
|
|
- "path": LastPath,
|
|
|
|
|
|
+ //获取三天内新建的文件夹,表示需要读取 的文件夹
|
|
|
|
+ for _, v := range dirNodes.Children {
|
|
|
|
+ if v.ModTime.AddDate(0, 0, 3).Sub(time.Now()) > 0 {
|
|
|
|
+ LastPath = v.Path
|
|
}
|
|
}
|
|
- SendUdpMsg(data, incDataAddr) //inc_data
|
|
|
|
- SendUdpMsg(data, incStdAddr) // inc_std
|
|
|
|
}
|
|
}
|
|
- tableSlice = make([]bool, 0)
|
|
|
|
|
|
+ } else {
|
|
|
|
+ LastPath = manualPath
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ //真正读取压缩包
|
|
|
|
+ dealFiles()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func main2() {
|
|
|
|
- // Create new watcher.
|
|
|
|
- watcher, err := fsnotify.NewWatcher()
|
|
|
|
|
|
+//dealFiles 处理路径下的文件
|
|
|
|
+func dealFiles() {
|
|
|
|
+ option.RootPath = LastPath
|
|
|
|
+ rootNodes, err := utils.Explorer(option)
|
|
if err != nil {
|
|
if err != nil {
|
|
- log.Fatal("NewWatcher :", err)
|
|
|
|
|
|
+ util.Debug("utils.Explorer err =>", err)
|
|
}
|
|
}
|
|
- defer watcher.Close()
|
|
|
|
|
|
|
|
- // 添加监听目录
|
|
|
|
- err = watcher.Add(Sysconfig["watch_path"].(string))
|
|
|
|
- if err != nil {
|
|
|
|
- log.Fatal(err)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Start listening for events.
|
|
|
|
- go func() {
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case event, ok := <-watcher.Events:
|
|
|
|
- if !ok {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- //判断事件发生的类型,如下5种
|
|
|
|
- // Create 创建
|
|
|
|
- // Write 写入
|
|
|
|
- // Remove 删除
|
|
|
|
- // Rename 重命名
|
|
|
|
- // Chmod 修改权限
|
|
|
|
- if event.Op&fsnotify.Create == fsnotify.Create {
|
|
|
|
- log.Println("创建文件 : ", event.Name)
|
|
|
|
- option.RootPath = event.Name
|
|
|
|
- pathinfo, _ := os.Stat(event.Name)
|
|
|
|
- if pathinfo.IsDir() {
|
|
|
|
- rootNodes, err := utils.Explorer(option)
|
|
|
|
- if err != nil {
|
|
|
|
- util.Debug("utils.Explorer err =>", err)
|
|
|
|
|
|
+ //1.判断数据表数量
|
|
|
|
+ if len(rootNodes.Children) != pathTableSize {
|
|
|
|
+ log.Printf("%s 目录下文件夹数量为:%s", rootNodes.Path, len(rootNodes.Children))
|
|
|
|
+ } else {
|
|
|
|
+ for _, v := range rootNodes.Children {
|
|
|
|
+ fmt.Println(v.Name)
|
|
|
|
+ if len(v.Children) != tableDateSize {
|
|
|
|
+ log.Printf("%s 目录下文件夹数量为:%s", v.Path, len(v.Children))
|
|
|
|
+ } else {
|
|
|
|
+ for _, vv := range v.Children {
|
|
|
|
+ for _, file := range vv.Children {
|
|
|
|
+ if file.Name == "split.json.gz" {
|
|
|
|
+ dateSlice = append(dateSlice, true)
|
|
}
|
|
}
|
|
- //1.判断数据表数量
|
|
|
|
- if len(rootNodes.Children) != pathTableSize {
|
|
|
|
- log.Printf("%s 目录下文件夹数量为:%s", rootNodes.Path, len(rootNodes.Children))
|
|
|
|
- } else {
|
|
|
|
- for _, v := range rootNodes.Children {
|
|
|
|
- fmt.Println(v.Name)
|
|
|
|
- if len(v.Children) != tableDateSize {
|
|
|
|
- log.Printf("%s 目录下文件夹数量为:%s", v.Path, len(v.Children))
|
|
|
|
- } else {
|
|
|
|
- for _, vv := range v.Children {
|
|
|
|
- for _, file := range vv.Children {
|
|
|
|
- if file.Name == "split.json.gz" {
|
|
|
|
- dateSlice = append(dateSlice, true)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if len(dateSlice) == tableDateSize {
|
|
|
|
- tableSlice = append(tableSlice, true)
|
|
|
|
- }
|
|
|
|
- dateSlice = make([]bool, 0)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //fmt.Println("aaaa=>", len(tableSlice))
|
|
|
|
- //满足条件,触发同步
|
|
|
|
- if len(tableSlice) == pathTableSize {
|
|
|
|
- data := map[string]interface{}{
|
|
|
|
- "start": true,
|
|
|
|
- "path": event.Name,
|
|
|
|
- }
|
|
|
|
- SendUdpMsg(data, incDataAddr)
|
|
|
|
- SendUdpMsg(data, incStdAddr)
|
|
|
|
- }
|
|
|
|
- tableSlice = make([]bool, 0)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- if event.Op&fsnotify.Write == fsnotify.Write {
|
|
|
|
- log.Println("写入文件 : ", event.Name)
|
|
|
|
- }
|
|
|
|
- if event.Op&fsnotify.Remove == fsnotify.Remove {
|
|
|
|
- log.Println("删除文件 : ", event.Name)
|
|
|
|
- }
|
|
|
|
- if event.Op&fsnotify.Rename == fsnotify.Rename {
|
|
|
|
- log.Println("重命名文件 : ", event.Name)
|
|
|
|
- }
|
|
|
|
- if event.Op&fsnotify.Chmod == fsnotify.Chmod {
|
|
|
|
- log.Println("修改权限 : ", event.Name)
|
|
|
|
- }
|
|
|
|
- case err, ok := <-watcher.Errors:
|
|
|
|
- if !ok {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- log.Println("error:", err)
|
|
|
|
}
|
|
}
|
|
|
|
+ if len(dateSlice) == tableDateSize {
|
|
|
|
+ tableSlice = append(tableSlice, true)
|
|
|
|
+ }
|
|
|
|
+ dateSlice = make([]bool, 0)
|
|
}
|
|
}
|
|
- }()
|
|
|
|
-
|
|
|
|
- // Block main goroutine forever.
|
|
|
|
- <-make(chan struct{})
|
|
|
|
|
|
|
|
|
|
+ //满足条件,触发同步
|
|
|
|
+ if len(tableSlice) == pathTableSize {
|
|
|
|
+ data := map[string]interface{}{
|
|
|
|
+ "start": true,
|
|
|
|
+ "path": LastPath,
|
|
|
|
+ }
|
|
|
|
+ SendUdpMsg(data, incDataAddr) //inc_data
|
|
|
|
+ SendUdpMsg(data, incStdAddr) // inc_std
|
|
|
|
+ }
|
|
|
|
+ tableSlice = make([]bool, 0)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
//SendUdpMsg 通知处理企业新增数据
|
|
//SendUdpMsg 通知处理企业新增数据
|
|
@@ -239,6 +168,7 @@ func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
|
|
util.Debug("target :=>", target.IP, target.Port)
|
|
util.Debug("target :=>", target.IP, target.Port)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+//processUdpMsg 处理udp
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
switch act {
|
|
switch act {
|
|
case udp.OP_TYPE_DATA:
|
|
case udp.OP_TYPE_DATA:
|