This PR is another in the vein of queue improvements. It suggests an exponential backoff for bytefifo queues to reduce the load from queue polling. This will mostly be useful for redis queues. Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: zeripath <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv>
This commit is contained in:
parent
30584a6df8
commit
d644709b22
1 changed files with 59 additions and 29 deletions
|
@ -114,43 +114,73 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()
|
|||
}
|
||||
|
||||
func (q *ByteFIFOQueue) readToChan() {
|
||||
// handle quick cancels
|
||||
select {
|
||||
case <-q.closed:
|
||||
// tell the pool to shutdown.
|
||||
q.cancel()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
backOffTime := time.Millisecond * 100
|
||||
maxBackOffTime := time.Second * 3
|
||||
for {
|
||||
select {
|
||||
case <-q.closed:
|
||||
// tell the pool to shutdown.
|
||||
q.cancel()
|
||||
return
|
||||
default:
|
||||
q.lock.Lock()
|
||||
bs, err := q.byteFIFO.Pop()
|
||||
if err != nil {
|
||||
q.lock.Unlock()
|
||||
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
success, resetBackoff := q.doPop()
|
||||
if resetBackoff {
|
||||
backOffTime = 100 * time.Millisecond
|
||||
}
|
||||
|
||||
if len(bs) == 0 {
|
||||
q.lock.Unlock()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
if success {
|
||||
select {
|
||||
case <-q.closed:
|
||||
// tell the pool to shutdown.
|
||||
q.cancel()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
data, err := unmarshalAs(bs, q.exemplar)
|
||||
if err != nil {
|
||||
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
|
||||
q.lock.Unlock()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
} else {
|
||||
select {
|
||||
case <-q.closed:
|
||||
// tell the pool to shutdown.
|
||||
q.cancel()
|
||||
return
|
||||
case <-time.After(backOffTime):
|
||||
}
|
||||
backOffTime += backOffTime / 2
|
||||
if backOffTime > maxBackOffTime {
|
||||
backOffTime = maxBackOffTime
|
||||
}
|
||||
|
||||
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
|
||||
q.WorkerPool.Push(data)
|
||||
q.lock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
bs, err := q.byteFIFO.Pop()
|
||||
if err != nil {
|
||||
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
|
||||
return
|
||||
}
|
||||
if len(bs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
resetBackoff = true
|
||||
|
||||
data, err := unmarshalAs(bs, q.exemplar)
|
||||
if err != nil {
|
||||
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
|
||||
q.WorkerPool.Push(data)
|
||||
success = true
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown processing from this queue
|
||||
func (q *ByteFIFOQueue) Shutdown() {
|
||||
log.Trace("%s: %s Shutting down", q.typ, q.name)
|
||||
|
|
Reference in a new issue