Refactor graceful manager, fix misused WaitGroup (#29738)

Follow #29629

(cherry picked from commit d08f4360c96e130e0454b76ecef9405f2bd312a1)
This commit is contained in:
coldWater 2024-03-15 18:59:11 +08:00 committed by Earl Warren
parent 1a17c39e00
commit 94c70c7753
No known key found for this signature in database
GPG key ID: 0579CB2928A78A00
4 changed files with 55 additions and 51 deletions

View file

@ -233,7 +233,10 @@ func (g *Manager) setStateTransition(old, new state) bool {
// At the moment the total number of servers (numberOfServersToCreate) are pre-defined as a const before global init, // At the moment the total number of servers (numberOfServersToCreate) are pre-defined as a const before global init,
// so this function MUST be called if a server is not used. // so this function MUST be called if a server is not used.
func (g *Manager) InformCleanup() { func (g *Manager) InformCleanup() {
g.createServerWaitGroup.Done() g.createServerCond.L.Lock()
defer g.createServerCond.L.Unlock()
g.createdServer++
g.createServerCond.Signal()
} }
// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating

View file

@ -42,8 +42,9 @@ type Manager struct {
terminateCtxCancel context.CancelFunc terminateCtxCancel context.CancelFunc
managerCtxCancel context.CancelFunc managerCtxCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup
createServerCond sync.Cond
createdServer int
shutdownRequested chan struct{} shutdownRequested chan struct{}
toRunAtShutdown []func() toRunAtShutdown []func()
@ -52,7 +53,7 @@ type Manager struct {
func newGracefulManager(ctx context.Context) *Manager { func newGracefulManager(ctx context.Context) *Manager {
manager := &Manager{ctx: ctx, shutdownRequested: make(chan struct{})} manager := &Manager{ctx: ctx, shutdownRequested: make(chan struct{})}
manager.createServerWaitGroup.Add(numberOfServersToCreate) manager.createServerCond.L = &sync.Mutex{}
manager.prepare(ctx) manager.prepare(ctx)
manager.start() manager.start()
return manager return manager

View file

@ -57,20 +57,27 @@ func (g *Manager) start() {
// Handle clean up of unused provided listeners and delayed start-up // Handle clean up of unused provided listeners and delayed start-up
startupDone := make(chan struct{}) startupDone := make(chan struct{})
go func() { go func() {
defer close(startupDone)
// Wait till we're done getting all the listeners and then close the unused ones
func() {
// FIXME: there is a fundamental design problem of the "manager" and the "wait group".
// If nothing has started, the "Wait" just panics: sync: WaitGroup is reused before previous Wait has returned
// There is no clear solution besides a complete rewriting of the "manager"
defer func() { defer func() {
_ = recover() close(startupDone)
}() // Close the unused listeners and ignore the error here there's not much we can do with it, they're logged in the CloseProvidedListeners function
g.createServerWaitGroup.Wait()
}()
// Ignore the error here there's not much we can do with it, they're logged in the CloseProvidedListeners function
_ = CloseProvidedListeners() _ = CloseProvidedListeners()
}()
// Wait for all servers to be created
g.createServerCond.L.Lock()
for {
if g.createdServer >= numberOfServersToCreate {
g.createServerCond.L.Unlock()
g.notify(readyMsg) g.notify(readyMsg)
return
}
select {
case <-g.IsShutdown():
g.createServerCond.L.Unlock()
return
default:
}
g.createServerCond.Wait()
}
}() }()
if setting.StartupTimeout > 0 { if setting.StartupTimeout > 0 {
go func() { go func() {
@ -78,16 +85,7 @@ func (g *Manager) start() {
case <-startupDone: case <-startupDone:
return return
case <-g.IsShutdown(): case <-g.IsShutdown():
func() { g.createServerCond.Signal()
// When WaitGroup counter goes negative it will panic - we don't care about this so we can just ignore it.
defer func() {
_ = recover()
}()
// Ensure that the createServerWaitGroup stops waiting
for {
g.createServerWaitGroup.Done()
}
}()
return return
case <-time.After(setting.StartupTimeout): case <-time.After(setting.StartupTimeout):
log.Error("Startup took too long! Shutting down") log.Error("Startup took too long! Shutting down")

View file

@ -149,34 +149,36 @@ hammerLoop:
func (g *Manager) awaitServer(limit time.Duration) bool { func (g *Manager) awaitServer(limit time.Duration) bool {
c := make(chan struct{}) c := make(chan struct{})
go func() { go func() {
defer close(c) g.createServerCond.L.Lock()
func() { for {
// FIXME: there is a fundamental design problem of the "manager" and the "wait group". if g.createdServer >= numberOfServersToCreate {
// If nothing has started, the "Wait" just panics: sync: WaitGroup is reused before previous Wait has returned g.createServerCond.L.Unlock()
// There is no clear solution besides a complete rewriting of the "manager" close(c)
defer func() { return
_ = recover() }
}() select {
g.createServerWaitGroup.Wait() case <-g.IsShutdown():
}() g.createServerCond.L.Unlock()
return
default:
}
g.createServerCond.Wait()
}
}() }()
var tc <-chan time.Time
if limit > 0 { if limit > 0 {
tc = time.After(limit)
}
select { select {
case <-c: case <-c:
return true // completed normally return true // completed normally
case <-time.After(limit): case <-tc:
return false // timed out return false // timed out
case <-g.IsShutdown(): case <-g.IsShutdown():
g.createServerCond.Signal()
return false return false
} }
} else {
select {
case <-c:
return true // completed normally
case <-g.IsShutdown():
return false
}
}
} }
func (g *Manager) notify(msg systemdNotifyMsg) { func (g *Manager) notify(msg systemdNotifyMsg) {