Improve graceful manager code/comment (#28063)
The graceful manager has some bugs (#27643, #28062). This commit prepares for further fixes.
parent f65977df3a
commit 79394b340d
8 changed files with 29 additions and 83 deletions

@@ -7,6 +7,13 @@ import (
     "context"
 )
 
+// Shutdown procedure:
+// * cancel ShutdownContext: the registered context consumers have time to do their cleanup (they could use the hammer context)
+// * cancel HammerContext: all context consumers have limited time to do their cleanup (wait for a few seconds)
+// * cancel TerminateContext: the registered context consumers have time to do their cleanup (but they shouldn't use shutdown/hammer context anymore)
+// * cancel manager context
+// If the shutdown is triggered again during the shutdown procedure, the hammer context will be canceled immediately to force the shutdown.
+
 // ShutdownContext returns a context.Context that is Done at shutdown
 // Callers using this context should ensure that they are registered as a running server
 // in order that they are waited for.
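The added comment spells out the ordering that context consumers are expected to follow. For illustration only, a background worker obeying that ordering might look like the sketch below; runWorker, doWork and doCleanup are hypothetical names, while GetManager, ShutdownContext and HammerContext are the manager APIs referenced elsewhere in this diff.

package example

import (
    "context"

    "code.gitea.io/gitea/modules/graceful"
)

// runWorker is a sketch, not Gitea code: it stops producing work once the
// shutdown context is canceled and bounds its cleanup by the hammer context,
// matching the ordering described in the comment above.
func runWorker(doWork func() error, doCleanup func(ctx context.Context) error) {
    mgr := graceful.GetManager()
    for {
        select {
        case <-mgr.ShutdownContext().Done():
            // Graceful phase: cleanup may still run, but should treat the
            // hammer context as its hard deadline.
            _ = doCleanup(mgr.HammerContext())
            return
        default:
            if err := doWork(); err != nil {
                return
            }
        }
    }
}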

@@ -39,10 +39,10 @@ type RunCanceler interface {
 // and add a function to call manager.InformCleanup if it's not going to be used
 const numberOfServersToCreate = 4
 
-// Manager represents the graceful server manager interface
-var manager *Manager
-
-var initOnce = sync.Once{}
+var (
+    manager  *Manager
+    initOnce sync.Once
+)
 
 // GetManager returns the Manager
 func GetManager() *Manager {
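The body of GetManager is not shown in this hunk, so the following is only a generic sketch of the lazy-initialization pattern that pairing a package-level pointer with a sync.Once usually implies; the Manager type and its construction below are stand-ins, not the Gitea ones.

package main

import (
    "fmt"
    "sync"
)

// Manager is a stand-in type for illustration.
type Manager struct{ name string }

var (
    manager  *Manager
    initOnce sync.Once
)

// GetManager constructs the singleton exactly once; later calls return the
// same instance, which is the usual reason for the manager/initOnce pairing.
func GetManager() *Manager {
    initOnce.Do(func() {
        manager = &Manager{name: "graceful"}
    })
    return manager
}

func main() {
    fmt.Println(GetManager() == GetManager()) // prints true: same instance
}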

@@ -147,12 +147,12 @@ func (g *Manager) doShutdown() {
         go g.doHammerTime(setting.GracefulHammerTime)
     }
     go func() {
-        g.WaitForServers()
+        g.runningServerWaitGroup.Wait()
         // Mop up any remaining unclosed events.
         g.doHammerTime(0)
         <-time.After(1 * time.Second)
         g.doTerminate()
-        g.WaitForTerminate()
+        g.terminateWaitGroup.Wait()
         g.lock.Lock()
         g.managerCtxCancel()
         g.lock.Unlock()

@@ -199,26 +199,18 @@ func (g *Manager) IsChild() bool {
 }
 
 // IsShutdown returns a channel which will be closed at shutdown.
-// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
+// The order of closure is shutdown, hammer (potentially), terminate
 func (g *Manager) IsShutdown() <-chan struct{} {
     return g.shutdownCtx.Done()
 }
 
-// IsHammer returns a channel which will be closed at hammer
-// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
+// IsHammer returns a channel which will be closed at hammer.
 // Servers running within the running server wait group should respond to IsHammer
 // if not shutdown already
 func (g *Manager) IsHammer() <-chan struct{} {
     return g.hammerCtx.Done()
 }
 
-// IsTerminate returns a channel which will be closed at terminate
-// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
-// IsTerminate will only close once all running servers have stopped
-func (g *Manager) IsTerminate() <-chan struct{} {
-    return g.terminateCtx.Done()
-}
-
 // ServerDone declares a running server done and subtracts one from the
 // running server wait group. Users probably do not want to call this
 // and should use one of the RunWithShutdown* functions
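As the surviving comment notes, servers in the running server wait group should respond to IsHammer if they have not already shut down. Below is a channel-based counterpart to the earlier context sketch, again hedged: acceptOne and forceClose are hypothetical helpers, only IsShutdown and IsHammer come from this diff.

package example

import "code.gitea.io/gitea/modules/graceful"

// serveLoop is illustrative only: it honours the documented closure order
// (shutdown, then hammer) by draining on shutdown and aborting on hammer.
func serveLoop(acceptOne func() error, forceClose func()) {
    mgr := graceful.GetManager()
    for {
        select {
        case <-mgr.IsShutdown():
            // Stop taking new work; in-flight work may finish gracefully.
            return
        case <-mgr.IsHammer():
            // Hammer: abandon everything immediately.
            forceClose()
            return
        default:
            if err := acceptOne(); err != nil {
                return
            }
        }
    }
}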

@@ -226,28 +218,7 @@ func (g *Manager) ServerDone() {
     g.runningServerWaitGroup.Done()
 }
 
-// WaitForServers waits for all running servers to finish. Users should probably
-// instead use AtTerminate or IsTerminate
-func (g *Manager) WaitForServers() {
-    g.runningServerWaitGroup.Wait()
-}
-
-// WaitForTerminate waits for all terminating actions to finish.
-// Only the main go-routine should use this
-func (g *Manager) WaitForTerminate() {
-    g.terminateWaitGroup.Wait()
-}
-
-func (g *Manager) getState() state {
-    g.lock.RLock()
-    defer g.lock.RUnlock()
-    return g.state
-}
-
 func (g *Manager) setStateTransition(old, new state) bool {
-    if old != g.getState() {
-        return false
-    }
     g.lock.Lock()
     if g.state != old {
         g.lock.Unlock()

@@ -258,13 +229,6 @@ func (g *Manager) setStateTransition(old, new state) bool {
     return true
 }
 
-func (g *Manager) setState(st state) {
-    g.lock.Lock()
-    defer g.lock.Unlock()
-
-    g.state = st
-}
-
 // InformCleanup tells the cleanup wait group that we have either taken a listener or will not be taking a listener.
 // At the moment the total number of servers (numberOfServersToCreate) are pre-defined as a const before global init,
 // so this function MUST be called if a server is not used.
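Read together with the previous hunk, this removes the unlocked pre-check and the getState/setState helpers, leaving a single check-and-set under the write lock. The sketch below reconstructs a plausible shape of the resulting function from the surviving context lines; the assignment of the new state is inferred rather than shown in the diff, and the surrounding types are stand-ins.

package main

import "sync"

// state and Manager are stand-ins; the real ones live in modules/graceful.
type state int

const (
    stateInit state = iota
    stateRunning
)

type Manager struct {
    lock  sync.RWMutex
    state state
}

// setStateTransition is a single check-and-set under the write lock: it only
// advances the state when the caller's expected "old" state still holds.
func (g *Manager) setStateTransition(old, new state) bool {
    g.lock.Lock()
    if g.state != old {
        g.lock.Unlock()
        return false
    }
    g.state = new
    g.lock.Unlock()
    return true
}

func main() {
    g := &Manager{}
    _ = g.setStateTransition(stateInit, stateRunning) // true: init -> running
    _ = g.setStateTransition(stateInit, stateRunning) // false: already running
}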

@@ -107,7 +107,9 @@ func (g *Manager) start(ctx context.Context) {
     defer pprof.SetGoroutineLabels(ctx)
 
     // Set the running state & handle signals
-    g.setState(stateRunning)
+    if !g.setStateTransition(stateInit, stateRunning) {
+        panic("invalid graceful manager state: transition from init to running failed")
+    }
     g.notify(statusMsg("Starting Gitea"))
     g.notify(pidMsg())
     go g.handleSignals(g.managerCtx)

@@ -85,7 +85,9 @@ func (g *Manager) start() {
     g.shutdownRequested = make(chan struct{})
 
     // Set the running state
-    g.setState(stateRunning)
+    if !g.setStateTransition(stateInit, stateRunning) {
+        panic("invalid graceful manager state: transition from init to running failed")
+    }
     if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip {
         log.Trace("Skipping SVC check as SKIP_MINWINSVC is set")
         return

@@ -150,12 +150,8 @@ func CloseProvidedListeners() error {
     return returnableError
 }
 
-// DefaultGetListener obtains a listener for the local network address. The network must be
-// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It
-// returns an provided net.Listener for the matching network and address, or
-// creates a new one using net.Listen. This function can be replaced by changing the
-// GetListener variable at the top of this file, for example to listen on an onion service using
-// github.com/cretz/bine
+// DefaultGetListener obtains a listener for the stream-oriented local network address:
+// "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
 func DefaultGetListener(network, address string) (net.Listener, error) {
     // Add a deferral to say that we've tried to grab a listener
     defer GetManager().InformCleanup()

@@ -10,9 +10,7 @@ package graceful
 import "net"
 
 // DefaultGetListener obtains a listener for the local network address.
-// On windows this is basically just a shim around net.Listen. This function
-// can be replaced by changing the GetListener variable at the top of this file,
-// for example to listen on an onion service using github.com/cretz/bine
+// On windows this is basically just a shim around net.Listen.
 func DefaultGetListener(network, address string) (net.Listener, error) {
     // Add a deferral to say that we've tried to grab a listener
     defer GetManager().InformCleanup()

@@ -20,31 +20,11 @@ import (
     "code.gitea.io/gitea/modules/setting"
 )
 
-var (
-    // DefaultReadTimeOut default read timeout
-    DefaultReadTimeOut time.Duration
-    // DefaultWriteTimeOut default write timeout
-    DefaultWriteTimeOut time.Duration
-    // DefaultMaxHeaderBytes default max header bytes
-    DefaultMaxHeaderBytes int
-    // PerWriteWriteTimeout timeout for writes
-    PerWriteWriteTimeout = 30 * time.Second
-    // PerWriteWriteTimeoutKbTime is a timeout taking account of how much there is to be written
-    PerWriteWriteTimeoutKbTime = 10 * time.Second
-)
-
-// GetListener returns a listener from a GetListener function, which must have the
-// signature: `func FunctioName(network, address string) (net.Listener, error)`.
-// This determines the implementation of net.Listener which the server will use.`
-// It is implemented in this way so that downstreams may specify the type of listener
-// they want to provide Gitea on by default, such as with a hidden service or a p2p network
-// No need to worry about "breaking" if there would be a refactoring for the Listeners. No compatibility-guarantee for this mechanism
+// GetListener returns a net listener
+// This determines the implementation of net.Listener which the server will use,
+// so that downstreams could provide their own Listener, such as with a hidden service or a p2p network
 var GetListener = DefaultGetListener
 
-func init() {
-    DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB)
-}
-
 // ServeFunction represents a listen.Accept loop
 type ServeFunction = func(net.Listener) error
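The rewritten comment keeps the point of the variable: GetListener is a replaceable hook, so a downstream can supply its own net.Listener (for example a hidden service or p2p transport). A hedged sketch of such an override follows; the package name and wrapListener are assumptions, only GetListener and DefaultGetListener come from this diff.

// Package customlisten is a hypothetical downstream package.
package customlisten

import (
    "net"

    "code.gitea.io/gitea/modules/graceful"
)

// wrapListener stands in for whatever transport the downstream provides.
func wrapListener(l net.Listener) net.Listener { return l }

func init() {
    // Replace the hook before any server starts, so every graceful server
    // obtains its listener through the custom path instead of net.Listen.
    graceful.GetListener = func(network, address string) (net.Listener, error) {
        l, err := graceful.DefaultGetListener(network, address)
        if err != nil {
            return nil, err
        }
        return wrapListener(l), nil
    }
}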

@@ -13,9 +13,6 @@ import (
 func newHTTPServer(network, address, name string, handler http.Handler) (*Server, ServeFunction) {
     server := NewServer(network, address, name)
     httpServer := http.Server{
-        ReadTimeout:    DefaultReadTimeOut,
-        WriteTimeout:   DefaultWriteTimeOut,
-        MaxHeaderBytes: DefaultMaxHeaderBytes,
         Handler:        handler,
         BaseContext:    func(net.Listener) context.Context { return GetManager().HammerContext() },
     }
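The retained BaseContext line means every request context served by this http.Server descends from the hammer context, so handlers can watch r.Context() to stop work once the manager hammers (or the client disconnects). A small illustrative handler; slowHandler and the two-second delay are placeholders.

package example

import (
    "fmt"
    "net/http"
    "time"
)

// slowHandler is illustrative: r.Context() is canceled when the hammer
// context (the server's base context) is canceled or the client goes away.
func slowHandler(w http.ResponseWriter, r *http.Request) {
    select {
    case <-r.Context().Done():
        // Hammer time or client disconnect: abandon the response.
        return
    case <-time.After(2 * time.Second): // stand-in for real work
        fmt.Fprintln(w, "done")
    }
}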