wangchuanjin 4 anos atrás
pai
commit
e1545d4969
4 arquivos alterados com 582 adições e 590 exclusões
  1. 573 0
      endless/endless.go
  2. 0 574
      endless/endless_linux.go
  3. 4 6
      go.mod
  4. 5 10
      go.sum

+ 573 - 0
endless/endless.go

@@ -1 +1,574 @@
+// +build linux
+
 package endless
+
+import (
+	"crypto/tls"
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"os/exec"
+	"os/signal"
+	"runtime"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	// "github.com/fvbock/uds-go/introspect"
+)
+
+const (
+	PRE_SIGNAL = iota
+	POST_SIGNAL
+
+	STATE_INIT
+	STATE_RUNNING
+	STATE_SHUTTING_DOWN
+	STATE_TERMINATE
+)
+
+var (
+	runningServerReg     sync.RWMutex
+	runningServers       map[string]*endlessServer
+	runningServersOrder  []string
+	socketPtrOffsetMap   map[string]uint
+	runningServersForked bool
+
+	DefaultReadTimeOut    time.Duration
+	DefaultWriteTimeOut   time.Duration
+	DefaultMaxHeaderBytes int
+	DefaultHammerTime     time.Duration
+
+	isChild     bool
+	socketOrder string
+
+	hookableSignals []os.Signal
+)
+
+func init() {
+	flag.BoolVar(&isChild, "continue", false, "listen on open fd (after forking)")
+	flag.StringVar(&socketOrder, "socketorder", "", "previous initialization order - used when more than one listener was started")
+
+	runningServerReg = sync.RWMutex{}
+	runningServers = make(map[string]*endlessServer)
+	runningServersOrder = []string{}
+	socketPtrOffsetMap = make(map[string]uint)
+
+	DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB)
+
+	// after a restart the parent will finish ongoing requests before
+	// shutting down. set to a negative value to disable
+	DefaultHammerTime = 60 * time.Second
+
+	hookableSignals = []os.Signal{
+		syscall.SIGHUP,
+		syscall.SIGUSR1,
+		syscall.SIGUSR2,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGTSTP,
+	}
+}
+
+type endlessServer struct {
+	http.Server
+	EndlessListener  net.Listener
+	SignalHooks      map[int]map[os.Signal][]func()
+	tlsInnerListener *endlessListener
+	wg               sync.WaitGroup
+	sigChan          chan os.Signal
+	isChild          bool
+	state            uint8
+	lock             *sync.RWMutex
+}
+
+/*
+NewServer returns an intialized endlessServer Object. Calling Serve on it will
+actually "start" the server.
+*/
+func NewServer(addr string, handler http.Handler, fn func()) (srv *endlessServer) {
+	runningServerReg.Lock()
+	defer runningServerReg.Unlock()
+	if !flag.Parsed() {
+		flag.Parse()
+	}
+	if len(socketOrder) > 0 {
+		for i, addr := range strings.Split(socketOrder, ",") {
+			socketPtrOffsetMap[addr] = uint(i)
+		}
+	} else {
+		socketPtrOffsetMap[addr] = uint(len(runningServersOrder))
+	}
+
+	srv = &endlessServer{
+		wg:      sync.WaitGroup{},
+		sigChan: make(chan os.Signal),
+		isChild: isChild,
+		SignalHooks: map[int]map[os.Signal][]func(){
+			PRE_SIGNAL: map[os.Signal][]func(){
+				syscall.SIGHUP: []func(){
+					fn,
+				},
+				syscall.SIGUSR1: []func(){},
+				syscall.SIGUSR2: []func(){},
+				syscall.SIGINT:  []func(){},
+				syscall.SIGTERM: []func(){},
+				syscall.SIGTSTP: []func(){},
+			},
+			POST_SIGNAL: map[os.Signal][]func(){
+				syscall.SIGHUP:  []func(){},
+				syscall.SIGUSR1: []func(){},
+				syscall.SIGUSR2: []func(){},
+				syscall.SIGINT:  []func(){},
+				syscall.SIGTERM: []func(){},
+				syscall.SIGTSTP: []func(){},
+			},
+		},
+		state: STATE_INIT,
+		lock:  &sync.RWMutex{},
+	}
+
+	srv.Server.Addr = addr
+	srv.Server.ReadTimeout = DefaultReadTimeOut
+	srv.Server.WriteTimeout = DefaultWriteTimeOut
+	srv.Server.MaxHeaderBytes = DefaultMaxHeaderBytes
+	srv.Server.Handler = handler
+
+	runningServersOrder = append(runningServersOrder, addr)
+	runningServers[addr] = srv
+
+	return
+}
+
+func NetListen(addr string, handler http.Handler, fn func()) (net.Listener, error) {
+	server := NewServer(addr, handler, fn)
+	return server.ListenAndServe(false)
+}
+
+/*
+ListenAndServe listens on the TCP network address addr and then calls Serve
+with handler to handle requests on incoming connections. Handler is typically
+nil, in which case the DefaultServeMux is used.
+*/
+func ListenAndServe(addr string, handler http.Handler, fn func()) error {
+	server := NewServer(addr, handler, fn)
+	_, err := server.ListenAndServe(true)
+	return err
+}
+
+/*
+ListenAndServeTLS acts identically to ListenAndServe, except that it expects
+HTTPS connections. Additionally, files containing a certificate and matching
+private key for the server must be provided. If the certificate is signed by a
+certificate authority, the certFile should be the concatenation of the server's
+certificate followed by the CA's certificate.
+*/
+func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler, destoryfn func()) error {
+	server := NewServer(addr, handler, destoryfn)
+	return server.ListenAndServeTLS(certFile, keyFile, true)
+}
+
+func (srv *endlessServer) getState() uint8 {
+	srv.lock.RLock()
+	defer srv.lock.RUnlock()
+
+	return srv.state
+}
+
+func (srv *endlessServer) setState(st uint8) {
+	srv.lock.Lock()
+	defer srv.lock.Unlock()
+
+	srv.state = st
+}
+
+/*
+Serve accepts incoming HTTP connections on the listener l, creating a new
+service goroutine for each. The service goroutines read requests and then call
+handler to reply to them. Handler is typically nil, in which case the
+DefaultServeMux is used.
+
+In addition to the stl Serve behaviour each connection is added to a
+sync.Waitgroup so that all outstanding connections can be served before shutting
+down the server.
+*/
+func (srv *endlessServer) Serve() (err error) {
+	defer log.Println(syscall.Getpid(), "Serve() returning...")
+	srv.setState(STATE_RUNNING)
+	err = srv.Server.Serve(srv.EndlessListener)
+	log.Println(syscall.Getpid(), "Waiting for connections to finish...")
+	srv.wg.Wait()
+	srv.setState(STATE_TERMINATE)
+	return
+}
+
+/*
+ListenAndServe listens on the TCP network address srv.Addr and then calls Serve
+to handle requests on incoming connections. If srv.Addr is blank, ":http" is
+used.
+*/
+func (srv *endlessServer) ListenAndServe(isServer bool) (l net.Listener, err error) {
+	addr := srv.Addr
+	if addr == "" {
+		addr = ":http"
+	}
+
+	go srv.handleSignals(isServer)
+
+	l, err = srv.getListener(addr)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	srv.EndlessListener = newEndlessListener(l, srv)
+
+	if srv.isChild {
+		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
+	}
+
+	log.Println(syscall.Getpid(), srv.Addr)
+	if isServer {
+		return l, srv.Serve()
+	} else {
+		return l, err
+	}
+}
+
+/*
+ListenAndServeTLS listens on the TCP network address srv.Addr and then calls
+Serve to handle requests on incoming TLS connections.
+
+Filenames containing a certificate and matching private key for the server must
+be provided. If the certificate is signed by a certificate authority, the
+certFile should be the concatenation of the server's certificate followed by the
+CA's certificate.
+
+If srv.Addr is blank, ":https" is used.
+*/
+func (srv *endlessServer) ListenAndServeTLS(certFile, keyFile string, isServer bool) (err error) {
+	addr := srv.Addr
+	if addr == "" {
+		addr = ":https"
+	}
+
+	config := &tls.Config{}
+	if srv.TLSConfig != nil {
+		*config = *srv.TLSConfig
+	}
+	if config.NextProtos == nil {
+		config.NextProtos = []string{"http/1.1"}
+	}
+
+	config.Certificates = make([]tls.Certificate, 1)
+	config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
+	if err != nil {
+		return
+	}
+
+	go srv.handleSignals(isServer)
+
+	l, err := srv.getListener(addr)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	srv.tlsInnerListener = newEndlessListener(l, srv)
+	srv.EndlessListener = tls.NewListener(srv.tlsInnerListener, config)
+
+	if srv.isChild {
+		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
+	}
+
+	log.Println(syscall.Getpid(), srv.Addr)
+	return srv.Serve()
+}
+
+/*
+getListener either opens a new socket to listen on, or takes the acceptor socket
+it got passed when restarted.
+*/
+func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
+	if srv.isChild {
+		var ptrOffset uint = 0
+		runningServerReg.RLock()
+		defer runningServerReg.RUnlock()
+		if len(socketPtrOffsetMap) > 0 {
+			ptrOffset = socketPtrOffsetMap[laddr]
+			// log.Println("laddr", laddr, "ptr offset", socketPtrOffsetMap[laddr])
+		}
+
+		f := os.NewFile(uintptr(3+ptrOffset), "")
+		l, err = net.FileListener(f)
+		if err != nil {
+			err = fmt.Errorf("net.FileListener error: %v", err)
+			return
+		}
+	} else {
+		l, err = net.Listen("tcp", laddr)
+		if err != nil {
+			err = fmt.Errorf("net.Listen error: %v", err)
+			return
+		}
+	}
+	return
+}
+
+/*
+handleSignals listens for os Signals and calls any hooked in function that the
+user had registered with the signal.
+*/
+func (srv *endlessServer) handleSignals(isServer bool) {
+	var sig os.Signal
+
+	signal.Notify(
+		srv.sigChan,
+		hookableSignals...,
+	)
+
+	pid := syscall.Getpid()
+	for {
+		sig = <-srv.sigChan
+		srv.signalHooks(PRE_SIGNAL, sig)
+		switch sig {
+		case syscall.SIGHUP:
+			log.Println(pid, "Received SIGHUP. forking.")
+			err := srv.fork()
+			if err != nil {
+				log.Println("Fork err:", err)
+			}
+		case syscall.SIGUSR1:
+			log.Println(pid, "Received SIGUSR1.")
+		case syscall.SIGUSR2:
+			log.Println(pid, "Received SIGUSR2.")
+			srv.hammerTime(0 * time.Second)
+		case syscall.SIGINT:
+			log.Println(pid, "Received SIGINT.")
+			srv.shutdown(isServer)
+		case syscall.SIGTERM:
+			log.Println(pid, "Received SIGTERM.")
+			srv.shutdown(isServer)
+		case syscall.SIGTSTP:
+			log.Println(pid, "Received SIGTSTP.")
+		default:
+			log.Printf("Received %v: nothing i care about...\n", sig)
+		}
+		srv.signalHooks(POST_SIGNAL, sig)
+	}
+}
+
+func (srv *endlessServer) signalHooks(ppFlag int, sig os.Signal) {
+	if _, notSet := srv.SignalHooks[ppFlag][sig]; !notSet {
+		return
+	}
+	for _, f := range srv.SignalHooks[ppFlag][sig] {
+		f()
+	}
+	return
+}
+
+/*
+shutdown closes the listener so that no new connections are accepted. it also
+starts a goroutine that will hammer (stop all running requests) the server
+after DefaultHammerTime.
+*/
+func (srv *endlessServer) shutdown(isServer bool) {
+	if isServer && srv.getState() != STATE_RUNNING {
+		return
+	}
+
+	srv.setState(STATE_SHUTTING_DOWN)
+	if DefaultHammerTime >= 0 {
+		go srv.hammerTime(DefaultHammerTime)
+	}
+	// disable keep-alives on existing connections
+	srv.SetKeepAlivesEnabled(false)
+	err := srv.EndlessListener.Close()
+	if err != nil {
+		log.Println(syscall.Getpid(), "Listener.Close() error:", err)
+	} else {
+		log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.")
+	}
+}
+
+/*
+hammerTime forces the server to shutdown in a given timeout - whether it
+finished outstanding requests or not. if Read/WriteTimeout are not set or the
+max header size is very big a connection could hang...
+
+srv.Serve() will not return until all connections are served. this will
+unblock the srv.wg.Wait() in Serve() thus causing ListenAndServe(TLS) to
+return.
+*/
+func (srv *endlessServer) hammerTime(d time.Duration) {
+	defer func() {
+		// we are calling srv.wg.Done() until it panics which means we called
+		// Done() when the counter was already at 0 and we're done.
+		// (and thus Serve() will return and the parent will exit)
+		if r := recover(); r != nil {
+			log.Println("WaitGroup at 0", r)
+		}
+	}()
+	if srv.getState() != STATE_SHUTTING_DOWN {
+		return
+	}
+	time.Sleep(d)
+	log.Println("[STOP - Hammer Time] Forcefully shutting down parent")
+	for {
+		if srv.getState() == STATE_TERMINATE {
+			break
+		}
+		srv.wg.Done()
+		runtime.Gosched()
+	}
+}
+
+func (srv *endlessServer) fork() (err error) {
+	runningServerReg.Lock()
+	defer runningServerReg.Unlock()
+
+	// only one server isntance should fork!
+	if runningServersForked {
+		return errors.New("Another process already forked. Ignoring this one.")
+	}
+
+	runningServersForked = true
+
+	var files = make([]*os.File, len(runningServers))
+	var orderArgs = make([]string, len(runningServers))
+	// get the accessor socket fds for _all_ server instances
+	for _, srvPtr := range runningServers {
+		// introspect.PrintTypeDump(srvPtr.EndlessListener)
+		switch srvPtr.EndlessListener.(type) {
+		case *endlessListener:
+			// normal listener
+			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()
+		default:
+			// tls listener
+			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()
+		}
+		orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
+	}
+
+	// log.Println(files)
+	path := os.Args[0]
+	var args []string
+	if len(os.Args) > 1 {
+		for _, arg := range os.Args[1:] {
+			if arg == "-continue" {
+				break
+			}
+			args = append(args, arg)
+		}
+	}
+	args = append(args, "-continue")
+	if len(runningServers) > 1 {
+		args = append(args, fmt.Sprintf(`-socketorder=%s`, strings.Join(orderArgs, ",")))
+		// log.Println(args)
+	}
+
+	cmd := exec.Command(path, args...)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	cmd.ExtraFiles = files
+	// cmd.SysProcAttr = &syscall.SysProcAttr{
+	// 	Setsid:  true,
+	// 	Setctty: true,
+	// 	Ctty:    ,
+	// }
+
+	err = cmd.Start()
+	if err != nil {
+		log.Fatalf("Restart: Failed to launch, error: %v", err)
+	}
+
+	return
+}
+
+type endlessListener struct {
+	net.Listener
+	stopped bool
+	server  *endlessServer
+}
+
+func (el *endlessListener) Accept() (c net.Conn, err error) {
+	tc, err := el.Listener.(*net.TCPListener).AcceptTCP()
+	if err != nil {
+		return
+	}
+
+	tc.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
+	tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
+
+	c = endlessConn{
+		Conn:   tc,
+		server: el.server,
+	}
+
+	el.server.wg.Add(1)
+	return
+}
+
+func newEndlessListener(l net.Listener, srv *endlessServer) (el *endlessListener) {
+	el = &endlessListener{
+		Listener: l,
+		server:   srv,
+	}
+
+	return
+}
+
+func (el *endlessListener) Close() error {
+	if el.stopped {
+		return syscall.EINVAL
+	}
+
+	el.stopped = true
+	return el.Listener.Close()
+}
+
+func (el *endlessListener) File() *os.File {
+	// returns a dup(2) - FD_CLOEXEC flag *not* set
+	tl := el.Listener.(*net.TCPListener)
+	fl, _ := tl.File()
+	return fl
+}
+
+type endlessConn struct {
+	net.Conn
+	server *endlessServer
+}
+
+func (w endlessConn) Close() error {
+	err := w.Conn.Close()
+	if err == nil {
+		w.server.wg.Done()
+	}
+	return err
+}
+
+/*
+RegisterSignalHook registers a function to be run PRE_SIGNAL or POST_SIGNAL for
+a given signal. PRE or POST in this case means before or after the signal
+related code endless itself runs
+*/
+func (srv *endlessServer) RegisterSignalHook(prePost int, sig os.Signal, f func()) (err error) {
+	if prePost != PRE_SIGNAL && prePost != POST_SIGNAL {
+		err = fmt.Errorf("Cannot use %v for prePost arg. Must be endless.PRE_SIGNAL or endless.POST_SIGNAL.")
+		return
+	}
+	for _, s := range hookableSignals {
+		if s == sig {
+			srv.SignalHooks[prePost][sig] = append(srv.SignalHooks[prePost][sig], f)
+			return
+		}
+	}
+	err = fmt.Errorf("Signal %v is not supported.")
+	return
+}

