main.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/robfig/cron/v3"
  6. "github.com/spf13/viper"
  7. "github.com/studio-b12/gowebdav"
  8. "io"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "os"
  13. "path/filepath"
  14. "time"
  15. "yusuan_oss/oss"
  16. )
  17. type GlobalConf struct {
  18. Wanpan WanpanConf
  19. Oss OssConf
  20. MongoB MgoConf
  21. Cron CronConf
  22. }
  23. type WanpanConf struct {
  24. Webdav string `json:"webdav"`
  25. User string `json:"user"`
  26. Sercert string `json:"sercert"`
  27. }
  28. type OssConf struct {
  29. Ossaddr string
  30. Bucketid string
  31. }
  32. type MgoConf struct {
  33. Host string
  34. DB string
  35. Coll string // 查询表
  36. Username string
  37. Password string
  38. Size int
  39. Direct bool
  40. }
  41. type CronConf struct {
  42. Spec string
  43. Start int
  44. End int
  45. }
  46. var (
  47. GF GlobalConf
  48. MgoB *mongodb.MongodbSim //
  49. )
  50. func init() {
  51. err := InitConfig()
  52. if err != nil {
  53. log.Println(err)
  54. }
  55. InitMgo()
  56. }
  57. func InitConfig() (err error) {
  58. viper.SetConfigFile("config.toml") // 指定配置文件路径
  59. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  60. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  61. viper.AddConfigPath("./")
  62. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  63. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  64. err = viper.ReadInConfig() // 查找并读取配置文件
  65. if err != nil { // 处理读取配置文件的错误
  66. return
  67. }
  68. err = viper.Unmarshal(&GF)
  69. return err
  70. }
  71. func InitMgo() {
  72. //bidding 查询
  73. MgoB = &mongodb.MongodbSim{
  74. MongodbAddr: GF.MongoB.Host,
  75. Size: 10,
  76. DbName: GF.MongoB.DB,
  77. UserName: GF.MongoB.Username,
  78. Password: GF.MongoB.Password,
  79. Direct: GF.MongoB.Direct,
  80. }
  81. MgoB.InitPool()
  82. err := MgoB.C.Ping(context.Background(), nil)
  83. if err != nil {
  84. log.Println("MgoB", GF.MongoB.Host, "链接失败")
  85. }
  86. }
  87. func main() {
  88. //TODO 遍历mgo库,yusuan.yusuan_fileitem表
  89. //allData() //处理预算文件存量数据
  90. local, _ := time.LoadLocation("Asia/Shanghai")
  91. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  92. _, err := c.AddFunc(GF.Cron.Spec, incData)
  93. if err != nil {
  94. log.Println("main", err)
  95. }
  96. log.Println("main", GF.Cron.Spec, "定时任务")
  97. c.Start()
  98. defer c.Stop()
  99. select {}
  100. }
  101. // incData 处理增量数据
  102. func incData() {
  103. log.Println("开始处理预算文件 --- 增量数据")
  104. sess := MgoB.GetMgoConn()
  105. defer MgoB.DestoryMongoConn(sess)
  106. now := time.Now()
  107. Yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
  108. where := map[string]interface{}{
  109. "updatetime": map[string]interface{}{
  110. "$gte": Yesterday.Unix(),
  111. },
  112. }
  113. query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(where).Select(nil).Iter()
  114. count := 0
  115. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  116. if count%100 == 0 {
  117. log.Println("current:", count, tmp["file_path"], tmp["_id"])
  118. }
  119. id := mongodb.BsonIdToSId(tmp["_id"])
  120. fpath := util.ObjToString(tmp["file_path"])
  121. err := uploadFileContent(GF.Wanpan.Webdav, GF.Wanpan.User, GF.Wanpan.Sercert, fpath, id)
  122. if err != nil {
  123. log.Println("oss 预算文件上传失败", id, fpath, err)
  124. }
  125. }
  126. log.Println("incData", "预算文件,增量数据处理完毕", count)
  127. }
  128. // allData 处理预算文件存量数据
  129. func allData() {
  130. log.Println("开始处理预算文件存量数据")
  131. sess := MgoB.GetMgoConn()
  132. defer MgoB.DestoryMongoConn(sess)
  133. where := map[string]interface{}{
  134. "updatetime": map[string]interface{}{
  135. "$lte": 1753718400,
  136. },
  137. }
  138. //
  139. query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(where).Select(nil).Iter()
  140. count := 0
  141. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  142. if count%100 == 0 {
  143. log.Println("current:", count, tmp["file_path"], tmp["_id"])
  144. }
  145. id := mongodb.BsonIdToSId(tmp["_id"])
  146. fpath := util.ObjToString(tmp["file_path"])
  147. err := uploadFileContent(GF.Wanpan.Webdav, GF.Wanpan.User, GF.Wanpan.Sercert, fpath, id)
  148. if err != nil {
  149. log.Println("oss 预算文件上传失败", id, fpath, err)
  150. }
  151. }
  152. log.Println("allData", "预算文件,存量数据处理完毕")
  153. }
  154. // 云盘获取数据方法
  155. func download(webdav, user, secert, fpath string) {
  156. client := gowebdav.NewAuthClient(webdav, gowebdav.NewAutoAuth(user, secert))
  157. client.Connect()
  158. filename := filepath.Base(fpath)
  159. localFilePath := fmt.Sprintf("./%s", filename)
  160. //log.Println(localFilePath)
  161. reader, err := client.ReadStream(fpath)
  162. if err != nil {
  163. log.Println("yunpan download err:", err)
  164. }
  165. file, err := os.Create(localFilePath)
  166. if err != nil {
  167. log.Println("create file err:", err)
  168. }
  169. defer file.Close()
  170. io.Copy(file, reader)
  171. }
  172. // uploadFileContent 获取附件内容,然后上传OSS
  173. func uploadFileContent(webdav, user, secret, fpath, object_name string) error {
  174. client := gowebdav.NewAuthClient(webdav, gowebdav.NewAutoAuth(user, secret))
  175. err := client.Connect()
  176. if err != nil {
  177. return fmt.Errorf("webdav connect error: %v", err)
  178. }
  179. filename := filepath.Base(fpath)
  180. localFilePath := fmt.Sprintf("./%s", filename)
  181. reader, err := client.ReadStream(fpath)
  182. if err != nil {
  183. return fmt.Errorf("yunpan download error: %v", err)
  184. }
  185. defer reader.Close()
  186. // 创建本地文件
  187. file, err := os.Create(localFilePath)
  188. if err != nil {
  189. return fmt.Errorf("create file error: %v", err)
  190. }
  191. _, err = io.Copy(file, reader)
  192. file.Close() // 注意:复制完就关闭
  193. if err != nil {
  194. return fmt.Errorf("copy file error: %v", err)
  195. }
  196. // 读取文件内容
  197. contentBytes, err := os.ReadFile(localFilePath)
  198. if err != nil {
  199. return fmt.Errorf("read file error: %v", err)
  200. }
  201. // 删除文件
  202. err = os.Remove(localFilePath)
  203. if err != nil {
  204. return fmt.Errorf("delete file error: %v", err)
  205. }
  206. //上传附件内容到OSS
  207. res := oss.UpLoadByRestful(GF.Oss.Ossaddr, GF.Oss.Bucketid, object_name, contentBytes, false)
  208. if res.Error_msg != oss.UploadSuccess {
  209. log.Println("oss 上传失败", res, object_name, fpath, res)
  210. }
  211. ////获取oss 附件内容
  212. //res2 := oss.DownloadByRestful(GF.Oss.Ossaddr, GF.Oss.Bucketid, "6603e3e8138c4f04f7fc0ca2")
  213. //log.Println(res2.Error_code, res2.Error_msg)
  214. return nil
  215. }