123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- package main
- import (
- "context"
- "fmt"
- "github.com/robfig/cron/v3"
- "github.com/spf13/viper"
- "github.com/studio-b12/gowebdav"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "os"
- "path/filepath"
- "time"
- "yusuan_oss/oss"
- )
- type GlobalConf struct {
- Wanpan WanpanConf
- Oss OssConf
- MongoB MgoConf
- Cron CronConf
- }
- type WanpanConf struct {
- Webdav string `json:"webdav"`
- User string `json:"user"`
- Sercert string `json:"sercert"`
- }
- type OssConf struct {
- Ossaddr string
- Bucketid string
- }
- type MgoConf struct {
- Host string
- DB string
- Coll string // 查询表
- Username string
- Password string
- Size int
- Direct bool
- }
- type CronConf struct {
- Spec string
- Start int
- End int
- }
- var (
- GF GlobalConf
- MgoB *mongodb.MongodbSim //
- )
- func init() {
- err := InitConfig()
- if err != nil {
- log.Println(err)
- }
- InitMgo()
- }
- func InitConfig() (err error) {
- viper.SetConfigFile("config.toml") // 指定配置文件路径
- viper.SetConfigName("config") // 配置文件名称(无扩展名)
- viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
- viper.AddConfigPath("./")
- viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
- viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
- err = viper.ReadInConfig() // 查找并读取配置文件
- if err != nil { // 处理读取配置文件的错误
- return
- }
- err = viper.Unmarshal(&GF)
- return err
- }
- func InitMgo() {
- //bidding 查询
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: GF.MongoB.Host,
- Size: 10,
- DbName: GF.MongoB.DB,
- UserName: GF.MongoB.Username,
- Password: GF.MongoB.Password,
- Direct: GF.MongoB.Direct,
- }
- MgoB.InitPool()
- err := MgoB.C.Ping(context.Background(), nil)
- if err != nil {
- log.Println("MgoB", GF.MongoB.Host, "链接失败")
- }
- }
- func main() {
- //TODO 遍历mgo库,yusuan.yusuan_fileitem表
- //allData() //处理预算文件存量数据
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- _, err := c.AddFunc(GF.Cron.Spec, incData)
- if err != nil {
- log.Println("main", err)
- }
- log.Println("main", GF.Cron.Spec, "定时任务")
- c.Start()
- defer c.Stop()
- select {}
- }
- // incData 处理增量数据
- func incData() {
- log.Println("开始处理预算文件 --- 增量数据")
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- now := time.Now()
- Yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
- where := map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$gte": Yesterday.Unix(),
- },
- }
- query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(where).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%100 == 0 {
- log.Println("current:", count, tmp["file_path"], tmp["_id"])
- }
- id := mongodb.BsonIdToSId(tmp["_id"])
- fpath := util.ObjToString(tmp["file_path"])
- err := uploadFileContent(GF.Wanpan.Webdav, GF.Wanpan.User, GF.Wanpan.Sercert, fpath, id)
- if err != nil {
- log.Println("oss 预算文件上传失败", id, fpath, err)
- }
- }
- log.Println("incData", "预算文件,增量数据处理完毕", count)
- }
- // allData 处理预算文件存量数据
- func allData() {
- log.Println("开始处理预算文件存量数据")
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$lte": 1753718400,
- },
- }
- //
- query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(where).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%100 == 0 {
- log.Println("current:", count, tmp["file_path"], tmp["_id"])
- }
- id := mongodb.BsonIdToSId(tmp["_id"])
- fpath := util.ObjToString(tmp["file_path"])
- err := uploadFileContent(GF.Wanpan.Webdav, GF.Wanpan.User, GF.Wanpan.Sercert, fpath, id)
- if err != nil {
- log.Println("oss 预算文件上传失败", id, fpath, err)
- }
- }
- log.Println("allData", "预算文件,存量数据处理完毕")
- }
- // 云盘获取数据方法
- func download(webdav, user, secert, fpath string) {
- client := gowebdav.NewAuthClient(webdav, gowebdav.NewAutoAuth(user, secert))
- client.Connect()
- filename := filepath.Base(fpath)
- localFilePath := fmt.Sprintf("./%s", filename)
- //log.Println(localFilePath)
- reader, err := client.ReadStream(fpath)
- if err != nil {
- log.Println("yunpan download err:", err)
- }
- file, err := os.Create(localFilePath)
- if err != nil {
- log.Println("create file err:", err)
- }
- defer file.Close()
- io.Copy(file, reader)
- }
- // uploadFileContent 获取附件内容,然后上传OSS
- func uploadFileContent(webdav, user, secret, fpath, object_name string) error {
- client := gowebdav.NewAuthClient(webdav, gowebdav.NewAutoAuth(user, secret))
- err := client.Connect()
- if err != nil {
- return fmt.Errorf("webdav connect error: %v", err)
- }
- filename := filepath.Base(fpath)
- localFilePath := fmt.Sprintf("./%s", filename)
- reader, err := client.ReadStream(fpath)
- if err != nil {
- return fmt.Errorf("yunpan download error: %v", err)
- }
- defer reader.Close()
- // 创建本地文件
- file, err := os.Create(localFilePath)
- if err != nil {
- return fmt.Errorf("create file error: %v", err)
- }
- _, err = io.Copy(file, reader)
- file.Close() // 注意:复制完就关闭
- if err != nil {
- return fmt.Errorf("copy file error: %v", err)
- }
- // 读取文件内容
- contentBytes, err := os.ReadFile(localFilePath)
- if err != nil {
- return fmt.Errorf("read file error: %v", err)
- }
- // 删除文件
- err = os.Remove(localFilePath)
- if err != nil {
- return fmt.Errorf("delete file error: %v", err)
- }
- //上传附件内容到OSS
- res := oss.UpLoadByRestful(GF.Oss.Ossaddr, GF.Oss.Bucketid, object_name, contentBytes, false)
- if res.Error_msg != oss.UploadSuccess {
- log.Println("oss 上传失败", res, object_name, fpath, res)
- }
- ////获取oss 附件内容
- //res2 := oss.DownloadByRestful(GF.Oss.Ossaddr, GF.Oss.Bucketid, "6603e3e8138c4f04f7fc0ca2")
- //log.Println(res2.Error_code, res2.Error_msg)
- return nil
- }
|