package main import ( "app.yhyue.com/moapp/jybase/go-xweb/httpsession" "app.yhyue.com/moapp/jybase/go-xweb/xweb" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "log" "net" "net/http" "os" "os/signal" . "rpc/chat" "rpc/config" _ "rpc/filter" "rpc/service" "sync" "syscall" "time" ) type server struct { httpServer *http.Server grpcServer *grpc.Server healthServer *health.Server wg sync.WaitGroup startTime time.Time } func main() { // 初始化配置检查 if config.DbConf == nil { log.Fatal("配置未初始化") } // 创建服务器实例 srv := &server{ startTime: time.Now(), } // 设置上下文 ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { srv.initGRPCServer(config.DbConf.Password) // 启动服务 srv.startServices() // 设置定时任务 srv.setupTimedTasks(ctx) // 等待终止信号 srv.waitForShutdown() // 优雅关闭 srv.gracefulShutdown() }() // 初始化服务 srv.initHTTPServer() } func (s *server) initHTTPServer() { // 1. 初始化xweb配置 httpsession.IsRedisSessionStore = true httpsession.RedisNotLoginKey = "userId" xweb.Config.Profiler = true xweb.RootApp().BasePath = "/wxRobot" xweb.RootApp().AppConfig.StaticFileVersion = false xweb.RootApp().AppConfig.CheckXsrf = false xweb.RootApp().AppConfig.EnableHttpCache = false xweb.RootApp().AppConfig.Mode = xweb.Product xweb.RootApp().AppConfig.ReloadTemplates = true xweb.RootApp().AppConfig.SessionTimeout = 7 * 24 * time.Hour xweb.RootApp().Logger.SetOutputLevel(1) // 2. 注册控制器 xweb.AddAction(&service.WXroBot{}) mux1 := http.NewServeMux() xweb.RunBase(":"+config.DbConf.WebPort, mux1) } func (s *server) initGRPCServer(password string) { s.grpcServer = grpc.NewServer( grpc.Creds(insecure.NewCredentials()), grpc.ChainUnaryInterceptor( PasswordAuthInterceptor(password), ), grpc.KeepaliveParams(keepalive.ServerParameters{ Time: 10 * time.Minute, Timeout: 3 * time.Second, }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 30 * time.Second, PermitWithoutStream: true, }), ) // 3. 注册服务 RegisterChatServiceServer(s.grpcServer, service.Chatserver) s.healthServer = health.NewServer() grpc_health_v1.RegisterHealthServer(s.grpcServer, s.healthServer) s.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) } func (s *server) startServices() { // 只保留服务启动逻辑 lis, err := net.Listen("tcp", fmt.Sprintf(":%s", config.DbConf.GrpcWebPort)) if err != nil { log.Fatalf("无法监听gRPC端口 %s: %v", config.DbConf.GrpcWebPort, err) } go func() { log.Printf("gRPC服务已启动,监听端口 :%s [PID: %d]", config.DbConf.GrpcWebPort, os.Getpid()) if err := s.grpcServer.Serve(lis); err != nil { log.Fatalf("gRPC服务启动失败: %v", err) } }() } func (s *server) setupTimedTasks(ctx context.Context) { tasks := []struct { interval time.Duration taskName string }{ {1 * time.Minute, "sendTalk"}, {12 * time.Hour, "getContacts"}, } for _, task := range tasks { task := task // 创建局部变量 s.wg.Add(1) go func() { defer s.wg.Done() service.Chatserver.StartTimedMessages(ctx, task.interval, task.taskName) }() } } func (s *server) waitForShutdown() { done := make(chan os.Signal, 1) signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-done log.Printf("\n接收到关闭信号 [运行时间: %s]", time.Since(s.startTime).Round(time.Second)) } func (s *server) gracefulShutdown() { // 标记服务为不健康 s.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) // 创建关闭上下文 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) defer shutdownCancel() // 关闭HTTP服务器 go func() { if err := s.httpServer.Shutdown(shutdownCtx); err != nil { log.Printf("HTTP服务关闭错误: %v", err) } }() // 关闭gRPC服务器 grpcStopped := make(chan struct{}) go func() { s.grpcServer.GracefulStop() close(grpcStopped) }() // 等待所有服务关闭或超时 select { case <-grpcStopped: log.Println("gRPC服务已正常关闭") case <-shutdownCtx.Done(): log.Println("警告: 优雅关闭超时,强制终止gRPC服务") s.grpcServer.Stop() } // 等待所有定时任务结束 s.wg.Wait() log.Println("服务已完全关闭") } // 认证拦截器 func PasswordAuthInterceptor(password string) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // 从上下文中获取元数据 md, ok := metadata.FromIncomingContext(ctx) if !ok { log.Printf("认证失败: 缺少元数据 (请求方法: %s)", info.FullMethod) return MessageAck{ MessageId: "缺少认证信息", }, status.Error(codes.Unauthenticated, "缺少认证信息") } // 检查密码 pass := md.Get("password") if len(pass) == 0 { log.Printf("认证失败: 未提供密码 (请求方法: %s)", info.FullMethod) return MessageAck{ MessageId: "未提供密码", }, status.Error(codes.Unauthenticated, "未提供密码") } if pass[0] != password { log.Printf("认证失败: 密码错误 (请求方法: %s)", info.FullMethod) return MessageAck{ MessageId: "密码错误", }, status.Error(codes.PermissionDenied, "密码错误") } log.Printf("认证成功 (请求方法: %s)", info.FullMethod) return handler(ctx, req) } }