Merge pull request '[v7.0/forgejo] [TEST] cancel all processes on PrepareTestEnv' (#3130) from bp-v7.0/forgejo-8ffaa08-aba99ab into v7.0/forgejo
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/3130 Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org> Reviewed-by: oliverpool <oliverpool@noreply.codeberg.org>
This commit is contained in:
commit
bc1f64e3bf
7 changed files with 53 additions and 9 deletions
|
@ -161,8 +161,7 @@ func MainTest(m *testing.M) {
|
||||||
exitStatus := m.Run()
|
exitStatus := m.Run()
|
||||||
|
|
||||||
if err := testlogger.WriterCloser.Reset(); err != nil && exitStatus == 0 {
|
if err := testlogger.WriterCloser.Reset(); err != nil && exitStatus == 0 {
|
||||||
fmt.Printf("testlogger.WriterCloser.Reset: %v\n", err)
|
fmt.Printf("testlogger.WriterCloser.Reset: error ignored: %v\n", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if err := removeAllWithRetry(setting.RepoRootPath); err != nil {
|
if err := removeAllWithRetry(setting.RepoRootPath); err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "os.RemoveAll: %v\n", err)
|
fmt.Fprintf(os.Stderr, "os.RemoveAll: %v\n", err)
|
||||||
|
|
|
@ -120,9 +120,12 @@ func Init() {
|
||||||
case "bleve", "elasticsearch":
|
case "bleve", "elasticsearch":
|
||||||
handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
|
handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
|
||||||
indexer := *globalIndexer.Load()
|
indexer := *globalIndexer.Load()
|
||||||
|
// make it a process to allow for cancellation (especially during integration tests where no global shutdown happens)
|
||||||
|
batchCtx, _, finished := process.GetManager().AddContext(ctx, "CodeIndexer batch")
|
||||||
|
defer finished()
|
||||||
for _, indexerData := range items {
|
for _, indexerData := range items {
|
||||||
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
|
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
|
||||||
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
|
if err := index(batchCtx, indexer, indexerData.RepoID); err != nil {
|
||||||
unhandled = append(unhandled, indexerData)
|
unhandled = append(unhandled, indexerData)
|
||||||
if !setting.IsInTesting {
|
if !setting.IsInTesting {
|
||||||
log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
|
log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"code.gitea.io/gitea/modules/graceful"
|
"code.gitea.io/gitea/modules/graceful"
|
||||||
"code.gitea.io/gitea/modules/json"
|
"code.gitea.io/gitea/modules/json"
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
"code.gitea.io/gitea/modules/process"
|
||||||
repo_module "code.gitea.io/gitea/modules/repository"
|
repo_module "code.gitea.io/gitea/modules/repository"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
"code.gitea.io/gitea/modules/sync"
|
"code.gitea.io/gitea/modules/sync"
|
||||||
|
@ -296,8 +297,12 @@ func checkForInvalidation(ctx context.Context, requests issues_model.PullRequest
|
||||||
// AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch,
|
// AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch,
|
||||||
// and generate new patch for testing as needed.
|
// and generate new patch for testing as needed.
|
||||||
func AddTestPullRequestTask(ctx context.Context, doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string, timeNano int64) {
|
func AddTestPullRequestTask(ctx context.Context, doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string, timeNano int64) {
|
||||||
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: only pull requests created before nano time %d will be considered", repoID, branch, timeNano)
|
description := fmt.Sprintf("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: only pull requests created before nano time %d will be considered", repoID, branch, timeNano)
|
||||||
go graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
|
log.Trace(description)
|
||||||
|
go graceful.GetManager().RunWithShutdownContext(func(shutdownCtx context.Context) {
|
||||||
|
// make it a process to allow for cancellation (especially during integration tests where no global shutdown happens)
|
||||||
|
ctx, _, finished := process.GetManager().AddContext(shutdownCtx, description)
|
||||||
|
defer finished()
|
||||||
// There is no sensible way to shut this down ":-("
|
// There is no sensible way to shut this down ":-("
|
||||||
// If you don't let it run all the way then you will lose data
|
// If you don't let it run all the way then you will lose data
|
||||||
// TODO: graceful: TestPullRequest needs to become a queue!
|
// TODO: graceful: TestPullRequest needs to become a queue!
|
||||||
|
|
|
@ -60,8 +60,7 @@ func TestMain(m *testing.M) {
|
||||||
exitVal := m.Run()
|
exitVal := m.Run()
|
||||||
|
|
||||||
if err := testlogger.WriterCloser.Reset(); err != nil {
|
if err := testlogger.WriterCloser.Reset(); err != nil {
|
||||||
fmt.Printf("testlogger.WriterCloser.Reset: %v\n", err)
|
fmt.Printf("testlogger.WriterCloser.Reset: error ignored: %v\n", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if err = util.RemoveAll(setting.Indexer.IssuePath); err != nil {
|
if err = util.RemoveAll(setting.Indexer.IssuePath); err != nil {
|
||||||
fmt.Printf("util.RemoveAll: %v\n", err)
|
fmt.Printf("util.RemoveAll: %v\n", err)
|
||||||
|
|
|
@ -1076,6 +1076,7 @@ func TestDataAsync_Issue29101(t *testing.T) {
|
||||||
|
|
||||||
gitRepo, err := gitrepo.OpenRepository(db.DefaultContext, repo)
|
gitRepo, err := gitrepo.OpenRepository(db.DefaultContext, repo)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
defer gitRepo.Close()
|
||||||
|
|
||||||
commit, err := gitRepo.GetCommit(sha)
|
commit, err := gitRepo.GetCommit(sha)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
|
@ -184,8 +184,7 @@ func TestMain(m *testing.M) {
|
||||||
exitCode := m.Run()
|
exitCode := m.Run()
|
||||||
|
|
||||||
if err := testlogger.WriterCloser.Reset(); err != nil {
|
if err := testlogger.WriterCloser.Reset(); err != nil {
|
||||||
fmt.Printf("testlogger.WriterCloser.Reset: %v\n", err)
|
fmt.Printf("testlogger.WriterCloser.Reset: error ignored: %v\n", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = util.RemoveAll(setting.Indexer.IssuePath); err != nil {
|
if err = util.RemoveAll(setting.Indexer.IssuePath); err != nil {
|
||||||
|
|
|
@ -11,7 +11,9 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"code.gitea.io/gitea/models/db"
|
"code.gitea.io/gitea/models/db"
|
||||||
packages_model "code.gitea.io/gitea/models/packages"
|
packages_model "code.gitea.io/gitea/models/packages"
|
||||||
|
@ -20,6 +22,7 @@ import (
|
||||||
"code.gitea.io/gitea/modules/git"
|
"code.gitea.io/gitea/modules/git"
|
||||||
"code.gitea.io/gitea/modules/graceful"
|
"code.gitea.io/gitea/modules/graceful"
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
"code.gitea.io/gitea/modules/process"
|
||||||
repo_module "code.gitea.io/gitea/modules/repository"
|
repo_module "code.gitea.io/gitea/modules/repository"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
"code.gitea.io/gitea/modules/storage"
|
"code.gitea.io/gitea/modules/storage"
|
||||||
|
@ -193,6 +196,36 @@ func PrepareAttachmentsStorage(t testing.TB) {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancelProcesses cancels all processes of the [process.Manager].
|
||||||
|
// Returns immediately if delay is 0, otherwise wait until all processes are done
|
||||||
|
// and fails the test if it takes longer that the given delay.
|
||||||
|
func cancelProcesses(t testing.TB, delay time.Duration) {
|
||||||
|
processManager := process.GetManager()
|
||||||
|
processes, _ := processManager.Processes(true, true)
|
||||||
|
for _, p := range processes {
|
||||||
|
processManager.Cancel(p.PID)
|
||||||
|
t.Logf("PrepareTestEnv:Process %q cancelled", p.Description)
|
||||||
|
}
|
||||||
|
if delay == 0 || len(processes) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
processes, _ = processManager.Processes(true, true)
|
||||||
|
for len(processes) > 0 {
|
||||||
|
if time.Since(start) > delay {
|
||||||
|
t.Errorf("ERROR PrepareTestEnv: could not cancel all processes within %s", delay)
|
||||||
|
for _, p := range processes {
|
||||||
|
t.Logf("PrepareTestEnv:Remaining Process: %q", p.Description)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
runtime.Gosched() // let the context cancellation propagate
|
||||||
|
processes, _ = processManager.Processes(true, true)
|
||||||
|
}
|
||||||
|
t.Logf("PrepareTestEnv: all processes cancelled within %s", time.Since(start))
|
||||||
|
}
|
||||||
|
|
||||||
func PrepareTestEnv(t testing.TB, skip ...int) func() {
|
func PrepareTestEnv(t testing.TB, skip ...int) func() {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
ourSkip := 1
|
ourSkip := 1
|
||||||
|
@ -201,6 +234,11 @@ func PrepareTestEnv(t testing.TB, skip ...int) func() {
|
||||||
}
|
}
|
||||||
deferFn := PrintCurrentTest(t, ourSkip)
|
deferFn := PrintCurrentTest(t, ourSkip)
|
||||||
|
|
||||||
|
// kill all background processes to prevent them from interfering with the fixture loading
|
||||||
|
// see https://codeberg.org/forgejo/forgejo/issues/2962
|
||||||
|
cancelProcesses(t, 30*time.Second)
|
||||||
|
t.Cleanup(func() { cancelProcesses(t, 0) }) // cancel remaining processes in a non-blocking way
|
||||||
|
|
||||||
// load database fixtures
|
// load database fixtures
|
||||||
assert.NoError(t, unittest.LoadFixtures())
|
assert.NoError(t, unittest.LoadFixtures())
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue