Only attempt to flush queue if the underlying worker pool is not finished (#18593) (#18620)

* Only attempt to flush queue if the underlying worker pool is not finished (#18593)

Backport #18593

There is a possible race whereby a worker pool could be cancelled but yet the
underlying queue is not empty. This will lead to flush-all cycling because it
cannot empty the pool.

* On shutdown of Persistant Channel Queues close datachan and empty

Partial Backport #18415

Although we attempt to empty the datachan in queues - due to
races we are better off just closing the channel and forcibly emptying
it in shutdown.

Fix #18618

Signed-off-by: Andrew Thornton <art27@cantab.net>

* Move zero workers warning to debug

Fix #18617

Signed-off-by: Andrew Thornton <art27@cantab.net>

* Update modules/queue/manager.go

Co-authored-by: Gusted <williamzijl7@hotmail.com>

* Update modules/queue/manager.go

Co-authored-by: Gusted <williamzijl7@hotmail.com>

Co-authored-by: Gusted <williamzijl7@hotmail.com>
This commit is contained in:
zeripath 2022-02-06 06:55:44 +00:00 committed by GitHub
parent f65e29c077
commit 36c66303df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 15 deletions

View file

@ -72,6 +72,8 @@ type ManagedPool interface {
BoostWorkers() int BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool // SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
// Done returns a channel that will be closed when the Pool's baseCtx is closed
Done() <-chan struct{}
} }
// ManagedQueueList implements the sort.Interface // ManagedQueueList implements the sort.Interface
@ -141,7 +143,6 @@ func (m *Manager) Remove(qid int64) {
delete(m.Queues, qid) delete(m.Queues, qid)
m.mutex.Unlock() m.mutex.Unlock()
log.Trace("Queue Manager removed: QID: %d", qid) log.Trace("Queue Manager removed: QID: %d", qid)
} }
// GetManagedQueue by qid // GetManagedQueue by qid
@ -193,6 +194,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Done() wg.Done()
continue continue
} }
if pool, ok := mq.Managed.(ManagedPool); ok {
// No point into flushing pools when their base's ctx is already done.
select {
case <-pool.Done():
wg.Done()
continue
default:
}
}
allEmpty = false allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok { if flushable, ok := mq.Managed.(Flushable); ok {
log.Debug("Flushing (flushable) queue: %s", mq.Name) log.Debug("Flushing (flushable) queue: %s", mq.Name)
@ -225,7 +237,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Wait() wg.Wait()
} }
return nil return nil
} }
// ManagedQueues returns the managed queues // ManagedQueues returns the managed queues

View file

@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
q.internal.(*LevelQueue).Shutdown() q.internal.(*LevelQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelQueue).qid) GetManager().Remove(q.internal.(*LevelQueue).qid)
} }
} }
// Flush flushes the queue and blocks till the queue is empty // Flush flushes the queue and blocks till the queue is empty
@ -252,14 +251,13 @@ func (q *PersistableChannelQueue) Shutdown() {
q.channelQueue.Wait() q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait() q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel // Redirect all remaining data in the chan to the internal channel
go func() { close(q.channelQueue.dataChan)
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan { for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data) _ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1) atomic.AddInt64(&q.channelQueue.numInQueue, -1)
} }
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
} }

View file

@ -65,6 +65,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
return pool return pool
} }
// Done returns when this worker pool's base context has been cancelled
func (p *WorkerPool) Done() <-chan struct{} {
return p.baseCtx.Done()
}
// Push pushes the data to the internal channel // Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) { func (p *WorkerPool) Push(data Data) {
atomic.AddInt64(&p.numInQueue, 1) atomic.AddInt64(&p.numInQueue, 1)
@ -90,7 +95,7 @@ func (p *WorkerPool) zeroBoost() {
boost = p.maxNumberOfWorkers - p.numberOfWorkers boost = p.maxNumberOfWorkers - p.numberOfWorkers
} }
if mq != nil { if mq != nil {
log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout) log.Debug("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
start := time.Now() start := time.Now()
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
@ -98,7 +103,7 @@ func (p *WorkerPool) zeroBoost() {
mq.RemoveWorkers(pid) mq.RemoveWorkers(pid)
} }
} else { } else {
log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) log.Debug("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
} }
p.lock.Unlock() p.lock.Unlock()
p.addWorkers(ctx, cancel, boost) p.addWorkers(ctx, cancel, boost)
@ -326,7 +331,10 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
log.Trace("WorkerPool: %d Flush", p.qid) log.Trace("WorkerPool: %d Flush", p.qid)
for { for {
select { select {
case data := <-p.dataChan: case data, ok := <-p.dataChan:
if !ok {
return nil
}
p.handle(data) p.handle(data)
atomic.AddInt64(&p.numInQueue, -1) atomic.AddInt64(&p.numInQueue, -1)
case <-p.baseCtx.Done(): case <-p.baseCtx.Done():
@ -341,7 +349,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
func (p *WorkerPool) doWork(ctx context.Context) { func (p *WorkerPool) doWork(ctx context.Context) {
delay := time.Millisecond * 300 delay := time.Millisecond * 300
var data = make([]Data, 0, p.batchLength) data := make([]Data, 0, p.batchLength)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():