Merge pull request '[CI] workerqueue attempt to fix flacky test' (#2721) from oliverpool/forgejo:queue_flaky_fix into forgejo
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/2721 Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org>
This commit is contained in:
commit
f68100ceea
3 changed files with 28 additions and 12 deletions
|
@ -146,8 +146,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
|
||||||
log.Debug("Queue %q starts new worker", q.GetName())
|
log.Debug("Queue %q starts new worker", q.GetName())
|
||||||
defer log.Debug("Queue %q stops idle worker", q.GetName())
|
defer log.Debug("Queue %q stops idle worker", q.GetName())
|
||||||
|
|
||||||
atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
|
|
||||||
|
|
||||||
t := time.NewTicker(workerIdleDuration)
|
t := time.NewTicker(workerIdleDuration)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
|
|
|
@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct {
|
||||||
workerMaxNum int
|
workerMaxNum int
|
||||||
workerActiveNum int
|
workerActiveNum int
|
||||||
workerNumMu sync.Mutex
|
workerNumMu sync.Mutex
|
||||||
|
|
||||||
workerStartedCounter int32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type flushType chan struct{}
|
type flushType chan struct{}
|
||||||
|
|
|
@ -4,7 +4,9 @@
|
||||||
package queue
|
package queue
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -14,6 +16,7 @@ import (
|
||||||
"code.gitea.io/gitea/modules/test"
|
"code.gitea.io/gitea/modules/test"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
|
func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
|
||||||
|
@ -249,23 +252,40 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
|
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
|
||||||
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
|
defer test.MockVariableValue(&workerIdleDuration, 1*time.Millisecond)()
|
||||||
|
|
||||||
|
chGoroutineIDs := make(chan string)
|
||||||
handler := func(items ...int) (unhandled []int) {
|
handler := func(items ...int) (unhandled []int) {
|
||||||
time.Sleep(50 * time.Millisecond)
|
time.Sleep(10 * workerIdleDuration)
|
||||||
|
chGoroutineIDs <- goroutineID() // hacky way to identify a worker
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
|
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
|
||||||
stop := runWorkerPoolQueue(q)
|
stop := runWorkerPoolQueue(q)
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
|
const workloadSize = 12
|
||||||
|
for i := 0; i < workloadSize; i++ {
|
||||||
assert.NoError(t, q.Push(i))
|
assert.NoError(t, q.Push(i))
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
workerIDs := make(map[string]struct{})
|
||||||
assert.EqualValues(t, 2, q.GetWorkerNumber())
|
for i := 0; i < workloadSize; i++ {
|
||||||
assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
|
c := <-chGoroutineIDs
|
||||||
// when the queue never becomes empty, the existing workers should keep working
|
workerIDs[c] = struct{}{}
|
||||||
assert.LessOrEqual(t, q.workerStartedCounter, int32(4)) // counter should be 2, but sometimes it gets bigger
|
t.Logf("%d workers: overall=%d current=%d", i, len(workerIDs), q.GetWorkerNumber())
|
||||||
|
|
||||||
|
// ensure that no more than qs.MaxWorkers workers are created over the whole lifetime of the queue
|
||||||
|
// (otherwise it would mean that some workers got shut down while the queue was full)
|
||||||
|
require.LessOrEqual(t, len(workerIDs), q.GetWorkerMaxNumber())
|
||||||
|
}
|
||||||
|
close(chGoroutineIDs)
|
||||||
|
|
||||||
stop()
|
stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func goroutineID() string {
|
||||||
|
var buffer [31]byte
|
||||||
|
_ = runtime.Stack(buffer[:], false)
|
||||||
|
return string(bytes.Fields(buffer[10:])[0])
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue