1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- package main
- import (
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gctx"
- "github.com/gogf/gf/v2/os/gfsnotify"
- "google.golang.org/grpc"
- "gopkg.in/natefinch/lumberjack.v2"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
- ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
- "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
- "log"
- "net"
- "net/http"
- "net/rpc"
- "time"
- )
- func main() {
- ctx := gctx.New()
- var logger *lumberjack.Logger
- g.Config().MustGet(ctx, "logger").Struct(&logger)
- log.SetOutput(logger)
- // 初始化OSS帐号与bucket信息
- ossService.LoadOSSAccounts()
- // 注册一个回调函数,当配置发生变更时会被调用
- downLoadPoolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int()
- getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
- getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
- gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
- log.Println(event.String())
- if event.IsWrite() || event.IsChmod() || event.IsRename() {
- log.Println("配置文件有变化,更新内存。。。")
- ossService.LoadOSSAccounts()
- if poolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int(); downLoadPoolSize != poolSize {
- downLoadPoolSize = poolSize
- ossService.DownLoadPool = make(chan bool, downLoadPoolSize)
- }
- if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
- getDetailFromEsPoolSize = poolSize
- ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
- }
- if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
- getDetailFromMgoPoolSize = poolSize
- ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
- }
- }
- })
- // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
- go func() {
- ticker := time.NewTicker(5 * time.Second)
- var onlineNodesPrevWarn, downLoadQueuePrevWarn int64
- ctx := gctx.New()
- for range ticker.C {
- util.SendHeartbeat(ctx)
- util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
- util.CheckDownLoadQueue(ctx, &downLoadQueuePrevWarn)
- }
- }()
- go func() {
- //创建一个grpc 服务器
- s := grpc.NewServer()
- //注册事件
- pb.RegisterServiceServer(s, &ossService.Grpc{})
- grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String()
- //处理链接
- listen, err := net.Listen("tcp", grpcPort)
- if err != nil {
- log.Println(err)
- } else {
- log.Println("grpc server is listening", grpcPort)
- }
- s.Serve(listen)
- }()
- // 启动RPC服务:注册OSSService,实现接口调用
- rpcService := new(ossService.OSSService)
- rpc.Register(rpcService)
- rpc.HandleHTTP()
- http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
- http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
- http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
- http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
- http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
- port := g.Config().MustGet(ctx, "port").String()
- log.Println("HTTP server started on " + port)
- if err := http.ListenAndServe(port, nil); err != nil {
- //if err := endless.ListenAndServe(port, nil, func() {}); err != nil {
- log.Fatalln("HTTP server error", err)
- }
- }
|