+ 0 - 574
endless/endless_linux.go

@@ -1,574 +0,0 @@
-// +build linux
-
-package endless
-
-import (
-	"crypto/tls"
-	"errors"
-	"flag"
-	"fmt"
-	"log"
-	"net"
-	"net/http"
-	"os"
-	"os/exec"
-	"os/signal"
-	"runtime"
-	"strings"
-	"sync"
-	"syscall"
-	"time"
-
-	// "github.com/fvbock/uds-go/introspect"
-)
-
-const (
-	PRE_SIGNAL = iota
-	POST_SIGNAL
-
-	STATE_INIT
-	STATE_RUNNING
-	STATE_SHUTTING_DOWN
-	STATE_TERMINATE
-)
-
-var (
-	runningServerReg     sync.RWMutex
-	runningServers       map[string]*endlessServer
-	runningServersOrder  []string
-	socketPtrOffsetMap   map[string]uint
-	runningServersForked bool
-
-	DefaultReadTimeOut    time.Duration
-	DefaultWriteTimeOut   time.Duration
-	DefaultMaxHeaderBytes int
-	DefaultHammerTime     time.Duration
-
-	isChild     bool
-	socketOrder string
-
-	hookableSignals []os.Signal
-)
-
-func init() {
-	flag.BoolVar(&isChild, "continue", false, "listen on open fd (after forking)")
-	flag.StringVar(&socketOrder, "socketorder", "", "previous initialization order - used when more than one listener was started")
-
-	runningServerReg = sync.RWMutex{}
-	runningServers = make(map[string]*endlessServer)
-	runningServersOrder = []string{}
-	socketPtrOffsetMap = make(map[string]uint)
-
-	DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB)
-
-	// after a restart the parent will finish ongoing requests before
-	// shutting down. set to a negative value to disable
-	DefaultHammerTime = 60 * time.Second
-
-	hookableSignals = []os.Signal{
-		syscall.SIGHUP,
-		syscall.SIGUSR1,
-		syscall.SIGUSR2,
-		syscall.SIGINT,
-		syscall.SIGTERM,
-		syscall.SIGTSTP,
-	}
-}
-
-type endlessServer struct {
-	http.Server
-	EndlessListener  net.Listener
-	SignalHooks      map[int]map[os.Signal][]func()
-	tlsInnerListener *endlessListener
-	wg               sync.WaitGroup
-	sigChan          chan os.Signal
-	isChild          bool
-	state            uint8
-	lock             *sync.RWMutex
-}
-
-/*
-NewServer returns an intialized endlessServer Object. Calling Serve on it will
-actually "start" the server.
-*/
-func NewServer(addr string, handler http.Handler, fn func()) (srv *endlessServer) {
-	runningServerReg.Lock()
-	defer runningServerReg.Unlock()
-	if !flag.Parsed() {
-		flag.Parse()
-	}
-	if len(socketOrder) > 0 {
-		for i, addr := range strings.Split(socketOrder, ",") {
-			socketPtrOffsetMap[addr] = uint(i)
-		}
-	} else {
-		socketPtrOffsetMap[addr] = uint(len(runningServersOrder))
-	}
-
-	srv = &endlessServer{
-		wg:      sync.WaitGroup{},
-		sigChan: make(chan os.Signal),
-		isChild: isChild,
-		SignalHooks: map[int]map[os.Signal][]func(){
-			PRE_SIGNAL: map[os.Signal][]func(){
-				syscall.SIGHUP: []func(){
-					fn,
-				},
-				syscall.SIGUSR1: []func(){},
-				syscall.SIGUSR2: []func(){},
-				syscall.SIGINT:  []func(){},
-				syscall.SIGTERM: []func(){},
-				syscall.SIGTSTP: []func(){},
-			},
-			POST_SIGNAL: map[os.Signal][]func(){
-				syscall.SIGHUP:  []func(){},
-				syscall.SIGUSR1: []func(){},
-				syscall.SIGUSR2: []func(){},
-				syscall.SIGINT:  []func(){},
-				syscall.SIGTERM: []func(){},
-				syscall.SIGTSTP: []func(){},
-			},
-		},
-		state: STATE_INIT,
-		lock:  &sync.RWMutex{},
-	}
-
-	srv.Server.Addr = addr
-	srv.Server.ReadTimeout = DefaultReadTimeOut
-	srv.Server.WriteTimeout = DefaultWriteTimeOut
-	srv.Server.MaxHeaderBytes = DefaultMaxHeaderBytes
-	srv.Server.Handler = handler
-
-	runningServersOrder = append(runningServersOrder, addr)
-	runningServers[addr] = srv
-
-	return
-}
-
-func NetListen(addr string, handler http.Handler, fn func()) (net.Listener, error) {
-	server := NewServer(addr, handler, fn)
-	return server.ListenAndServe(false)
-}
-
-/*
-ListenAndServe listens on the TCP network address addr and then calls Serve
-with handler to handle requests on incoming connections. Handler is typically
-nil, in which case the DefaultServeMux is used.
-*/
-func ListenAndServe(addr string, handler http.Handler, fn func()) error {
-	server := NewServer(addr, handler, fn)
-	_, err := server.ListenAndServe(true)
-	return err
-}
-
-/*
-ListenAndServeTLS acts identically to ListenAndServe, except that it expects
-HTTPS connections. Additionally, files containing a certificate and matching
-private key for the server must be provided. If the certificate is signed by a
-certificate authority, the certFile should be the concatenation of the server's
-certificate followed by the CA's certificate.
-*/
-func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler, destoryfn func()) error {
-	server := NewServer(addr, handler, destoryfn)
-	return server.ListenAndServeTLS(certFile, keyFile, true)
-}
-
-func (srv *endlessServer) getState() uint8 {
-	srv.lock.RLock()
-	defer srv.lock.RUnlock()
-
-	return srv.state
-}
-
-func (srv *endlessServer) setState(st uint8) {
-	srv.lock.Lock()
-	defer srv.lock.Unlock()
-
-	srv.state = st
-}
-
-/*
-Serve accepts incoming HTTP connections on the listener l, creating a new
-service goroutine for each. The service goroutines read requests and then call
-handler to reply to them. Handler is typically nil, in which case the
-DefaultServeMux is used.
-
-In addition to the stl Serve behaviour each connection is added to a
-sync.Waitgroup so that all outstanding connections can be served before shutting
-down the server.
-*/
-func (srv *endlessServer) Serve() (err error) {
-	defer log.Println(syscall.Getpid(), "Serve() returning...")
-	srv.setState(STATE_RUNNING)
-	err = srv.Server.Serve(srv.EndlessListener)
-	log.Println(syscall.Getpid(), "Waiting for connections to finish...")
-	srv.wg.Wait()
-	srv.setState(STATE_TERMINATE)
-	return
-}
-
-/*
-ListenAndServe listens on the TCP network address srv.Addr and then calls Serve
-to handle requests on incoming connections. If srv.Addr is blank, ":http" is
-used.
-*/
-func (srv *endlessServer) ListenAndServe(isServer bool) (l net.Listener, err error) {
-	addr := srv.Addr
-	if addr == "" {
-		addr = ":http"
-	}
-
-	go srv.handleSignals(isServer)
-
-	l, err = srv.getListener(addr)
-	if err != nil {
-		log.Println(err)
-		return
-	}
-
-	srv.EndlessListener = newEndlessListener(l, srv)
-
-	if srv.isChild {
-		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
-	}
-
-	log.Println(syscall.Getpid(), srv.Addr)
-	if isServer {
-		return l, srv.Serve()
-	} else {
-		return l, err
-	}
-}
-
-/*
-ListenAndServeTLS listens on the TCP network address srv.Addr and then calls
-Serve to handle requests on incoming TLS connections.
-
-Filenames containing a certificate and matching private key for the server must
-be provided. If the certificate is signed by a certificate authority, the
-certFile should be the concatenation of the server's certificate followed by the
-CA's certificate.
-
-If srv.Addr is blank, ":https" is used.
-*/
-func (srv *endlessServer) ListenAndServeTLS(certFile, keyFile string, isServer bool) (err error) {
-	addr := srv.Addr
-	if addr == "" {
-		addr = ":https"
-	}
-
-	config := &tls.Config{}
-	if srv.TLSConfig != nil {
-		*config = *srv.TLSConfig
-	}
-	if config.NextProtos == nil {
-		config.NextProtos = []string{"http/1.1"}
-	}
-
-	config.Certificates = make([]tls.Certificate, 1)
-	config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
-	if err != nil {
-		return
-	}
-
-	go srv.handleSignals(isServer)
-
-	l, err := srv.getListener(addr)
-	if err != nil {
-		log.Println(err)
-		return
-	}
-
-	srv.tlsInnerListener = newEndlessListener(l, srv)
-	srv.EndlessListener = tls.NewListener(srv.tlsInnerListener, config)
-
-	if srv.isChild {
-		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
-	}
-
-	log.Println(syscall.Getpid(), srv.Addr)
-	return srv.Serve()
-}
-
-/*
-getListener either opens a new socket to listen on, or takes the acceptor socket
-it got passed when restarted.
-*/
-func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
-	if srv.isChild {
-		var ptrOffset uint = 0
-		runningServerReg.RLock()
-		defer runningServerReg.RUnlock()
-		if len(socketPtrOffsetMap) > 0 {
-			ptrOffset = socketPtrOffsetMap[laddr]
-			// log.Println("laddr", laddr, "ptr offset", socketPtrOffsetMap[laddr])
-		}
-
-		f := os.NewFile(uintptr(3+ptrOffset), "")
-		l, err = net.FileListener(f)
-		if err != nil {
-			err = fmt.Errorf("net.FileListener error: %v", err)
-			return
-		}
-	} else {
-		l, err = net.Listen("tcp", laddr)
-		if err != nil {
-			err = fmt.Errorf("net.Listen error: %v", err)
-			return
-		}
-	}
-	return
-}
-
-/*
-handleSignals listens for os Signals and calls any hooked in function that the
-user had registered with the signal.
-*/
-func (srv *endlessServer) handleSignals(isServer bool) {
-	var sig os.Signal
-
-	signal.Notify(
-		srv.sigChan,
-		hookableSignals...,
-	)
-
-	pid := syscall.Getpid()
-	for {
-		sig = <-srv.sigChan
-		srv.signalHooks(PRE_SIGNAL, sig)
-		switch sig {
-		case syscall.SIGHUP:
-			log.Println(pid, "Received SIGHUP. forking.")
-			err := srv.fork()
-			if err != nil {
-				log.Println("Fork err:", err)
-			}
-		case syscall.SIGUSR1:
-			log.Println(pid, "Received SIGUSR1.")
-		case syscall.SIGUSR2:
-			log.Println(pid, "Received SIGUSR2.")
-			srv.hammerTime(0 * time.Second)
-		case syscall.SIGINT:
-			log.Println(pid, "Received SIGINT.")
-			srv.shutdown(isServer)
-		case syscall.SIGTERM:
-			log.Println(pid, "Received SIGTERM.")
-			srv.shutdown(isServer)
-		case syscall.SIGTSTP:
-			log.Println(pid, "Received SIGTSTP.")
-		default:
-			log.Printf("Received %v: nothing i care about...\n", sig)
-		}
-		srv.signalHooks(POST_SIGNAL, sig)
-	}
-}
-
-func (srv *endlessServer) signalHooks(ppFlag int, sig os.Signal) {
-	if _, notSet := srv.SignalHooks[ppFlag][sig]; !notSet {
-		return
-	}
-	for _, f := range srv.SignalHooks[ppFlag][sig] {
-		f()
-	}
-	return
-}
-
-/*
-shutdown closes the listener so that no new connections are accepted. it also
-starts a goroutine that will hammer (stop all running requests) the server
-after DefaultHammerTime.
-*/
-func (srv *endlessServer) shutdown(isServer bool) {
-	if isServer && srv.getState() != STATE_RUNNING {
-		return
-	}
-
-	srv.setState(STATE_SHUTTING_DOWN)
-	if DefaultHammerTime >= 0 {
-		go srv.hammerTime(DefaultHammerTime)
-	}
-	// disable keep-alives on existing connections
-	srv.SetKeepAlivesEnabled(false)
-	err := srv.EndlessListener.Close()
-	if err != nil {
-		log.Println(syscall.Getpid(), "Listener.Close() error:", err)
-	} else {
-		log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.")
-	}
-}
-
-/*
-hammerTime forces the server to shutdown in a given timeout - whether it
-finished outstanding requests or not. if Read/WriteTimeout are not set or the
-max header size is very big a connection could hang...
-
-srv.Serve() will not return until all connections are served. this will
-unblock the srv.wg.Wait() in Serve() thus causing ListenAndServe(TLS) to
-return.
-*/
-func (srv *endlessServer) hammerTime(d time.Duration) {
-	defer func() {
-		// we are calling srv.wg.Done() until it panics which means we called
-		// Done() when the counter was already at 0 and we're done.
-		// (and thus Serve() will return and the parent will exit)
-		if r := recover(); r != nil {
-			log.Println("WaitGroup at 0", r)
-		}
-	}()
-	if srv.getState() != STATE_SHUTTING_DOWN {
-		return
-	}
-	time.Sleep(d)
-	log.Println("[STOP - Hammer Time] Forcefully shutting down parent")
-	for {
-		if srv.getState() == STATE_TERMINATE {
-			break
-		}
-		srv.wg.Done()
-		runtime.Gosched()
-	}
-}
-
-func (srv *endlessServer) fork() (err error) {
-	runningServerReg.Lock()
-	defer runningServerReg.Unlock()
-
-	// only one server isntance should fork!
-	if runningServersForked {
-		return errors.New("Another process already forked. Ignoring this one.")
-	}
-
-	runningServersForked = true
-
-	var files = make([]*os.File, len(runningServers))
-	var orderArgs = make([]string, len(runningServers))
-	// get the accessor socket fds for _all_ server instances
-	for _, srvPtr := range runningServers {
-		// introspect.PrintTypeDump(srvPtr.EndlessListener)
-		switch srvPtr.EndlessListener.(type) {
-		case *endlessListener:
-			// normal listener
-			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()
-		default:
-			// tls listener
-			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()
-		}
-		orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
-	}
-
-	// log.Println(files)
-	path := os.Args[0]
-	var args []string
-	if len(os.Args) > 1 {
-		for _, arg := range os.Args[1:] {
-			if arg == "-continue" {
-				break
-			}
-			args = append(args, arg)
-		}
-	}
-	args = append(args, "-continue")
-	if len(runningServers) > 1 {
-		args = append(args, fmt.Sprintf(`-socketorder=%s`, strings.Join(orderArgs, ",")))
-		// log.Println(args)
-	}
-
-	cmd := exec.Command(path, args...)
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	cmd.ExtraFiles = files
-	// cmd.SysProcAttr = &syscall.SysProcAttr{
-	// 	Setsid:  true,
-	// 	Setctty: true,
-	// 	Ctty:    ,
-	// }
-
-	err = cmd.Start()
-	if err != nil {
-		log.Fatalf("Restart: Failed to launch, error: %v", err)
-	}
-
-	return
-}
-
-type endlessListener struct {
-	net.Listener
-	stopped bool
-	server  *endlessServer
-}
-
-func (el *endlessListener) Accept() (c net.Conn, err error) {
-	tc, err := el.Listener.(*net.TCPListener).AcceptTCP()
-	if err != nil {
-		return
-	}
-
-	tc.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
-	tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
-
-	c = endlessConn{
-		Conn:   tc,
-		server: el.server,
-	}
-
-	el.server.wg.Add(1)
-	return
-}
-
-func newEndlessListener(l net.Listener, srv *endlessServer) (el *endlessListener) {
-	el = &endlessListener{
-		Listener: l,
-		server:   srv,
-	}
-
-	return
-}
-
-func (el *endlessListener) Close() error {
-	if el.stopped {
-		return syscall.EINVAL
-	}
-
-	el.stopped = true
-	return el.Listener.Close()
-}
-
-func (el *endlessListener) File() *os.File {
-	// returns a dup(2) - FD_CLOEXEC flag *not* set
-	tl := el.Listener.(*net.TCPListener)
-	fl, _ := tl.File()
-	return fl
-}
-
-type endlessConn struct {
-	net.Conn
-	server *endlessServer
-}
-
-func (w endlessConn) Close() error {
-	err := w.Conn.Close()
-	if err == nil {
-		w.server.wg.Done()
-	}
-	return err
-}
-
-/*
-RegisterSignalHook registers a function to be run PRE_SIGNAL or POST_SIGNAL for
-a given signal. PRE or POST in this case means before or after the signal
-related code endless itself runs
-*/
-func (srv *endlessServer) RegisterSignalHook(prePost int, sig os.Signal, f func()) (err error) {
-	if prePost != PRE_SIGNAL && prePost != POST_SIGNAL {
-		err = fmt.Errorf("Cannot use %v for prePost arg. Must be endless.PRE_SIGNAL or endless.POST_SIGNAL.")
-		return
-	}
-	for _, s := range hookableSignals {
-		if s == sig {
-			srv.SignalHooks[prePost][sig] = append(srv.SignalHooks[prePost][sig], f)
-			return
-		}
-	}
-	err = fmt.Errorf("Signal %v is not supported.")
-	return
-}

