|
@@ -2,12 +2,15 @@ package main
|
|
|
|
|
|
import (
|
|
|
. "app.yhyue.com/moapp/jybase/mongodb"
|
|
|
+ osr "app.yhyue.com/moapp/jybase/overseer"
|
|
|
"flag"
|
|
|
+ "fmt"
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
"github.com/gogf/gf/v2/os/gctx"
|
|
|
"github.com/gogf/gf/v2/os/gfsnotify"
|
|
|
"google.golang.org/grpc"
|
|
|
"gopkg.in/natefinch/lumberjack.v2"
|
|
|
+ "io"
|
|
|
"jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
|
|
|
"jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
|
|
|
"jygit.jydev.jianyu360.cn/BaseService/ossService/config"
|
|
@@ -17,16 +20,33 @@ import (
|
|
|
"net"
|
|
|
"net/http"
|
|
|
"net/rpc"
|
|
|
+ "os"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+const Version = "V1.3"
|
|
|
+
|
|
|
func main() {
|
|
|
+ ctx := gctx.New()
|
|
|
+ osr.Run(osr.Config{
|
|
|
+ Addresses: []string{g.Config().MustGet(ctx, "port").String(), g.Config().MustGet(ctx, "grpcPort").String()}, // 多个监听地址
|
|
|
+ Program: program,
|
|
|
+ })
|
|
|
+}
|
|
|
+func program(state osr.State) {
|
|
|
+ pid := os.Getpid()
|
|
|
+ log.Println("ppid", os.Getppid(), "子进程", pid, "启动,监听端口", strings.Join(state.Addresses, " "))
|
|
|
ctx := gctx.New()
|
|
|
var logger *lumberjack.Logger
|
|
|
g.Config().MustGet(ctx, "logger").Struct(&logger)
|
|
|
- log.SetOutput(logger)
|
|
|
+ writers := []io.Writer{logger}
|
|
|
+ if g.Config().MustGet(ctx, "logger.console").Bool() {
|
|
|
+ writers = append(writers, os.Stdout)
|
|
|
+ }
|
|
|
+ log.SetOutput(io.MultiWriter(writers...))
|
|
|
+ config.InitDb()
|
|
|
// 初始化OSS帐号与bucket信息
|
|
|
ossService.LoadOSSAccounts()
|
|
|
// 注册一个回调函数,当配置发生变更时会被调用
|
|
@@ -56,7 +76,6 @@ func main() {
|
|
|
go func() {
|
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
|
var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
|
|
|
- ctx := gctx.New()
|
|
|
for range ticker.C {
|
|
|
util.SendHeartbeat(ctx)
|
|
|
util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
|
|
@@ -65,37 +84,40 @@ func main() {
|
|
|
util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
|
|
|
}
|
|
|
}()
|
|
|
- go func() {
|
|
|
- //创建一个grpc 服务器
|
|
|
- s := grpc.NewServer()
|
|
|
- //注册事件
|
|
|
- pb.RegisterServiceServer(s, &ossService.Grpc{})
|
|
|
- grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String()
|
|
|
- //处理链接
|
|
|
- listen, err := net.Listen("tcp", grpcPort)
|
|
|
- if err != nil {
|
|
|
- log.Println(err)
|
|
|
- } else {
|
|
|
- log.Println("grpc server is listening", grpcPort)
|
|
|
- }
|
|
|
- s.Serve(listen)
|
|
|
- }()
|
|
|
+ // state.Listeners 与配置的 Addresses 顺序一致
|
|
|
+ servers := make([]osr.Server, len(state.Listeners))
|
|
|
// 启动RPC服务:注册OSSService,实现接口调用
|
|
|
rpcService := new(ossService.OSSService)
|
|
|
rpc.Register(rpcService)
|
|
|
rpc.HandleHTTP()
|
|
|
-
|
|
|
- http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
|
|
|
- http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
|
|
|
- http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
|
|
|
- http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
|
|
|
- http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
|
|
|
- port := g.Config().MustGet(ctx, "port").String()
|
|
|
- log.Println("HTTP server started on " + port)
|
|
|
- if err := http.ListenAndServe(port, nil); err != nil {
|
|
|
- //if err := endless.ListenAndServe(port, nil, func() {}); err != nil {
|
|
|
- log.Fatalln("HTTP server error", err)
|
|
|
+ mux := http.DefaultServeMux
|
|
|
+ mux.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
|
|
|
+ mux.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
|
|
|
+ mux.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
|
|
|
+ mux.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
|
|
|
+ mux.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
|
|
|
+ mux.HandleFunc("/ossservice/version", func(w http.ResponseWriter, r *http.Request) {
|
|
|
+ fmt.Fprint(w, "版本号:", Version)
|
|
|
+ })
|
|
|
+ servers[0] = &http.Server{Handler: mux}
|
|
|
+ //创建一个grpc 服务器
|
|
|
+ grpcServer := grpc.NewServer()
|
|
|
+ defer grpcServer.GracefulStop()
|
|
|
+ //注册事件
|
|
|
+ pb.RegisterServiceServer(grpcServer, &ossService.Grpc{})
|
|
|
+ servers[1] = grpcServer
|
|
|
+ // 启动所有服务(每个监听对应一个服务器)
|
|
|
+ for i, listener := range state.Listeners {
|
|
|
+ go func(srv osr.Server, l net.Listener) {
|
|
|
+ log.Println("子进程", pid, "启动端口", l.Addr().String())
|
|
|
+ if err := srv.Serve(l); err != nil {
|
|
|
+ log.Println("端口", l.Addr().String(), "服务启动失败", err)
|
|
|
+ }
|
|
|
+ }(servers[i], listener)
|
|
|
}
|
|
|
+ // 监听关闭信号,优雅关闭所有服务器
|
|
|
+ <-state.GracefulShutdown
|
|
|
+ log.Println("子进程", pid, "停止运行。。。")
|
|
|
}
|
|
|
|
|
|
// /////////////////
|