From e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c Mon Sep 17 00:00:00 2001 From: zeripath Date: Sun, 15 Dec 2019 09:51:28 +0000 Subject: [PATCH] Graceful: Xorm, RepoIndexer, Cron and Others (#9282) * Change graceful to use a singleton obtained through GetManager instead of a global. * Graceful: Make TestPullRequests shutdownable * Graceful: Make the cron tasks graceful * Graceful: AddTestPullRequest run in graceful ctx * Graceful: SyncMirrors shutdown * Graceful: SetDefaultContext for Xorm to be HammerContext * Avoid starting graceful for migrate commands and checkout * Graceful: DeliverHooks now can be shutdown * Fix multiple syncing errors in modules/sync/UniqueQueue & Make UniqueQueue closable * Begin the process of making the repo indexer shutdown gracefully --- cmd/migrate.go | 4 +- cmd/web.go | 10 +- cmd/web_graceful.go | 4 +- contrib/pr/checkout.go | 3 +- integrations/auth_ldap_test.go | 6 +- integrations/integration_test.go | 8 +- integrations/migration-test/migration_test.go | 3 +- models/branches.go | 4 +- models/models.go | 5 +- models/pull_list.go | 9 +- models/repo.go | 64 +++++++- models/repo_indexer.go | 43 +++++- models/user.go | 35 ++++- modules/cron/cron.go | 9 +- modules/git/git.go | 3 +- modules/git/git_test.go | 3 +- modules/graceful/context.go | 6 +- modules/graceful/manager.go | 138 ++++++++++-------- modules/graceful/manager_unix.go | 45 ++++-- modules/graceful/manager_windows.go | 31 ++-- modules/graceful/net_unix.go | 2 +- modules/graceful/restart_unix.go | 2 +- modules/graceful/server.go | 6 +- modules/graceful/server_hooks.go | 6 +- modules/indexer/code/bleve.go | 82 ++++++----- modules/indexer/code/repo.go | 35 ++++- modules/indexer/issues/indexer.go | 2 +- modules/migrations/update.go | 26 +++- modules/ssh/ssh_graceful.go | 2 +- modules/sync/unique_queue.go | 56 +++++-- modules/webhook/deliver.go | 63 +++++--- routers/admin/admin.go | 5 +- routers/init.go | 11 +- routers/install.go | 3 +- services/mirror/mirror.go | 32 ++-- services/pull/check.go | 86 +++++------ services/pull/pull.go | 63 ++++---- 37 files changed, 628 insertions(+), 287 deletions(-) diff --git a/cmd/migrate.go b/cmd/migrate.go index 1fa1d09e25..2428925887 100644 --- a/cmd/migrate.go +++ b/cmd/migrate.go @@ -5,6 +5,8 @@ package cmd import ( + "context" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/models/migrations" "code.gitea.io/gitea/modules/log" @@ -32,7 +34,7 @@ func runMigrate(ctx *cli.Context) error { log.Trace("Log path: %s", setting.LogRootPath) setting.InitDBConfig() - if err := models.NewEngine(migrations.Migrate); err != nil { + if err := models.NewEngine(context.Background(), migrations.Migrate); err != nil { log.Fatal("Failed to initialize ORM engine: %v", err) return err } diff --git a/cmd/web.go b/cmd/web.go index cc00a32198..243b9a4108 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -5,6 +5,7 @@ package cmd import ( + "context" "fmt" "net/http" _ "net/http/pprof" // Used for debugging if enabled and a web server is running @@ -96,6 +97,10 @@ func runLetsEncryptFallbackHandler(w http.ResponseWriter, r *http.Request) { } func runWeb(ctx *cli.Context) error { + managerCtx, cancel := context.WithCancel(context.Background()) + graceful.InitManager(managerCtx) + defer cancel() + if os.Getppid() > 1 && len(os.Getenv("LISTEN_FDS")) > 0 { log.Info("Restarting Gitea on PID: %d from parent PID: %d", os.Getpid(), os.Getppid()) } else { @@ -108,7 +113,7 @@ func runWeb(ctx *cli.Context) error { } // Perform global initialization - routers.GlobalInit() + routers.GlobalInit(graceful.GetManager().HammerContext()) // Set up Macaron m := routes.NewMacaron() @@ -199,8 +204,7 @@ func runWeb(ctx *cli.Context) error { log.Critical("Failed to start server: %v", err) } log.Info("HTTP Listener: %s Closed", listenAddr) - graceful.Manager.WaitForServers() - graceful.Manager.WaitForTerminate() + <-graceful.GetManager().Done() log.Info("PID: %d Gitea Web Finished", os.Getpid()) log.Close() return nil diff --git a/cmd/web_graceful.go b/cmd/web_graceful.go index 5f8b85b390..f3c41766af 100644 --- a/cmd/web_graceful.go +++ b/cmd/web_graceful.go @@ -28,13 +28,13 @@ func runHTTPSWithTLSConfig(network, listenAddr string, tlsConfig *tls.Config, m // NoHTTPRedirector tells our cleanup routine that we will not be using a fallback http redirector func NoHTTPRedirector() { - graceful.Manager.InformCleanup() + graceful.GetManager().InformCleanup() } // NoMainListener tells our cleanup routine that we will not be using a possibly provided listener // for our main HTTP/HTTPS service func NoMainListener() { - graceful.Manager.InformCleanup() + graceful.GetManager().InformCleanup() } func runFCGI(network, listenAddr string, m http.Handler) error { diff --git a/contrib/pr/checkout.go b/contrib/pr/checkout.go index 9c06357295..34cd82ff0a 100644 --- a/contrib/pr/checkout.go +++ b/contrib/pr/checkout.go @@ -5,6 +5,7 @@ Checkout a PR and load the tests data into sqlite database */ import ( + "context" "flag" "fmt" "io/ioutil" @@ -92,7 +93,7 @@ func runPR() { //x, err = xorm.NewEngine("sqlite3", "file::memory:?cache=shared") var helper testfixtures.Helper = &testfixtures.SQLite{} - models.NewEngine(func(_ *xorm.Engine) error { + models.NewEngine(context.Background(), func(_ *xorm.Engine) error { return nil }) models.HasEngine = true diff --git a/integrations/auth_ldap_test.go b/integrations/auth_ldap_test.go index 5cb2bad57d..80286c09e6 100644 --- a/integrations/auth_ldap_test.go +++ b/integrations/auth_ldap_test.go @@ -5,6 +5,7 @@ package integrations import ( + "context" "net/http" "os" "strings" @@ -147,7 +148,7 @@ func TestLDAPUserSync(t *testing.T) { } defer prepareTestEnv(t)() addAuthSourceLDAP(t, "") - models.SyncExternalUsers() + models.SyncExternalUsers(context.Background()) session := loginUser(t, "user1") // Check if users exists @@ -206,7 +207,8 @@ func TestLDAPUserSSHKeySync(t *testing.T) { } defer prepareTestEnv(t)() addAuthSourceLDAP(t, "sshPublicKey") - models.SyncExternalUsers() + + models.SyncExternalUsers(context.Background()) // Check if users has SSH keys synced for _, u := range gitLDAPUsers { diff --git a/integrations/integration_test.go b/integrations/integration_test.go index 5da9e04c78..bf363f3b4d 100644 --- a/integrations/integration_test.go +++ b/integrations/integration_test.go @@ -6,6 +6,7 @@ package integrations import ( "bytes" + "context" "database/sql" "encoding/json" "fmt" @@ -24,6 +25,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/routers" "code.gitea.io/gitea/routers/routes" @@ -55,6 +57,10 @@ func NewNilResponseRecorder() *NilResponseRecorder { } func TestMain(m *testing.M) { + managerCtx, cancel := context.WithCancel(context.Background()) + graceful.InitManager(managerCtx) + defer cancel() + initIntegrationTest() mac = routes.NewMacaron() routes.RegisterRoutes(mac) @@ -171,7 +177,7 @@ func initIntegrationTest() { } defer db.Close() } - routers.GlobalInit() + routers.GlobalInit(graceful.GetManager().HammerContext()) } func prepareTestEnv(t testing.TB, skip ...int) func() { diff --git a/integrations/migration-test/migration_test.go b/integrations/migration-test/migration_test.go index 4fe36dc021..c274d482da 100644 --- a/integrations/migration-test/migration_test.go +++ b/integrations/migration-test/migration_test.go @@ -6,6 +6,7 @@ package migrations import ( "compress/gzip" + "context" "database/sql" "fmt" "io/ioutil" @@ -220,7 +221,7 @@ func doMigrationTest(t *testing.T, version string) { err := models.SetEngine() assert.NoError(t, err) - err = models.NewEngine(wrappedMigrate) + err = models.NewEngine(context.Background(), wrappedMigrate) assert.NoError(t, err) currentEngine.Close() } diff --git a/models/branches.go b/models/branches.go index bb99cffa05..045019314a 100644 --- a/models/branches.go +++ b/models/branches.go @@ -5,6 +5,7 @@ package models import ( + "context" "fmt" "time" @@ -525,7 +526,8 @@ func (deletedBranch *DeletedBranch) LoadUser() { } // RemoveOldDeletedBranches removes old deleted branches -func RemoveOldDeletedBranches() { +func RemoveOldDeletedBranches(ctx context.Context) { + // Nothing to do for shutdown or terminate log.Trace("Doing: DeletedBranchesCleanup") deleteBefore := time.Now().Add(-setting.Cron.DeletedBranchesCleanup.OlderThan) diff --git a/models/models.go b/models/models.go index 8c10e7abfc..9eb174e200 100644 --- a/models/models.go +++ b/models/models.go @@ -6,6 +6,7 @@ package models import ( + "context" "database/sql" "errors" "fmt" @@ -164,11 +165,13 @@ func SetEngine() (err error) { } // NewEngine initializes a new xorm.Engine -func NewEngine(migrateFunc func(*xorm.Engine) error) (err error) { +func NewEngine(ctx context.Context, migrateFunc func(*xorm.Engine) error) (err error) { if err = SetEngine(); err != nil { return err } + x.SetDefaultContext(ctx) + if err = x.Ping(); err != nil { return err } diff --git a/models/pull_list.go b/models/pull_list.go index 49d04ba0b8..1376978353 100644 --- a/models/pull_list.go +++ b/models/pull_list.go @@ -68,11 +68,12 @@ func GetUnmergedPullRequestsByBaseInfo(repoID int64, branch string) ([]*PullRequ Find(&prs) } -// GetPullRequestsByCheckStatus returns all pull requests according the special checking status. -func GetPullRequestsByCheckStatus(status PullRequestStatus) ([]*PullRequest, error) { - prs := make([]*PullRequest, 0, 10) - return prs, x. +// GetPullRequestIDsByCheckStatus returns all pull requests according the special checking status. +func GetPullRequestIDsByCheckStatus(status PullRequestStatus) ([]int64, error) { + prs := make([]int64, 0, 10) + return prs, x.Table("pull_request"). Where("status=?", status). + Cols("pull_request.id"). Find(&prs) } diff --git a/models/repo.go b/models/repo.go index f4ac75b8f0..c7eee3c1ec 100644 --- a/models/repo.go +++ b/models/repo.go @@ -7,6 +7,7 @@ package models import ( "bytes" + "context" "crypto/md5" "errors" "fmt" @@ -2098,19 +2099,27 @@ func DeleteRepositoryArchives() error { } // DeleteOldRepositoryArchives deletes old repository archives. -func DeleteOldRepositoryArchives() { +func DeleteOldRepositoryArchives(ctx context.Context) { log.Trace("Doing: ArchiveCleanup") - if err := x.Where("id > 0").Iterate(new(Repository), deleteOldRepositoryArchives); err != nil { + if err := x.Where("id > 0").Iterate(new(Repository), func(idx int, bean interface{}) error { + return deleteOldRepositoryArchives(ctx, idx, bean) + }); err != nil { log.Error("ArchiveClean: %v", err) } } -func deleteOldRepositoryArchives(idx int, bean interface{}) error { +func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{}) error { repo := bean.(*Repository) basePath := filepath.Join(repo.RepoPath(), "archives") for _, ty := range []string{"zip", "targz"} { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s", repo, ty) + default: + } + path := filepath.Join(basePath, ty) file, err := os.Open(path) if err != nil { @@ -2133,6 +2142,11 @@ func deleteOldRepositoryArchives(idx int, bean interface{}) error { minimumOldestTime := time.Now().Add(-setting.Cron.ArchiveCleanup.OlderThan) for _, info := range files { if info.ModTime().Before(minimumOldestTime) && !info.IsDir() { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s - %s", repo, ty, info.Name()) + default: + } toDelete := filepath.Join(path, info.Name()) // This is a best-effort purge, so we do not check error codes to confirm removal. if err = os.Remove(toDelete); err != nil { @@ -2226,13 +2240,17 @@ func SyncRepositoryHooks() error { } // GitFsck calls 'git fsck' to check repository health. -func GitFsck() { +func GitFsck(ctx context.Context) { log.Trace("Doing: GitFsck") - if err := x. Where("id>0 AND is_fsck_enabled=?", true).BufferSize(setting.Database.IterateBufferSize). Iterate(new(Repository), func(idx int, bean interface{}) error { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown") + default: + } repo := bean.(*Repository) repoPath := repo.RepoPath() log.Trace("Running health check on repository %s", repoPath) @@ -2278,13 +2296,19 @@ type repoChecker struct { desc string } -func repoStatsCheck(checker *repoChecker) { +func repoStatsCheck(ctx context.Context, checker *repoChecker) { results, err := x.Query(checker.querySQL) if err != nil { log.Error("Select %s: %v", checker.desc, err) return } for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", checker.desc, id) _, err = x.Exec(checker.correctSQL, id, id) @@ -2295,7 +2319,7 @@ func repoStatsCheck(checker *repoChecker) { } // CheckRepoStats checks the repository stats -func CheckRepoStats() { +func CheckRepoStats(ctx context.Context) { log.Trace("Doing: CheckRepoStats") checkers := []*repoChecker{ @@ -2331,7 +2355,13 @@ func CheckRepoStats() { }, } for i := range checkers { - repoStatsCheck(checkers[i]) + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + repoStatsCheck(ctx, checkers[i]) + } } // ***** START: Repository.NumClosedIssues ***** @@ -2341,6 +2371,12 @@ func CheckRepoStats() { log.Error("Select %s: %v", desc, err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", desc, id) _, err = x.Exec("UPDATE `repository` SET num_closed_issues=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, false, id) @@ -2358,6 +2394,12 @@ func CheckRepoStats() { log.Error("Select %s: %v", desc, err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", desc, id) _, err = x.Exec("UPDATE `repository` SET num_closed_pulls=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, true, id) @@ -2375,6 +2417,12 @@ func CheckRepoStats() { log.Error("Select repository count 'num_forks': %v", err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating repository count 'num_forks': %d", id) diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 138ef54d33..aee3c74b35 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -4,6 +4,12 @@ package models +import ( + "fmt" + + "xorm.io/builder" +) + // RepoIndexerStatus status of a repo's entry in the repo indexer // For now, implicitly refers to default branch type RepoIndexerStatus struct { @@ -12,6 +18,31 @@ type RepoIndexerStatus struct { CommitSha string `xorm:"VARCHAR(40)"` } +// GetUnindexedRepos returns repos which do not have an indexer status +func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) { + ids := make([]int64, 0, 50) + cond := builder.Cond(builder.IsNull{ + "repo_indexer_status.id", + }) + sess := x.Table("repository").Join("LEFT OUTER", "repo_indexer_status", "repository.id = repo_indexer_status.repo_id") + if maxRepoID > 0 { + cond = builder.And(cond, builder.Lte{ + "repository.id": maxRepoID, + }) + } + if page >= 0 && pageSize > 0 { + start := 0 + if page > 0 { + start = (page - 1) * pageSize + } + sess.Limit(pageSize, start) + } + + sess.Where(cond).Cols("repository.id").Desc("repository.id") + err := sess.Find(&ids) + return ids, err +} + // GetIndexerStatus loads repo codes indxer status func (repo *Repository) GetIndexerStatus() error { if repo.IndexerStatus != nil { @@ -31,15 +62,21 @@ func (repo *Repository) GetIndexerStatus() error { // UpdateIndexerStatus updates indexer status func (repo *Repository) UpdateIndexerStatus(sha string) error { if err := repo.GetIndexerStatus(); err != nil { - return err + return fmt.Errorf("UpdateIndexerStatus: Unable to getIndexerStatus for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } if len(repo.IndexerStatus.CommitSha) == 0 { repo.IndexerStatus.CommitSha = sha _, err := x.Insert(repo.IndexerStatus) - return err + if err != nil { + return fmt.Errorf("UpdateIndexerStatus: Unable to insert repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) + } + return nil } repo.IndexerStatus.CommitSha = sha _, err := x.ID(repo.IndexerStatus.ID).Cols("commit_sha"). Update(repo.IndexerStatus) - return err + if err != nil { + return fmt.Errorf("UpdateIndexerStatus: Unable to update repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) + } + return nil } diff --git a/models/user.go b/models/user.go index 2cef2e5dec..0454158de6 100644 --- a/models/user.go +++ b/models/user.go @@ -7,6 +7,7 @@ package models import ( "container/list" + "context" "crypto/md5" "crypto/sha256" "crypto/subtle" @@ -1695,7 +1696,7 @@ func synchronizeLdapSSHPublicKeys(usr *User, s *LoginSource, sshPublicKeys []str } // SyncExternalUsers is used to synchronize users with external authorization source -func SyncExternalUsers() { +func SyncExternalUsers(ctx context.Context) { log.Trace("Doing: SyncExternalUsers") ls, err := LoginSources() @@ -1710,6 +1711,12 @@ func SyncExternalUsers() { if !s.IsActived || !s.IsSyncEnabled { continue } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name) + return + default: + } if s.IsLDAP() { log.Trace("Doing: SyncExternalUsers[%s]", s.Name) @@ -1727,6 +1734,12 @@ func SyncExternalUsers() { log.Error("SyncExternalUsers: %v", err) return } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name) + return + default: + } sr, err := s.LDAP().SearchEntries() if err != nil { @@ -1735,6 +1748,19 @@ func SyncExternalUsers() { } for _, su := range sr { + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before completed update of users", s.Name) + // Rewrite authorized_keys file if LDAP Public SSH Key attribute is set and any key was added or removed + if sshKeysNeedUpdate { + err = RewriteAllPublicKeys() + if err != nil { + log.Error("RewriteAllPublicKeys: %v", err) + } + } + return + default: + } if len(su.Username) == 0 { continue } @@ -1819,6 +1845,13 @@ func SyncExternalUsers() { } } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before delete users", s.Name) + return + default: + } + // Deactivate users not present in LDAP if updateExisting { for _, usr := range users { diff --git a/modules/cron/cron.go b/modules/cron/cron.go index 795fafb51f..f4511a8e79 100644 --- a/modules/cron/cron.go +++ b/modules/cron/cron.go @@ -6,9 +6,11 @@ package cron import ( + "context" "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations" "code.gitea.io/gitea/modules/setting" @@ -37,17 +39,19 @@ var taskStatusTable = sync.NewStatusTable() type Func func() // WithUnique wrap a cron func with an unique running check -func WithUnique(name string, body Func) Func { +func WithUnique(name string, body func(context.Context)) Func { return func() { if !taskStatusTable.StartIfNotRunning(name) { return } defer taskStatusTable.Stop(name) - body() + graceful.GetManager().RunWithShutdownContext(body) } } // NewContext begins cron tasks +// Each cron task is run within the shutdown context as a running server +// AtShutdown the cron server is stopped func NewContext() { var ( entry *cron.Entry @@ -129,6 +133,7 @@ func NewContext() { go WithUnique(updateMigrationPosterID, migrations.UpdateMigrationPosterID)() c.Start() + graceful.GetManager().RunAtShutdown(context.Background(), c.Stop) } // ListTasks returns all running cron tasks. diff --git a/modules/git/git.go b/modules/git/git.go index 286e1ad8b4..d5caaa0912 100644 --- a/modules/git/git.go +++ b/modules/git/git.go @@ -106,7 +106,8 @@ func SetExecutablePath(path string) error { } // Init initializes git module -func Init() error { +func Init(ctx context.Context) error { + DefaultContext = ctx // Git requires setting user.name and user.email in order to commit changes. for configKey, defaultValue := range map[string]string{"user.name": "Gitea", "user.email": "gitea@fake.local"} { if stdout, stderr, err := process.GetManager().Exec("git.Init(get setting)", GitExecutable, "config", "--get", configKey); err != nil || strings.TrimSpace(stdout) == "" { diff --git a/modules/git/git_test.go b/modules/git/git_test.go index 0c6259a9c5..27951d639b 100644 --- a/modules/git/git_test.go +++ b/modules/git/git_test.go @@ -5,6 +5,7 @@ package git import ( + "context" "fmt" "os" "testing" @@ -16,7 +17,7 @@ func fatalTestError(fmtStr string, args ...interface{}) { } func TestMain(m *testing.M) { - if err := Init(); err != nil { + if err := Init(context.Background()); err != nil { fatalTestError("Init failed: %v", err) } diff --git a/modules/graceful/context.go b/modules/graceful/context.go index a4a4df7dea..1ad1109b4e 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -62,7 +62,7 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // ShutdownContext returns a context.Context that is Done at shutdown // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) ShutdownContext() context.Context { +func (g *Manager) ShutdownContext() context.Context { return &ChannelContext{ done: g.IsShutdown(), err: ErrShutdown, @@ -72,7 +72,7 @@ func (g *gracefulManager) ShutdownContext() context.Context { // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) HammerContext() context.Context { +func (g *Manager) HammerContext() context.Context { return &ChannelContext{ done: g.IsHammer(), err: ErrHammer, @@ -82,7 +82,7 @@ func (g *gracefulManager) HammerContext() context.Context { // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. -func (g *gracefulManager) TerminateContext() context.Context { +func (g *Manager) TerminateContext() context.Context { return &ChannelContext{ done: g.IsTerminate(), err: ErrTerminate, diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index b9a56ca9c6..eec675e297 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -6,9 +6,9 @@ package graceful import ( "context" + "sync" "time" - "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" @@ -34,14 +34,24 @@ const ( const numberOfServersToCreate = 3 // Manager represents the graceful server manager interface -var Manager *gracefulManager +var manager *Manager -func init() { - Manager = newGracefulManager(context.Background()) - // Set the git default context to the HammerContext - git.DefaultContext = Manager.HammerContext() - // Set the process default context to the HammerContext - process.DefaultContext = Manager.HammerContext() +var initOnce = sync.Once{} + +// GetManager returns the Manager +func GetManager() *Manager { + InitManager(context.Background()) + return manager +} + +// InitManager creates the graceful manager in the provided context +func InitManager(ctx context.Context) { + initOnce.Do(func() { + manager = newGracefulManager(ctx) + + // Set the process default context to the HammerContext + process.DefaultContext = manager.HammerContext() + }) } // CallbackWithContext is combined runnable and context to watch to see if the caller has finished @@ -61,7 +71,7 @@ type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. -func (g *gracefulManager) RunWithShutdownFns(run RunnableWithShutdownFns) { +func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(func(ctx context.Context, atShutdown func()) { @@ -90,7 +100,7 @@ type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate Callb // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { +func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { @@ -101,14 +111,14 @@ func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { // RunWithShutdownContext takes a function that has a context to watch for shutdown. // After the provided context is Done(), the main function must return once shutdown is complete. // (Optionally the HammerContext may be obtained and waited for however, this should be avoided if possible.) -func (g *gracefulManager) RunWithShutdownContext(run func(context.Context)) { +func (g *Manager) RunWithShutdownContext(run func(context.Context)) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.ShutdownContext()) } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { g.terminateWaitGroup.Add(1) go func() { select { @@ -121,7 +131,7 @@ func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) } // RunAtShutdown creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { +func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { go func() { select { case <-g.IsShutdown(): @@ -132,7 +142,7 @@ func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { +func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { go func() { select { case <-g.IsHammer(): @@ -141,7 +151,7 @@ func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { } }() } -func (g *gracefulManager) doShutdown() { +func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } @@ -158,48 +168,47 @@ func (g *gracefulManager) doShutdown() { g.doHammerTime(0) <-time.After(1 * time.Second) g.doTerminate() + g.WaitForTerminate() + g.lock.Lock() + close(g.done) + g.lock.Unlock() }() } -func (g *gracefulManager) doHammerTime(d time.Duration) { +func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) + g.lock.Lock() select { case <-g.hammer: default: log.Warn("Setting Hammer condition") close(g.hammer) } - + g.lock.Unlock() } -func (g *gracefulManager) doTerminate() { +func (g *Manager) doTerminate() { if !g.setStateTransition(stateShuttingDown, stateTerminate) { return } g.lock.Lock() - close(g.terminate) + select { + case <-g.terminate: + default: + log.Warn("Terminating") + close(g.terminate) + } g.lock.Unlock() } // IsChild returns if the current process is a child of previous Gitea process -func (g *gracefulManager) IsChild() bool { +func (g *Manager) IsChild() bool { return g.isChild } // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate -func (g *gracefulManager) IsShutdown() <-chan struct{} { - g.lock.RLock() - if g.shutdown == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.shutdown == nil { - g.shutdown = make(chan struct{}) - } - defer g.lock.Unlock() - return g.shutdown - } - defer g.lock.RUnlock() +func (g *Manager) IsShutdown() <-chan struct{} { return g.shutdown } @@ -207,65 +216,43 @@ func (g *gracefulManager) IsShutdown() <-chan struct{} { // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // Servers running within the running server wait group should respond to IsHammer // if not shutdown already -func (g *gracefulManager) IsHammer() <-chan struct{} { - g.lock.RLock() - if g.hammer == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.hammer == nil { - g.hammer = make(chan struct{}) - } - defer g.lock.Unlock() - return g.hammer - } - defer g.lock.RUnlock() +func (g *Manager) IsHammer() <-chan struct{} { return g.hammer } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped -func (g *gracefulManager) IsTerminate() <-chan struct{} { - g.lock.RLock() - if g.terminate == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.terminate == nil { - g.terminate = make(chan struct{}) - } - defer g.lock.Unlock() - return g.terminate - } - defer g.lock.RUnlock() +func (g *Manager) IsTerminate() <-chan struct{} { return g.terminate } // ServerDone declares a running server done and subtracts one from the // running server wait group. Users probably do not want to call this // and should use one of the RunWithShutdown* functions -func (g *gracefulManager) ServerDone() { +func (g *Manager) ServerDone() { g.runningServerWaitGroup.Done() } // WaitForServers waits for all running servers to finish. Users should probably // instead use AtTerminate or IsTerminate -func (g *gracefulManager) WaitForServers() { +func (g *Manager) WaitForServers() { g.runningServerWaitGroup.Wait() } // WaitForTerminate waits for all terminating actions to finish. // Only the main go-routine should use this -func (g *gracefulManager) WaitForTerminate() { +func (g *Manager) WaitForTerminate() { g.terminateWaitGroup.Wait() } -func (g *gracefulManager) getState() state { +func (g *Manager) getState() state { g.lock.RLock() defer g.lock.RUnlock() return g.state } -func (g *gracefulManager) setStateTransition(old, new state) bool { +func (g *Manager) setStateTransition(old, new state) bool { if old != g.getState() { return false } @@ -279,7 +266,7 @@ func (g *gracefulManager) setStateTransition(old, new state) bool { return true } -func (g *gracefulManager) setState(st state) { +func (g *Manager) setState(st state) { g.lock.Lock() defer g.lock.Unlock() @@ -288,6 +275,31 @@ func (g *gracefulManager) setState(st state) { // InformCleanup tells the cleanup wait group that we have either taken a listener // or will not be taking a listener -func (g *gracefulManager) InformCleanup() { +func (g *Manager) InformCleanup() { g.createServerWaitGroup.Done() } + +// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating +func (g *Manager) Done() <-chan struct{} { + return g.done +} + +// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +func (g *Manager) Err() error { + select { + case <-g.Done(): + return ErrTerminate + default: + return nil + } +} + +// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +func (g *Manager) Value(key interface{}) interface{} { + return nil +} + +// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context +func (g *Manager) Deadline() (deadline time.Time, ok bool) { + return +} diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 1ffc59f0df..323c6a4111 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -19,7 +19,8 @@ import ( "code.gitea.io/gitea/modules/setting" ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { isChild bool forked bool lock *sync.RWMutex @@ -27,27 +28,37 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1, lock: &sync.RWMutex{}, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run(ctx) + manager.start(ctx) return manager } -func (g *gracefulManager) Run(ctx context.Context) { +func (g *Manager) start(ctx context.Context) { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state & handle signals g.setState(stateRunning) go g.handleSignals(ctx) - c := make(chan struct{}) + + // Handle clean up of unused provided listeners and delayed start-up + startupDone := make(chan struct{}) go func() { - defer close(c) + defer close(startupDone) // Wait till we're done getting all of the listeners and then close // the unused ones g.createServerWaitGroup.Wait() @@ -58,9 +69,19 @@ func (g *gracefulManager) Run(ctx context.Context) { if setting.StartupTimeout > 0 { go func() { select { - case <-c: + case <-startupDone: return case <-g.IsShutdown(): + func() { + // When waitgroup counter goes negative it will panic - we don't care about this so we can just ignore it. + defer func() { + _ = recover() + }() + // Ensure that the createServerWaitGroup stops waiting + for { + g.createServerWaitGroup.Done() + } + }() return case <-time.After(setting.StartupTimeout): log.Error("Startup took too long! Shutting down") @@ -70,7 +91,7 @@ func (g *gracefulManager) Run(ctx context.Context) { } } -func (g *gracefulManager) handleSignals(ctx context.Context) { +func (g *Manager) handleSignals(ctx context.Context) { signalChannel := make(chan os.Signal, 1) signal.Notify( @@ -123,7 +144,7 @@ func (g *gracefulManager) handleSignals(ctx context.Context) { } } -func (g *gracefulManager) doFork() error { +func (g *Manager) doFork() error { g.lock.Lock() if g.forked { g.lock.Unlock() @@ -139,7 +160,9 @@ func (g *gracefulManager) doFork() error { return err } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die. +// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { KillParent() g.runningServerWaitGroup.Add(1) } diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 26c791e6ed..526fc0bd24 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -30,7 +30,8 @@ const ( acceptHammerCode = svc.Accepted(hammerCode) ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { ctx context.Context isChild bool lock *sync.RWMutex @@ -38,27 +39,37 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: false, lock: &sync.RWMutex{}, ctx: ctx, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run() + manager.start() return manager } -func (g *gracefulManager) Run() { +func (g *Manager) start() { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state g.setState(stateRunning) if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip { return } + + // Make SVC process run := svc.Run isInteractive, err := svc.IsAnInteractiveSession() if err != nil { @@ -71,8 +82,8 @@ func (g *gracefulManager) Run() { go run(WindowsServiceName, g) } -// Execute makes gracefulManager implement svc.Handler -func (g *gracefulManager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { +// Execute makes Manager implement svc.Handler +func (g *Manager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { if setting.StartupTimeout > 0 { status <- svc.Status{State: svc.StartPending} } else { @@ -141,11 +152,13 @@ hammerLoop: return false, 0 } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server. +// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { g.runningServerWaitGroup.Add(1) } -func (g *gracefulManager) awaitServer(limit time.Duration) bool { +func (g *Manager) awaitServer(limit time.Duration) bool { c := make(chan struct{}) go func() { defer close(c) diff --git a/modules/graceful/net_unix.go b/modules/graceful/net_unix.go index 5550c09f42..1e496e9d91 100644 --- a/modules/graceful/net_unix.go +++ b/modules/graceful/net_unix.go @@ -101,7 +101,7 @@ func CloseProvidedListeners() error { // creates a new one using net.Listen. func GetListener(network, address string) (net.Listener, error) { // Add a deferral to say that we've tried to grab a listener - defer Manager.InformCleanup() + defer GetManager().InformCleanup() switch network { case "tcp", "tcp4", "tcp6": tcpAddr, err := net.ResolveTCPAddr(network, address) diff --git a/modules/graceful/restart_unix.go b/modules/graceful/restart_unix.go index 3fc4f0511d..9a94e5fa67 100644 --- a/modules/graceful/restart_unix.go +++ b/modules/graceful/restart_unix.go @@ -22,7 +22,7 @@ var killParent sync.Once // KillParent sends the kill signal to the parent process if we are a child func KillParent() { killParent.Do(func() { - if Manager.IsChild() { + if GetManager().IsChild() { ppid := syscall.Getppid() if ppid > 1 { _ = syscall.Kill(ppid, syscall.SIGTERM) diff --git a/modules/graceful/server.go b/modules/graceful/server.go index c6692cbb75..30fb8cdffa 100644 --- a/modules/graceful/server.go +++ b/modules/graceful/server.go @@ -47,7 +47,7 @@ type Server struct { // NewServer creates a server on network at provided address func NewServer(network, address string) *Server { - if Manager.IsChild() { + if GetManager().IsChild() { log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid()) } else { log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid()) @@ -138,12 +138,12 @@ func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFun func (srv *Server) Serve(serve ServeFunction) error { defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid()) srv.setState(stateRunning) - Manager.RegisterServer() + GetManager().RegisterServer() err := serve(srv.listener) log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) srv.wg.Wait() srv.setState(stateTerminate) - Manager.ServerDone() + GetManager().ServerDone() // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil if err != nil && strings.Contains(err.Error(), "use of closed") { return nil diff --git a/modules/graceful/server_hooks.go b/modules/graceful/server_hooks.go index 74b0fcb885..c634905711 100644 --- a/modules/graceful/server_hooks.go +++ b/modules/graceful/server_hooks.go @@ -14,15 +14,15 @@ import ( // awaitShutdown waits for the shutdown signal from the Manager func (srv *Server) awaitShutdown() { select { - case <-Manager.IsShutdown(): + case <-GetManager().IsShutdown(): // Shutdown srv.doShutdown() - case <-Manager.IsHammer(): + case <-GetManager().IsHammer(): // Hammer srv.doShutdown() srv.doHammer() } - <-Manager.IsHammer() + <-GetManager().IsHammer() srv.doHammer() } diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index c8e1bb1879..bb2fc5bc74 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -6,6 +6,7 @@ package code import ( "fmt" + "os" "strconv" "strings" "time" @@ -34,10 +35,11 @@ func InitRepoIndexer() { return } waitChannel := make(chan time.Duration) + // FIXME: graceful: This should use a persistable queue repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) go func() { start := time.Now() - log.Info("Initializing Repository Indexer") + log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) initRepoIndexer(populateRepoIndexerAsynchronously) go processRepoIndexerOperationQueue() waitChannel <- time.Since(start) @@ -45,7 +47,7 @@ func InitRepoIndexer() { if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { @@ -70,13 +72,6 @@ func populateRepoIndexerAsynchronously() error { return nil } - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - return err - } - var maxRepoID int64 if maxRepoID, err = models.GetMaxID("repository"); err != nil { return err @@ -87,44 +82,59 @@ func populateRepoIndexerAsynchronously() error { // populateRepoIndexer populate the repo indexer with pre-existing data. This // should only be run when the indexer is created for the first time. +// FIXME: graceful: This should use a persistable queue func populateRepoIndexer(maxRepoID int64) { log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + // start with the maximum existing repo ID and work backwards, so that we // don't include repos that are created after gitea starts; such repos will // already be added to the indexer, and we don't need to add them again. for maxRepoID > 0 { - repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) - err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) if err != nil { log.Error("populateRepoIndexer: %v", err) return - } else if len(repos) == 0 { + } else if len(ids) == 0 { break } - for _, repo := range repos { + for _, id := range ids { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: repo.ID, + repoID: id, deleted: false, } - maxRepoID = repo.ID - 1 + maxRepoID = id - 1 } } - log.Info("Done populating the repo indexer with existing repositories") + log.Info("Done (re)populating the repo indexer with existing repositories") } func updateRepoIndexer(repoID int64) error { repo, err := models.GetRepositoryByID(repoID) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) } sha, err := getDefaultBranchSha(repo) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) } changes, err := getRepoChanges(repo, sha) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) } else if changes == nil { return nil } @@ -132,16 +142,16 @@ func updateRepoIndexer(repoID int64) error { batch := RepoIndexerBatch() for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) } } for _, filename := range changes.RemovedFilenames { if err := addDelete(filename, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) } } if err = batch.Flush(); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } return repo.UpdateIndexerStatus(sha) } @@ -322,20 +332,26 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, func processRepoIndexerOperationQueue() { for { - op := <-repoIndexerOperationQueue - var err error - if op.deleted { - if err = deleteRepoFromIndexer(op.repoID); err != nil { - log.Error("deleteRepoFromIndexer: %v", err) + select { + case op := <-repoIndexerOperationQueue: + var err error + if op.deleted { + if err = deleteRepoFromIndexer(op.repoID); err != nil { + log.Error("DeleteRepoFromIndexer: %v", err) + } + } else { + if err = updateRepoIndexer(op.repoID); err != nil { + log.Error("updateRepoIndexer: %v", err) + } } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) + for _, watcher := range op.watchers { + watcher <- err } + case <-graceful.GetManager().IsShutdown(): + log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) + return } - for _, watcher := range op.watchers { - watcher <- err - } + } } diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go index 31f0fa7f3d..bc5f317b7d 100644 --- a/modules/indexer/code/repo.go +++ b/modules/indexer/code/repo.go @@ -5,9 +5,13 @@ package code import ( + "context" + "os" "strings" "sync" + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -104,21 +108,50 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) func initRepoIndexer(populateIndexer func() error) { indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { - log.Fatal("InitRepoIndexer: %v", err) + log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err) } if indexer != nil { indexerHolder.set(indexer) + closeAtTerminate() + + // Continue population from where left off + if err = populateIndexer(); err != nil { + log.Fatal("PopulateRepoIndex: %v", err) + } return } if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { log.Fatal("CreateRepoIndexer: %v", err) } + closeAtTerminate() + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("DeleteAllRepoIndexerStatus: %v", err) + } + if err = populateIndexer(); err != nil { log.Fatal("PopulateRepoIndex: %v", err) } } +func closeAtTerminate() { + graceful.GetManager().RunAtTerminate(context.Background(), func() { + log.Debug("Closing repo indexer") + indexer := indexerHolder.get() + if indexer != nil { + err := indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) + } + } + log.Info("PID: %d Repository Indexer closed", os.Getpid()) + }) +} + // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) error { docMapping := bleve.NewDocumentMapping() diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 78eba58095..50b8d6d224 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -172,7 +172,7 @@ func InitIssueIndexer(syncReindex bool) { } else if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { diff --git a/modules/migrations/update.go b/modules/migrations/update.go index d1465b2baf..3d0962657c 100644 --- a/modules/migrations/update.go +++ b/modules/migrations/update.go @@ -5,21 +5,28 @@ package migrations import ( + "context" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/structs" ) // UpdateMigrationPosterID updates all migrated repositories' issues and comments posterID -func UpdateMigrationPosterID() { +func UpdateMigrationPosterID(ctx context.Context) { for _, gitService := range structs.SupportedFullGitService { - if err := updateMigrationPosterIDByGitService(gitService); err != nil { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterID aborted due to shutdown before %s", gitService.Name()) + default: + } + if err := updateMigrationPosterIDByGitService(ctx, gitService); err != nil { log.Error("updateMigrationPosterIDByGitService failed: %v", err) } } } -func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { +func updateMigrationPosterIDByGitService(ctx context.Context, tp structs.GitServiceType) error { provider := tp.Name() if len(provider) == 0 { return nil @@ -28,6 +35,13 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { const batchSize = 100 var start int for { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name()) + return nil + default: + } + users, err := models.FindExternalUsersByProvider(models.FindExternalUserOptions{ Provider: provider, Start: start, @@ -38,6 +52,12 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { } for _, user := range users { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name()) + return nil + default: + } externalUserID := user.ExternalID if err := models.UpdateMigrationsByType(tp, externalUserID, user.UserID); err != nil { log.Error("UpdateMigrationsByType type %s external user id %v to local user id %v failed: %v", tp.Name(), user.ExternalID, user.UserID, err) diff --git a/modules/ssh/ssh_graceful.go b/modules/ssh/ssh_graceful.go index 4d7557e2ee..f8370ab4db 100644 --- a/modules/ssh/ssh_graceful.go +++ b/modules/ssh/ssh_graceful.go @@ -24,5 +24,5 @@ func listen(server *ssh.Server) { // Unused informs our cleanup routine that we will not be using a ssh port func Unused() { - graceful.Manager.InformCleanup() + graceful.GetManager().InformCleanup() } diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index de694d8560..14644c7d4e 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -1,4 +1,5 @@ // Copyright 2016 The Gogs Authors. All rights reserved. +// Copyright 2019 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. @@ -15,8 +16,9 @@ import ( // This queue is particularly useful for preventing duplicated task // of same purpose. type UniqueQueue struct { - table *StatusTable - queue chan string + table *StatusTable + queue chan string + closed chan struct{} } // NewUniqueQueue initializes and returns a new UniqueQueue object. @@ -26,11 +28,43 @@ func NewUniqueQueue(queueLength int) *UniqueQueue { } return &UniqueQueue{ - table: NewStatusTable(), - queue: make(chan string, queueLength), + table: NewStatusTable(), + queue: make(chan string, queueLength), + closed: make(chan struct{}), } } +// Close closes this queue +func (q *UniqueQueue) Close() { + select { + case <-q.closed: + default: + q.table.lock.Lock() + select { + case <-q.closed: + default: + close(q.closed) + } + q.table.lock.Unlock() + } +} + +// IsClosed returns a channel that is closed when this Queue is closed +func (q *UniqueQueue) IsClosed() <-chan struct{} { + return q.closed +} + +// IDs returns the current ids in the pool +func (q *UniqueQueue) IDs() []interface{} { + q.table.lock.Lock() + defer q.table.lock.Unlock() + ids := make([]interface{}, 0, len(q.table.pool)) + for id := range q.table.pool { + ids = append(ids, id) + } + return ids +} + // Queue returns channel of queue for retrieving instances. func (q *UniqueQueue) Queue() <-chan string { return q.queue @@ -45,18 +79,22 @@ func (q *UniqueQueue) Exist(id interface{}) bool { // AddFunc adds new instance to the queue with a custom runnable function, // the queue is blocked until the function exits. func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { - if q.Exist(id) { - return - } - idStr := com.ToStr(id) q.table.lock.Lock() + if _, ok := q.table.pool[idStr]; ok { + return + } q.table.pool[idStr] = struct{}{} if fn != nil { fn() } q.table.lock.Unlock() - q.queue <- idStr + select { + case <-q.closed: + return + case q.queue <- idStr: + return + } } // Add adds new instance to the queue. diff --git a/modules/webhook/deliver.go b/modules/webhook/deliver.go index b262505cea..9f5c938f83 100644 --- a/modules/webhook/deliver.go +++ b/modules/webhook/deliver.go @@ -5,6 +5,7 @@ package webhook import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -16,6 +17,7 @@ import ( "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "github.com/gobwas/glob" @@ -145,8 +147,14 @@ func Deliver(t *models.HookTask) error { } // DeliverHooks checks and delivers undelivered hooks. -// TODO: shoot more hooks at same time. -func DeliverHooks() { +// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue +// or a full queue. Then more hooks could be sent at same time. +func DeliverHooks(ctx context.Context) { + select { + case <-ctx.Done(): + return + default: + } tasks, err := models.FindUndeliveredHookTasks() if err != nil { log.Error("DeliverHooks: %v", err) @@ -155,33 +163,50 @@ func DeliverHooks() { // Update hook task status. for _, t := range tasks { + select { + case <-ctx.Done(): + return + default: + } if err = Deliver(t); err != nil { log.Error("deliver: %v", err) } } // Start listening on new hook requests. - for repoIDStr := range hookQueue.Queue() { - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) + for { + select { + case <-ctx.Done(): + hookQueue.Close() + return + case repoIDStr := <-hookQueue.Queue(): + log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) + hookQueue.Remove(repoIDStr) - repoID, err := com.StrTo(repoIDStr).Int64() - if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } + repoID, err := com.StrTo(repoIDStr).Int64() + if err != nil { + log.Error("Invalid repo ID: %s", repoIDStr) + continue + } - tasks, err := models.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) - continue - } - for _, t := range tasks { - if err = Deliver(t); err != nil { - log.Error("deliver: %v", err) + tasks, err := models.FindRepoUndeliveredHookTasks(repoID) + if err != nil { + log.Error("Get repository [%d] hook tasks: %v", repoID, err) + continue + } + for _, t := range tasks { + select { + case <-ctx.Done(): + return + default: + } + if err = Deliver(t); err != nil { + log.Error("deliver: %v", err) + } } } } + } var ( @@ -234,5 +259,5 @@ func InitDeliverHooks() { }, } - go DeliverHooks() + go graceful.GetManager().RunWithShutdownContext(DeliverHooks) } diff --git a/routers/admin/admin.go b/routers/admin/admin.go index 9f155ff008..ccedcaf8a6 100644 --- a/routers/admin/admin.go +++ b/routers/admin/admin.go @@ -19,6 +19,7 @@ import ( "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/cron" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" @@ -171,10 +172,10 @@ func Dashboard(ctx *context.Context) { err = models.ReinitMissingRepositories() case syncExternalUsers: success = ctx.Tr("admin.dashboard.sync_external_users_started") - go models.SyncExternalUsers() + go graceful.GetManager().RunWithShutdownContext(models.SyncExternalUsers) case gitFsck: success = ctx.Tr("admin.dashboard.git_fsck_started") - go models.GitFsck() + go graceful.GetManager().RunWithShutdownContext(models.GitFsck) case deleteGeneratedRepositoryAvatars: success = ctx.Tr("admin.dashboard.delete_generated_repository_avatars_success") err = models.RemoveRandomAvatars() diff --git a/routers/init.go b/routers/init.go index 81418a4ad5..01df15d6c5 100644 --- a/routers/init.go +++ b/routers/init.go @@ -5,6 +5,7 @@ package routers import ( + "context" "strings" "time" @@ -53,11 +54,11 @@ func NewServices() { } // In case of problems connecting to DB, retry connection. Eg, PGSQL in Docker Container on Synology -func initDBEngine() (err error) { +func initDBEngine(ctx context.Context) (err error) { log.Info("Beginning ORM engine initialization.") for i := 0; i < setting.Database.DBConnectRetries; i++ { log.Info("ORM engine initialization attempt #%d/%d...", i+1, setting.Database.DBConnectRetries) - if err = models.NewEngine(migrations.Migrate); err == nil { + if err = models.NewEngine(ctx, migrations.Migrate); err == nil { break } else if i == setting.Database.DBConnectRetries-1 { return err @@ -71,9 +72,9 @@ func initDBEngine() (err error) { } // GlobalInit is for global configuration reload-able. -func GlobalInit() { +func GlobalInit(ctx context.Context) { setting.NewContext() - if err := git.Init(); err != nil { + if err := git.Init(ctx); err != nil { log.Fatal("Git module init failed: %v", err) } setting.CheckLFSVersion() @@ -88,7 +89,7 @@ func GlobalInit() { highlight.NewContext() external.RegisterParsers() markup.Init() - if err := initDBEngine(); err == nil { + if err := initDBEngine(ctx); err == nil { log.Info("ORM engine initialization successful!") } else { log.Fatal("ORM engine initialization failed: %v", err) diff --git a/routers/install.go b/routers/install.go index 53880d2c46..7395aeee84 100644 --- a/routers/install.go +++ b/routers/install.go @@ -16,6 +16,7 @@ import ( "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/generate" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/user" @@ -351,7 +352,7 @@ func InstallPost(ctx *context.Context, form auth.InstallForm) { return } - GlobalInit() + GlobalInit(graceful.GetManager().HammerContext()) // Create admin account if len(form.AdminName) > 0 { diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 1ad9448b6b..7fc6e97b46 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -5,11 +5,14 @@ package mirror import ( + "context" "fmt" "net/url" "strings" "time" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/git" @@ -294,29 +297,38 @@ func Password(m *models.Mirror) string { } // Update checks and updates mirror repositories. -func Update() { +func Update(ctx context.Context) { log.Trace("Doing: Update") - if err := models.MirrorsIterate(func(idx int, bean interface{}) error { m := bean.(*models.Mirror) if m.Repo == nil { log.Error("Disconnected mirror repository found: %d", m.ID) return nil } - - mirrorQueue.Add(m.RepoID) - return nil + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown") + default: + mirrorQueue.Add(m.RepoID) + return nil + } }); err != nil { log.Error("Update: %v", err) } } // SyncMirrors checks and syncs mirrors. -// TODO: sync more mirrors at same time. -func SyncMirrors() { +// FIXME: graceful: this should be a persistable queue +func SyncMirrors(ctx context.Context) { // Start listening on new sync requests. - for repoID := range mirrorQueue.Queue() { - syncMirror(repoID) + for { + select { + case <-ctx.Done(): + mirrorQueue.Close() + return + case repoID := <-mirrorQueue.Queue(): + syncMirror(repoID) + } } } @@ -416,7 +428,7 @@ func syncMirror(repoID string) { // InitSyncMirrors initializes a go routine to sync the mirrors func InitSyncMirrors() { - go SyncMirrors() + go graceful.GetManager().RunWithShutdownContext(SyncMirrors) } // StartToMirror adds repoID to mirror queue diff --git a/services/pull/check.go b/services/pull/check.go index fc2ac927b8..7344f071ac 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -6,6 +6,7 @@ package pull import ( + "context" "fmt" "io/ioutil" "os" @@ -16,6 +17,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/sync" @@ -151,65 +153,53 @@ func manuallyMerged(pr *models.PullRequest) bool { // TestPullRequests checks and tests untested patches of pull requests. // TODO: test more pull requests at same time. -func TestPullRequests() { - prs, err := models.GetPullRequestsByCheckStatus(models.PullRequestStatusChecking) - if err != nil { - log.Error("Find Checking PRs: %v", err) - return - } +func TestPullRequests(ctx context.Context) { - var checkedPRs = make(map[int64]struct{}) - - // Update pull request status. - for _, pr := range prs { - checkedPRs[pr.ID] = struct{}{} - if err := pr.GetBaseRepo(); err != nil { - log.Error("GetBaseRepo: %v", err) - continue + go func() { + prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking) + if err != nil { + log.Error("Find Checking PRs: %v", err) + return } - if manuallyMerged(pr) { - continue + for _, prID := range prs { + select { + case <-ctx.Done(): + return + default: + pullRequestQueue.Add(prID) + } } - if err := TestPatch(pr); err != nil { - log.Error("testPatch: %v", err) - continue - } - - checkAndUpdateStatus(pr) - } + }() // Start listening on new test requests. - for prID := range pullRequestQueue.Queue() { - log.Trace("TestPullRequests[%v]: processing test task", prID) - pullRequestQueue.Remove(prID) + for { + select { + case prID := <-pullRequestQueue.Queue(): + log.Trace("TestPullRequests[%v]: processing test task", prID) + pullRequestQueue.Remove(prID) - id := com.StrTo(prID).MustInt64() - if _, ok := checkedPRs[id]; ok { - continue - } + id := com.StrTo(prID).MustInt64() - pr, err := models.GetPullRequestByID(id) - if err != nil { - log.Error("GetPullRequestByID[%s]: %v", prID, err) - continue - } else if manuallyMerged(pr) { - continue + pr, err := models.GetPullRequestByID(id) + if err != nil { + log.Error("GetPullRequestByID[%s]: %v", prID, err) + continue + } else if manuallyMerged(pr) { + continue + } else if err = TestPatch(pr); err != nil { + log.Error("testPatch[%d]: %v", pr.ID, err) + continue + } + checkAndUpdateStatus(pr) + case <-ctx.Done(): + pullRequestQueue.Close() + log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) + return } - pr.Status = models.PullRequestStatusChecking - if err := pr.Update(); err != nil { - log.Error("testPatch[%d]: Unable to update status to Checking Status %v", pr.ID, err) - continue - } - if err = TestPatch(pr); err != nil { - log.Error("testPatch[%d]: %v", pr.ID, err) - continue - } - - checkAndUpdateStatus(pr) } } // Init runs the task queue to test all the checking status pull requests func Init() { - go TestPullRequests() + go graceful.GetManager().RunWithShutdownContext(TestPullRequests) } diff --git a/services/pull/pull.go b/services/pull/pull.go index 6447c8a87f..e7f4e4eede 100644 --- a/services/pull/pull.go +++ b/services/pull/pull.go @@ -5,12 +5,14 @@ package pull import ( + "context" "fmt" "os" "path" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" issue_service "code.gitea.io/gitea/services/issue" @@ -54,6 +56,7 @@ func checkForInvalidation(requests models.PullRequestList, repoID int64, doer *m return fmt.Errorf("git.OpenRepository: %v", err) } go func() { + // FIXME: graceful: We need to tell the manager we're doing something... err := requests.InvalidateCodeComments(doer, gitRepo, branch) if err != nil { log.Error("PullRequestList.InvalidateCodeComments: %v", err) @@ -79,39 +82,45 @@ func addHeadRepoTasks(prs []*models.PullRequest) { // and generate new patch for testing as needed. func AddTestPullRequestTask(doer *models.User, repoID int64, branch string, isSync bool) { log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch) - prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch) - if err != nil { - log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err) - return - } + graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) { + // There is no sensible way to shut this down ":-(" + // If you don't let it run all the way then you will lose data + // FIXME: graceful: AddTestPullRequestTask needs to become a queue! - if isSync { - requests := models.PullRequestList(prs) - if err = requests.LoadAttributes(); err != nil { - log.Error("PullRequestList.LoadAttributes: %v", err) + prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch) + if err != nil { + log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err) + return } - if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil { - log.Error("checkForInvalidation: %v", invalidationErr) - } - if err == nil { - for _, pr := range prs { - pr.Issue.PullRequest = pr - notification.NotifyPullRequestSynchronized(doer, pr) + + if isSync { + requests := models.PullRequestList(prs) + if err = requests.LoadAttributes(); err != nil { + log.Error("PullRequestList.LoadAttributes: %v", err) + } + if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil { + log.Error("checkForInvalidation: %v", invalidationErr) + } + if err == nil { + for _, pr := range prs { + pr.Issue.PullRequest = pr + notification.NotifyPullRequestSynchronized(doer, pr) + } } } - } - addHeadRepoTasks(prs) + addHeadRepoTasks(prs) - log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch) - prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch) - if err != nil { - log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err) - return - } - for _, pr := range prs { - AddToTaskQueue(pr) - } + log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch) + prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch) + if err != nil { + log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err) + return + } + for _, pr := range prs { + AddToTaskQueue(pr) + } + }) } // PushToBaseRepo pushes commits from branches of head repository to