+ 4 - 6
go.mod

@@ -3,14 +3,12 @@ module app.yhyue.com/moapp/jybase
 go 1.13
 
 require (
-	app.yhyue.com/moapp/jybase/gomail v0.0.0-20210315065154-753a92810b4d
 	github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f
 	github.com/garyburd/redigo v1.6.2
-	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/olivere/elastic/v7 v7.0.22
 	go.mongodb.org/mongo-driver v1.5.0
-	gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
-	gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df // indirect
-	gorm.io/driver/mysql v1.0.3
-	gorm.io/gorm v1.20.8
+	gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc
+	gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
+	gorm.io/driver/mysql v1.0.5
+	gorm.io/gorm v1.21.3
 )

+ 5 - 10
go.sum

@@ -1,8 +1,5 @@
-app.yhyue.com/moapp/jybase/gomail v0.0.0-20210315065154-753a92810b4d h1:hohU4CXz797FmxgyOPqEiudhGjcx7coeCghAFRDwE64=
-app.yhyue.com/moapp/jybase/gomail v0.0.0-20210315065154-753a92810b4d/go.mod h1:+TEkLzZiV2L9VCcHVsp9v+4Ajf1Bjp8IkSt0TprGgQM=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/aws/aws-sdk-go v1.34.28 h1:sscPpn/Ns3i0F4HPEWAVcwdIRaZZCuL7llJ2/60yPIk=
 github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
 github.com/aws/aws-sdk-go v1.35.20 h1:Hs7x9Czh+MMPnZLQqHhsuZKeNFA3Vuf7pdy2r5QlVb0=
 github.com/aws/aws-sdk-go v1.35.20/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
@@ -77,9 +74,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
 github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
-github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
-github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
 github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
 github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
@@ -189,9 +185,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gorm.io/driver/mysql v1.0.3 h1:+JKBYPfn1tygR1/of/Fh2T8iwuVwzt+PEJmKaXzMQXg=
-gorm.io/driver/mysql v1.0.3/go.mod h1:twGxftLBlFgNVNakL7F+P/x9oYqoymG3YYT8cAfI9oI=
-gorm.io/gorm v1.20.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
-gorm.io/gorm v1.20.8 h1:iToaOdZgjNvlc44NFkxfLa3U9q63qwaxt0FdNCiwOMs=
-gorm.io/gorm v1.20.8/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
+gorm.io/driver/mysql v1.0.5 h1:WAAmvLK2rG0tCOqrf5XcLi2QUwugd4rcVJ/W3aoon9o=
+gorm.io/driver/mysql v1.0.5/go.mod h1:N1OIhHAIhx5SunkMGqWbGFVeh4yTNWKmMo1GOAsohLI=
+gorm.io/gorm v1.21.3 h1:qDFi55ZOsjZTwk5eN+uhAmHi8GysJ/qCTichM/yO7ME=
+gorm.io/gorm v1.21.3/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=