123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- package main
import (
	"context"
	"crypto/subtle"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"app.yhyue.com/moapp/jybase/go-xweb/httpsession"
	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"

	. "rpc/chat"
	"rpc/config"
	_ "rpc/filter"
	"rpc/service"
)
- type server struct {
- httpServer *http.Server
- grpcServer *grpc.Server
- healthServer *health.Server
- wg sync.WaitGroup
- startTime time.Time
- }
- func main() {
- // 初始化配置检查
- if config.DbConf == nil {
- log.Fatal("配置未初始化")
- }
- // 创建服务器实例
- srv := &server{
- startTime: time.Now(),
- }
- // 设置上下文
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go func() {
- srv.initGRPCServer(config.DbConf.Password)
- // 启动服务
- srv.startServices()
- // 设置定时任务
- srv.setupTimedTasks(ctx)
- // 等待终止信号
- srv.waitForShutdown()
- // 优雅关闭
- srv.gracefulShutdown()
- }()
- // 初始化服务
- srv.initHTTPServer()
- }
- func (s *server) initHTTPServer() {
- // 1. 初始化xweb配置
- httpsession.IsRedisSessionStore = true
- httpsession.RedisNotLoginKey = "userId"
- xweb.Config.Profiler = true
- xweb.RootApp().BasePath = "/wxRobot"
- xweb.RootApp().AppConfig.StaticFileVersion = false
- xweb.RootApp().AppConfig.CheckXsrf = false
- xweb.RootApp().AppConfig.EnableHttpCache = false
- xweb.RootApp().AppConfig.Mode = xweb.Product
- xweb.RootApp().AppConfig.ReloadTemplates = true
- xweb.RootApp().AppConfig.SessionTimeout = 7 * 24 * time.Hour
- xweb.RootApp().Logger.SetOutputLevel(1)
- // 2. 注册控制器
- xweb.AddAction(&service.WXroBot{})
- mux1 := http.NewServeMux()
- xweb.RunBase(":"+config.DbConf.WebPort, mux1)
- }
- func (s *server) initGRPCServer(password string) {
- s.grpcServer = grpc.NewServer(
- grpc.Creds(insecure.NewCredentials()),
- grpc.ChainUnaryInterceptor(
- PasswordAuthInterceptor(password),
- ),
- grpc.KeepaliveParams(keepalive.ServerParameters{
- Time: 10 * time.Minute,
- Timeout: 3 * time.Second,
- }),
- grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
- MinTime: 30 * time.Second,
- PermitWithoutStream: true,
- }),
- )
- // 3. 注册服务
- RegisterChatServiceServer(s.grpcServer, service.Chatserver)
- s.healthServer = health.NewServer()
- grpc_health_v1.RegisterHealthServer(s.grpcServer, s.healthServer)
- s.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
- }
- func (s *server) startServices() {
- // 只保留服务启动逻辑
- lis, err := net.Listen("tcp", fmt.Sprintf(":%s", config.DbConf.GrpcWebPort))
- if err != nil {
- log.Fatalf("无法监听gRPC端口 %s: %v", config.DbConf.GrpcWebPort, err)
- }
- go func() {
- log.Printf("gRPC服务已启动,监听端口 :%s [PID: %d]",
- config.DbConf.GrpcWebPort, os.Getpid())
- if err := s.grpcServer.Serve(lis); err != nil {
- log.Fatalf("gRPC服务启动失败: %v", err)
- }
- }()
- }
- func (s *server) setupTimedTasks(ctx context.Context) {
- tasks := []struct {
- interval time.Duration
- taskName string
- }{
- {1 * time.Minute, "sendTalk"},
- {12 * time.Hour, "getContacts"},
- }
- for _, task := range tasks {
- task := task // 创建局部变量
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- service.Chatserver.StartTimedMessages(ctx, task.interval, task.taskName)
- }()
- }
- }
- func (s *server) waitForShutdown() {
- done := make(chan os.Signal, 1)
- signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
- <-done
- log.Printf("\n接收到关闭信号 [运行时间: %s]", time.Since(s.startTime).Round(time.Second))
- }
- func (s *server) gracefulShutdown() {
- // 标记服务为不健康
- s.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
- // 创建关闭上下文
- shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
- defer shutdownCancel()
- // 关闭HTTP服务器
- go func() {
- if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
- log.Printf("HTTP服务关闭错误: %v", err)
- }
- }()
- // 关闭gRPC服务器
- grpcStopped := make(chan struct{})
- go func() {
- s.grpcServer.GracefulStop()
- close(grpcStopped)
- }()
- // 等待所有服务关闭或超时
- select {
- case <-grpcStopped:
- log.Println("gRPC服务已正常关闭")
- case <-shutdownCtx.Done():
- log.Println("警告: 优雅关闭超时,强制终止gRPC服务")
- s.grpcServer.Stop()
- }
- // 等待所有定时任务结束
- s.wg.Wait()
- log.Println("服务已完全关闭")
- }
- // 认证拦截器
- func PasswordAuthInterceptor(password string) grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- // 从上下文中获取元数据
- md, ok := metadata.FromIncomingContext(ctx)
- if !ok {
- log.Printf("认证失败: 缺少元数据 (请求方法: %s)", info.FullMethod)
- return MessageAck{
- MessageId: "缺少认证信息",
- }, status.Error(codes.Unauthenticated, "缺少认证信息")
- }
- // 检查密码
- pass := md.Get("password")
- if len(pass) == 0 {
- log.Printf("认证失败: 未提供密码 (请求方法: %s)", info.FullMethod)
- return MessageAck{
- MessageId: "未提供密码",
- }, status.Error(codes.Unauthenticated, "未提供密码")
- }
- if pass[0] != password {
- log.Printf("认证失败: 密码错误 (请求方法: %s)", info.FullMethod)
- return MessageAck{
- MessageId: "密码错误",
- }, status.Error(codes.PermissionDenied, "密码错误")
- }
- log.Printf("认证成功 (请求方法: %s)", info.FullMethod)
- return handler(ctx, req)
- }
- }
|