package main import ( "context" "fmt" "github.com/robfig/cron/v3" "github.com/spf13/viper" "github.com/studio-b12/gowebdav" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "os" "path/filepath" "time" "yusuan_oss/oss" ) type GlobalConf struct { Wanpan WanpanConf Oss OssConf MongoB MgoConf Cron CronConf } type WanpanConf struct { Webdav string `json:"webdav"` User string `json:"user"` Sercert string `json:"sercert"` } type OssConf struct { Ossaddr string Bucketid string } type MgoConf struct { Host string DB string Coll string // 查询表 Username string Password string Size int Direct bool } type CronConf struct { Spec string Start int End int } var ( GF GlobalConf MgoB *mongodb.MongodbSim // ) func init() { err := InitConfig() if err != nil { log.Println(err) } InitMgo() } func InitConfig() (err error) { viper.SetConfigFile("config.toml") // 指定配置文件路径 viper.SetConfigName("config") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置 viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置 err = viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 return } err = viper.Unmarshal(&GF) return err } func InitMgo() { //bidding 查询 MgoB = &mongodb.MongodbSim{ MongodbAddr: GF.MongoB.Host, Size: 10, DbName: GF.MongoB.DB, UserName: GF.MongoB.Username, Password: GF.MongoB.Password, Direct: GF.MongoB.Direct, } MgoB.InitPool() err := MgoB.C.Ping(context.Background(), nil) if err != nil { log.Println("MgoB", GF.MongoB.Host, "链接失败") } } func main() { //TODO 遍历mgo库,yusuan.yusuan_fileitem表 //allData() //处理预算文件存量数据 local, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(local), cron.WithSeconds()) _, err := c.AddFunc(GF.Cron.Spec, incData) if err != nil { log.Println("main", err) } log.Println("main", GF.Cron.Spec, "定时任务") c.Start() defer c.Stop() select {} } // incData 处理增量数据 func incData() { log.Println("开始处理预算文件 --- 增量数据") sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) now := time.Now() Yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local) where := map[string]interface{}{ "updatetime": map[string]interface{}{ "$lte": Yesterday.Unix(), }, } query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(where).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%100 == 0 { log.Println("current:", count, tmp["file_path"], tmp["_id"]) } id := mongodb.BsonIdToSId(tmp["_id"]) fpath := util.ObjToString(tmp["file_path"]) err := uploadFileContent(GF.Wanpan.Webdav, GF.Wanpan.User, GF.Wanpan.Sercert, fpath, id) if err != nil { log.Println("oss 预算文件上传失败", id, fpath, err) } } log.Println("incData", "预算文件,增量数据处理完毕", count) } // allData 处理预算文件存量数据 func allData() { log.Println("开始处理预算文件存量数据") sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) where := map[string]interface{}{ "updatetime": map[string]interface{}{ "$lte": 1753718400, }, } // query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(where).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%100 == 0 { log.Println("current:", count, tmp["file_path"], tmp["_id"]) } id := mongodb.BsonIdToSId(tmp["_id"]) fpath := util.ObjToString(tmp["file_path"]) err := uploadFileContent(GF.Wanpan.Webdav, GF.Wanpan.User, GF.Wanpan.Sercert, fpath, id) if err != nil { log.Println("oss 预算文件上传失败", id, fpath, err) } } log.Println("allData", "预算文件,存量数据处理完毕") } // 云盘获取数据方法 func download(webdav, user, secert, fpath string) { client := gowebdav.NewAuthClient(webdav, gowebdav.NewAutoAuth(user, secert)) client.Connect() filename := filepath.Base(fpath) localFilePath := fmt.Sprintf("./%s", filename) //log.Println(localFilePath) reader, err := client.ReadStream(fpath) if err != nil { log.Println("yunpan download err:", err) } file, err := os.Create(localFilePath) if err != nil { log.Println("create file err:", err) } defer file.Close() io.Copy(file, reader) } // uploadFileContent 获取附件内容,然后上传OSS func uploadFileContent(webdav, user, secret, fpath, object_name string) error { client := gowebdav.NewAuthClient(webdav, gowebdav.NewAutoAuth(user, secret)) err := client.Connect() if err != nil { return fmt.Errorf("webdav connect error: %v", err) } filename := filepath.Base(fpath) localFilePath := fmt.Sprintf("./%s", filename) reader, err := client.ReadStream(fpath) if err != nil { return fmt.Errorf("yunpan download error: %v", err) } defer reader.Close() // 创建本地文件 file, err := os.Create(localFilePath) if err != nil { return fmt.Errorf("create file error: %v", err) } _, err = io.Copy(file, reader) file.Close() // 注意:复制完就关闭 if err != nil { return fmt.Errorf("copy file error: %v", err) } // 读取文件内容 contentBytes, err := os.ReadFile(localFilePath) if err != nil { return fmt.Errorf("read file error: %v", err) } // 删除文件 err = os.Remove(localFilePath) if err != nil { return fmt.Errorf("delete file error: %v", err) } //上传附件内容到OSS res := oss.UpLoadByRestful(GF.Oss.Ossaddr, GF.Oss.Bucketid, object_name, contentBytes, false) if res.Error_msg != oss.UploadSuccess { log.Println("oss 上传失败", res, object_name, fpath, res) } ////获取oss 附件内容 //res2 := oss.DownloadByRestful(GF.Oss.Ossaddr, GF.Oss.Bucketid, "6603e3e8138c4f04f7fc0ca2") //log.Println(res2.Error_code, res2.Error_msg) return nil }