From b223d361955f8b722f7dd0b358b2e57e6f359edf Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Thu, 24 Jun 2021 05:12:38 +0800 Subject: [PATCH] Rework repository archive (#14723) * Use storage to store archive files * Fix backend lint * Add archiver table on database * Finish archive download * Fix test * Add database migrations * Add status for archiver * Fix lint * Add queue * Add doctor to check and delete old archives * Improve archive queue * Fix tests * improve archive storage * Delete repo archives * Add missing fixture * fix fixture * Fix fixture * Fix test * Fix archiver cleaning * Fix bug * Add docs for repository archive storage * remove repo-archive configuration * Fix test * Fix test * Fix lint Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick --- custom/conf/app.example.ini | 10 + .../doc/advanced/config-cheat-sheet.en-us.md | 17 + .../doc/advanced/config-cheat-sheet.zh-cn.md | 15 + .../user27/repo49.git/refs/heads/test/archive | 1 + models/fixtures/repo_archiver.yml | 1 + models/migrations/migrations.go | 2 + models/migrations/v181.go | 1 + models/migrations/v185.go | 22 + models/models.go | 1 + models/repo.go | 97 ++-- models/repo_archiver.go | 86 ++++ models/unit_tests.go | 2 + modules/context/context.go | 15 + modules/doctor/checkOldArchives.go | 59 +++ .../{commit_archive.go => repo_archive.go} | 31 +- modules/setting/repository.go | 6 + modules/setting/storage.go | 4 + modules/storage/storage.go | 15 +- routers/api/v1/repo/file.go | 3 +- routers/common/repo.go | 26 -- routers/init.go | 4 + routers/web/repo/repo.go | 122 ++++- routers/web/web.go | 3 +- services/archiver/archiver.go | 428 ++++++++---------- services/archiver/archiver_test.go | 157 ++----- 25 files changed, 648 insertions(+), 480 deletions(-) create mode 100644 integrations/gitea-repositories-meta/user27/repo49.git/refs/heads/test/archive create mode 100644 models/fixtures/repo_archiver.yml create mode 100644 models/migrations/v185.go create mode 100644 models/repo_archiver.go create mode 100644 modules/doctor/checkOldArchives.go rename modules/git/{commit_archive.go => repo_archive.go} (60%) diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index 38a27509f..5adfb0546 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -2048,6 +2048,16 @@ PATH = ;; storage type ;STORAGE_TYPE = local +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; settings for repository archives, will override storage setting +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;[storage.repo-archive] +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; storage type +;STORAGE_TYPE = local + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; lfs storage will override storage diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 8f1f9ce42..5e976174f 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -995,6 +995,23 @@ MINIO_USE_SSL = false And used by `[attachment]`, `[lfs]` and etc. as `STORAGE_TYPE`. +## Repository Archive Storage (`storage.repo-archive`) + +Configuration for repository archive storage. It will inherit from default `[storage]` or +`[storage.xxx]` when set `STORAGE_TYPE` to `xxx`. The default of `PATH` +is `data/repo-archive` and the default of `MINIO_BASE_PATH` is `repo-archive/`. + +- `STORAGE_TYPE`: **local**: Storage type for repo archive, `local` for local disk or `minio` for s3 compatible object storage service or other name defined with `[storage.xxx]` +- `SERVE_DIRECT`: **false**: Allows the storage driver to redirect to authenticated URLs to serve files directly. Currently, only Minio/S3 is supported via signed URLs, local does nothing. +- `PATH`: **./data/repo-archive**: Where to store archive files, only available when `STORAGE_TYPE` is `local`. +- `MINIO_ENDPOINT`: **localhost:9000**: Minio endpoint to connect only available when `STORAGE_TYPE` is `minio` +- `MINIO_ACCESS_KEY_ID`: Minio accessKeyID to connect only available when `STORAGE_TYPE` is `minio` +- `MINIO_SECRET_ACCESS_KEY`: Minio secretAccessKey to connect only available when `STORAGE_TYPE is` `minio` +- `MINIO_BUCKET`: **gitea**: Minio bucket to store the lfs only available when `STORAGE_TYPE` is `minio` +- `MINIO_LOCATION`: **us-east-1**: Minio location to create bucket only available when `STORAGE_TYPE` is `minio` +- `MINIO_BASE_PATH`: **repo-archive/**: Minio base path on the bucket only available when `STORAGE_TYPE` is `minio` +- `MINIO_USE_SSL`: **false**: Minio enabled ssl only available when `STORAGE_TYPE` is `minio` + ## Other (`other`) - `SHOW_FOOTER_BRANDING`: **false**: Show Gitea branding in the footer. diff --git a/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md b/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md index 79cfd94cc..2303a631d 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md +++ b/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md @@ -382,6 +382,21 @@ MINIO_USE_SSL = false 然后你在 `[attachment]`, `[lfs]` 等中可以把这个名字用作 `STORAGE_TYPE` 的值。 +## Repository Archive Storage (`storage.repo-archive`) + +Repository archive 的存储配置。 如果 `STORAGE_TYPE` 为空,则此配置将从 `[storage]` 继承。如果不为 `local` 或者 `minio` 而为 `xxx`, 则从 `[storage.xxx]` 继承。当继承时, `PATH` 默认为 `data/repo-archive`,`MINIO_BASE_PATH` 默认为 `repo-archive/`。 + +- `STORAGE_TYPE`: **local**: Repository archive 的存储类型,`local` 将存储到磁盘,`minio` 将存储到 s3 兼容的对象服务。 +- `SERVE_DIRECT`: **false**: 允许直接重定向到存储系统。当前,仅 Minio/S3 是支持的。 +- `PATH`: 存放 Repository archive 上传的文件的地方,默认是 `data/repo-archive`。 +- `MINIO_ENDPOINT`: **localhost:9000**: Minio 地址,仅当 `STORAGE_TYPE` 为 `minio` 时有效。 +- `MINIO_ACCESS_KEY_ID`: Minio accessKeyID,仅当 `STORAGE_TYPE` 为 `minio` 时有效。 +- `MINIO_SECRET_ACCESS_KEY`: Minio secretAccessKey,仅当 `STORAGE_TYPE` 为 `minio` 时有效。 +- `MINIO_BUCKET`: **gitea**: Minio bucket,仅当 `STORAGE_TYPE` 为 `minio` 时有效。 +- `MINIO_LOCATION`: **us-east-1**: Minio location ,仅当 `STORAGE_TYPE` 为 `minio` 时有效。 +- `MINIO_BASE_PATH`: **repo-archive/**: Minio base path ,仅当 `STORAGE_TYPE` 为 `minio` 时有效。 +- `MINIO_USE_SSL`: **false**: Minio 是否启用 ssl ,仅当 `STORAGE_TYPE` 为 `minio` 时有效。 + ## Other (`other`) - `SHOW_FOOTER_BRANDING`: 为真则在页面底部显示Gitea的字样。 diff --git a/integrations/gitea-repositories-meta/user27/repo49.git/refs/heads/test/archive b/integrations/gitea-repositories-meta/user27/repo49.git/refs/heads/test/archive new file mode 100644 index 000000000..0f13243bf --- /dev/null +++ b/integrations/gitea-repositories-meta/user27/repo49.git/refs/heads/test/archive @@ -0,0 +1 @@ +aacbdfe9e1c4b47f60abe81849045fa4e96f1d75 diff --git a/models/fixtures/repo_archiver.yml b/models/fixtures/repo_archiver.yml new file mode 100644 index 000000000..ca780a73a --- /dev/null +++ b/models/fixtures/repo_archiver.yml @@ -0,0 +1 @@ +[] # empty diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 880f55092..4e17a6a2c 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -319,6 +319,8 @@ var migrations = []Migration{ NewMigration("Create PushMirror table", createPushMirrorTable), // v184 -> v185 NewMigration("Rename Task errors to message", renameTaskErrorsToMessage), + // v185 -> v186 + NewMigration("Add new table repo_archiver", addRepoArchiver), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v181.go b/models/migrations/v181.go index 6ba4edc15..65045593a 100644 --- a/models/migrations/v181.go +++ b/models/migrations/v181.go @@ -1,3 +1,4 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. diff --git a/models/migrations/v185.go b/models/migrations/v185.go new file mode 100644 index 000000000..096994889 --- /dev/null +++ b/models/migrations/v185.go @@ -0,0 +1,22 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package migrations + +import ( + "xorm.io/xorm" +) + +func addRepoArchiver(x *xorm.Engine) error { + // RepoArchiver represents all archivers + type RepoArchiver struct { + ID int64 `xorm:"pk autoincr"` + RepoID int64 `xorm:"index unique(s)"` + Type int `xorm:"unique(s)"` + Status int + CommitID string `xorm:"VARCHAR(40) unique(s)"` + CreatedUnix int64 `xorm:"INDEX NOT NULL created"` + } + return x.Sync2(new(RepoArchiver)) +} diff --git a/models/models.go b/models/models.go index c325fd381..3266be0f4 100644 --- a/models/models.go +++ b/models/models.go @@ -136,6 +136,7 @@ func init() { new(RepoTransfer), new(IssueIndex), new(PushMirror), + new(RepoArchiver), ) gonicNames := []string{"SSL", "UID"} diff --git a/models/repo.go b/models/repo.go index dc4e03a28..2baf6e9bd 100644 --- a/models/repo.go +++ b/models/repo.go @@ -1587,6 +1587,22 @@ func DeleteRepository(doer *User, uid, repoID int64) error { return err } + // Remove archives + var archives []*RepoArchiver + if err = sess.Where("repo_id=?", repoID).Find(&archives); err != nil { + return err + } + + for _, v := range archives { + v.Repo = repo + p, _ := v.RelativePath() + removeStorageWithNotice(sess, storage.RepoArchives, "Delete repo archive file", p) + } + + if _, err := sess.Delete(&RepoArchiver{RepoID: repoID}); err != nil { + return err + } + if repo.NumForks > 0 { if _, err = sess.Exec("UPDATE `repository` SET fork_id=0,is_fork=? WHERE fork_id=?", false, repo.ID); err != nil { log.Error("reset 'fork_id' and 'is_fork': %v", err) @@ -1768,64 +1784,45 @@ func DeleteRepositoryArchives(ctx context.Context) error { func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error { log.Trace("Doing: ArchiveCleanup") - if err := x.Where("id > 0").Iterate(new(Repository), func(idx int, bean interface{}) error { - return deleteOldRepositoryArchives(ctx, olderThan, idx, bean) - }); err != nil { - log.Trace("Error: ArchiveClean: %v", err) - return err + for { + var archivers []RepoArchiver + err := x.Where("created_unix < ?", time.Now().Add(-olderThan).Unix()). + Asc("created_unix"). + Limit(100). + Find(&archivers) + if err != nil { + log.Trace("Error: ArchiveClean: %v", err) + return err + } + + for _, archiver := range archivers { + if err := deleteOldRepoArchiver(ctx, &archiver); err != nil { + return err + } + } + if len(archivers) < 100 { + break + } } log.Trace("Finished: ArchiveCleanup") return nil } -func deleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration, idx int, bean interface{}) error { - repo := bean.(*Repository) - basePath := filepath.Join(repo.RepoPath(), "archives") +var delRepoArchiver = new(RepoArchiver) - for _, ty := range []string{"zip", "targz"} { - select { - case <-ctx.Done(): - return ErrCancelledf("before deleting old repository archives with filetype %s for %s", ty, repo.FullName()) - default: - } - - path := filepath.Join(basePath, ty) - file, err := os.Open(path) - if err != nil { - if !os.IsNotExist(err) { - log.Warn("Unable to open directory %s: %v", path, err) - return err - } - - // If the directory doesn't exist, that's okay. - continue - } - - files, err := file.Readdir(0) - file.Close() - if err != nil { - log.Warn("Unable to read directory %s: %v", path, err) - return err - } - - minimumOldestTime := time.Now().Add(-olderThan) - for _, info := range files { - if info.ModTime().Before(minimumOldestTime) && !info.IsDir() { - select { - case <-ctx.Done(): - return ErrCancelledf("before deleting old repository archive file %s with filetype %s for %s", info.Name(), ty, repo.FullName()) - default: - } - toDelete := filepath.Join(path, info.Name()) - // This is a best-effort purge, so we do not check error codes to confirm removal. - if err = util.Remove(toDelete); err != nil { - log.Trace("Unable to delete %s, but proceeding: %v", toDelete, err) - } - } - } +func deleteOldRepoArchiver(ctx context.Context, archiver *RepoArchiver) error { + p, err := archiver.RelativePath() + if err != nil { + return err + } + _, err = x.ID(archiver.ID).Delete(delRepoArchiver) + if err != nil { + return err + } + if err := storage.RepoArchives.Delete(p); err != nil { + log.Error("delete repo archive file failed: %v", err) } - return nil } diff --git a/models/repo_archiver.go b/models/repo_archiver.go new file mode 100644 index 000000000..833a22ee1 --- /dev/null +++ b/models/repo_archiver.go @@ -0,0 +1,86 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package models + +import ( + "fmt" + + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/timeutil" +) + +// RepoArchiverStatus represents repo archive status +type RepoArchiverStatus int + +// enumerate all repo archive statuses +const ( + RepoArchiverGenerating = iota // the archiver is generating + RepoArchiverReady // it's ready +) + +// RepoArchiver represents all archivers +type RepoArchiver struct { + ID int64 `xorm:"pk autoincr"` + RepoID int64 `xorm:"index unique(s)"` + Repo *Repository `xorm:"-"` + Type git.ArchiveType `xorm:"unique(s)"` + Status RepoArchiverStatus + CommitID string `xorm:"VARCHAR(40) unique(s)"` + CreatedUnix timeutil.TimeStamp `xorm:"INDEX NOT NULL created"` +} + +// LoadRepo loads repository +func (archiver *RepoArchiver) LoadRepo() (*Repository, error) { + if archiver.Repo != nil { + return archiver.Repo, nil + } + + var repo Repository + has, err := x.ID(archiver.RepoID).Get(&repo) + if err != nil { + return nil, err + } + if !has { + return nil, ErrRepoNotExist{ + ID: archiver.RepoID, + } + } + return &repo, nil +} + +// RelativePath returns relative path +func (archiver *RepoArchiver) RelativePath() (string, error) { + repo, err := archiver.LoadRepo() + if err != nil { + return "", err + } + + return fmt.Sprintf("%s/%s/%s.%s", repo.FullName(), archiver.CommitID[:2], archiver.CommitID, archiver.Type.String()), nil +} + +// GetRepoArchiver get an archiver +func GetRepoArchiver(ctx DBContext, repoID int64, tp git.ArchiveType, commitID string) (*RepoArchiver, error) { + var archiver RepoArchiver + has, err := ctx.e.Where("repo_id=?", repoID).And("`type`=?", tp).And("commit_id=?", commitID).Get(&archiver) + if err != nil { + return nil, err + } + if has { + return &archiver, nil + } + return nil, nil +} + +// AddRepoArchiver adds an archiver +func AddRepoArchiver(ctx DBContext, archiver *RepoArchiver) error { + _, err := ctx.e.Insert(archiver) + return err +} + +// UpdateRepoArchiverStatus updates archiver's status +func UpdateRepoArchiverStatus(ctx DBContext, archiver *RepoArchiver) error { + _, err := ctx.e.ID(archiver.ID).Cols("status").Update(archiver) + return err +} diff --git a/models/unit_tests.go b/models/unit_tests.go index 5a145fa2c..f8d681933 100644 --- a/models/unit_tests.go +++ b/models/unit_tests.go @@ -74,6 +74,8 @@ func MainTest(m *testing.M, pathToGiteaRoot string) { setting.RepoAvatar.Storage.Path = filepath.Join(setting.AppDataPath, "repo-avatars") + setting.RepoArchive.Storage.Path = filepath.Join(setting.AppDataPath, "repo-archive") + if err = storage.Init(); err != nil { fatalTestError("storage.Init: %v\n", err) } diff --git a/modules/context/context.go b/modules/context/context.go index 7b3fd2899..64f8b1208 100644 --- a/modules/context/context.go +++ b/modules/context/context.go @@ -380,6 +380,21 @@ func (ctx *Context) ServeFile(file string, names ...string) { http.ServeFile(ctx.Resp, ctx.Req, file) } +// ServeStream serves file via io stream +func (ctx *Context) ServeStream(rd io.Reader, name string) { + ctx.Resp.Header().Set("Content-Description", "File Transfer") + ctx.Resp.Header().Set("Content-Type", "application/octet-stream") + ctx.Resp.Header().Set("Content-Disposition", "attachment; filename="+name) + ctx.Resp.Header().Set("Content-Transfer-Encoding", "binary") + ctx.Resp.Header().Set("Expires", "0") + ctx.Resp.Header().Set("Cache-Control", "must-revalidate") + ctx.Resp.Header().Set("Pragma", "public") + _, err := io.Copy(ctx.Resp, rd) + if err != nil { + ctx.ServerError("Download file failed", err) + } +} + // Error returned an error to web browser func (ctx *Context) Error(status int, contents ...string) { var v = http.StatusText(status) diff --git a/modules/doctor/checkOldArchives.go b/modules/doctor/checkOldArchives.go new file mode 100644 index 000000000..a4e2ffbd1 --- /dev/null +++ b/modules/doctor/checkOldArchives.go @@ -0,0 +1,59 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package doctor + +import ( + "os" + "path/filepath" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" +) + +func checkOldArchives(logger log.Logger, autofix bool) error { + numRepos := 0 + numReposUpdated := 0 + err := iterateRepositories(func(repo *models.Repository) error { + if repo.IsEmpty { + return nil + } + + p := filepath.Join(repo.RepoPath(), "archives") + isDir, err := util.IsDir(p) + if err != nil { + log.Warn("check if %s is directory failed: %v", p, err) + } + if isDir { + numRepos++ + if autofix { + if err := os.RemoveAll(p); err == nil { + numReposUpdated++ + } else { + log.Warn("remove %s failed: %v", p, err) + } + } + } + return nil + }) + + if autofix { + logger.Info("%d / %d old archives in repository deleted", numReposUpdated, numRepos) + } else { + logger.Info("%d old archives in repository need to be deleted", numRepos) + } + + return err +} + +func init() { + Register(&Check{ + Title: "Check old archives", + Name: "check-old-archives", + IsDefault: false, + Run: checkOldArchives, + Priority: 7, + }) +} diff --git a/modules/git/commit_archive.go b/modules/git/repo_archive.go similarity index 60% rename from modules/git/commit_archive.go rename to modules/git/repo_archive.go index d075ba091..07003aa6b 100644 --- a/modules/git/commit_archive.go +++ b/modules/git/repo_archive.go @@ -8,6 +8,7 @@ package git import ( "context" "fmt" + "io" "path/filepath" "strings" ) @@ -33,32 +34,28 @@ func (a ArchiveType) String() string { return "unknown" } -// CreateArchiveOpts represents options for creating an archive -type CreateArchiveOpts struct { - Format ArchiveType - Prefix bool -} - // CreateArchive create archive content to the target path -func (c *Commit) CreateArchive(ctx context.Context, target string, opts CreateArchiveOpts) error { - if opts.Format.String() == "unknown" { - return fmt.Errorf("unknown format: %v", opts.Format) +func (repo *Repository) CreateArchive(ctx context.Context, format ArchiveType, target io.Writer, usePrefix bool, commitID string) error { + if format.String() == "unknown" { + return fmt.Errorf("unknown format: %v", format) } args := []string{ "archive", } - if opts.Prefix { - args = append(args, "--prefix="+filepath.Base(strings.TrimSuffix(c.repo.Path, ".git"))+"/") + if usePrefix { + args = append(args, "--prefix="+filepath.Base(strings.TrimSuffix(repo.Path, ".git"))+"/") } args = append(args, - "--format="+opts.Format.String(), - "-o", - target, - c.ID.String(), + "--format="+format.String(), + commitID, ) - _, err := NewCommandContext(ctx, args...).RunInDir(c.repo.Path) - return err + var stderr strings.Builder + err := NewCommandContext(ctx, args...).RunInDirPipeline(repo.Path, target, &stderr) + if err != nil { + return ConcatenateError(err, stderr.String()) + } + return nil } diff --git a/modules/setting/repository.go b/modules/setting/repository.go index a7666895e..c2a6357d9 100644 --- a/modules/setting/repository.go +++ b/modules/setting/repository.go @@ -251,6 +251,10 @@ var ( } RepoRootPath string ScriptType = "bash" + + RepoArchive = struct { + Storage + }{} ) func newRepository() { @@ -328,4 +332,6 @@ func newRepository() { if !filepath.IsAbs(Repository.Upload.TempPath) { Repository.Upload.TempPath = path.Join(AppWorkPath, Repository.Upload.TempPath) } + + RepoArchive.Storage = getStorage("repo-archive", "", nil) } diff --git a/modules/setting/storage.go b/modules/setting/storage.go index 3ab08d8d2..075152db5 100644 --- a/modules/setting/storage.go +++ b/modules/setting/storage.go @@ -43,6 +43,10 @@ func getStorage(name, typ string, targetSec *ini.Section) Storage { sec.Key("MINIO_LOCATION").MustString("us-east-1") sec.Key("MINIO_USE_SSL").MustBool(false) + if targetSec == nil { + targetSec, _ = Cfg.NewSection(name) + } + var storage Storage storage.Section = targetSec storage.Type = typ diff --git a/modules/storage/storage.go b/modules/storage/storage.go index 984f154db..b3708908f 100644 --- a/modules/storage/storage.go +++ b/modules/storage/storage.go @@ -114,6 +114,9 @@ var ( Avatars ObjectStorage // RepoAvatars represents repository avatars storage RepoAvatars ObjectStorage + + // RepoArchives represents repository archives storage + RepoArchives ObjectStorage ) // Init init the stoarge @@ -130,7 +133,11 @@ func Init() error { return err } - return initLFS() + if err := initLFS(); err != nil { + return err + } + + return initRepoArchives() } // NewStorage takes a storage type and some config and returns an ObjectStorage or an error @@ -169,3 +176,9 @@ func initRepoAvatars() (err error) { RepoAvatars, err = NewStorage(setting.RepoAvatar.Storage.Type, &setting.RepoAvatar.Storage) return } + +func initRepoArchives() (err error) { + log.Info("Initialising Repository Archive storage with type: %s", setting.RepoArchive.Storage.Type) + RepoArchives, err = NewStorage(setting.RepoArchive.Storage.Type, &setting.RepoArchive.Storage) + return +} diff --git a/routers/api/v1/repo/file.go b/routers/api/v1/repo/file.go index 39a60df33..e6427ea4f 100644 --- a/routers/api/v1/repo/file.go +++ b/routers/api/v1/repo/file.go @@ -18,6 +18,7 @@ import ( api "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/web" "code.gitea.io/gitea/routers/common" + "code.gitea.io/gitea/routers/web/repo" ) // GetRawFile get a file by path on a repository @@ -126,7 +127,7 @@ func GetArchive(ctx *context.APIContext) { ctx.Repo.GitRepo = gitRepo defer gitRepo.Close() - common.Download(ctx.Context) + repo.Download(ctx.Context) } // GetEditorconfig get editor config of a repository diff --git a/routers/common/repo.go b/routers/common/repo.go index c61b5ec57..22403da09 100644 --- a/routers/common/repo.go +++ b/routers/common/repo.go @@ -7,7 +7,6 @@ package common import ( "fmt" "io" - "net/http" "path" "path/filepath" "strings" @@ -19,7 +18,6 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/typesniffer" - "code.gitea.io/gitea/services/archiver" ) // ServeBlob download a git.Blob @@ -41,30 +39,6 @@ func ServeBlob(ctx *context.Context, blob *git.Blob) error { return ServeData(ctx, ctx.Repo.TreePath, blob.Size(), dataRc) } -// Download an archive of a repository -func Download(ctx *context.Context) { - uri := ctx.Params("*") - aReq := archiver.DeriveRequestFrom(ctx, uri) - - if aReq == nil { - ctx.Error(http.StatusNotFound) - return - } - - downloadName := ctx.Repo.Repository.Name + "-" + aReq.GetArchiveName() - complete := aReq.IsComplete() - if !complete { - aReq = archiver.ArchiveRepository(aReq) - complete = aReq.WaitForCompletion(ctx) - } - - if complete { - ctx.ServeFile(aReq.GetArchivePath(), downloadName) - } else { - ctx.Error(http.StatusNotFound) - } -} - // ServeData download file from io.Reader func ServeData(ctx *context.Context, name string, size int64, reader io.Reader) error { buf := make([]byte, 1024) diff --git a/routers/init.go b/routers/init.go index 4c28a9539..bbf39a3f5 100644 --- a/routers/init.go +++ b/routers/init.go @@ -33,6 +33,7 @@ import ( "code.gitea.io/gitea/routers/common" "code.gitea.io/gitea/routers/private" web_routers "code.gitea.io/gitea/routers/web" + "code.gitea.io/gitea/services/archiver" "code.gitea.io/gitea/services/auth" "code.gitea.io/gitea/services/mailer" mirror_service "code.gitea.io/gitea/services/mirror" @@ -63,6 +64,9 @@ func NewServices() { mailer.NewContext() _ = cache.NewContext() notification.NewContext() + if err := archiver.Init(); err != nil { + log.Fatal("archiver init failed: %v", err) + } } // GlobalInit is for global configuration reload-able. diff --git a/routers/web/repo/repo.go b/routers/web/repo/repo.go index f149e92a8..919fd4620 100644 --- a/routers/web/repo/repo.go +++ b/routers/web/repo/repo.go @@ -15,8 +15,10 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/web" archiver_service "code.gitea.io/gitea/services/archiver" "code.gitea.io/gitea/services/forms" @@ -364,25 +366,123 @@ func RedirectDownload(ctx *context.Context) { ctx.Error(http.StatusNotFound) } -// InitiateDownload will enqueue an archival request, as needed. It may submit -// a request that's already in-progress, but the archiver service will just -// kind of drop it on the floor if this is the case. -func InitiateDownload(ctx *context.Context) { +// Download an archive of a repository +func Download(ctx *context.Context) { uri := ctx.Params("*") - aReq := archiver_service.DeriveRequestFrom(ctx, uri) - + aReq, err := archiver_service.NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, uri) + if err != nil { + ctx.ServerError("archiver_service.NewRequest", err) + return + } if aReq == nil { ctx.Error(http.StatusNotFound) return } - complete := aReq.IsComplete() - if !complete { - aReq = archiver_service.ArchiveRepository(aReq) - complete, _ = aReq.TimedWaitForCompletion(ctx, 2*time.Second) + archiver, err := models.GetRepoArchiver(models.DefaultDBContext(), aReq.RepoID, aReq.Type, aReq.CommitID) + if err != nil { + ctx.ServerError("models.GetRepoArchiver", err) + return + } + if archiver != nil && archiver.Status == models.RepoArchiverReady { + download(ctx, aReq.GetArchiveName(), archiver) + return + } + + if err := archiver_service.StartArchive(aReq); err != nil { + ctx.ServerError("archiver_service.StartArchive", err) + return + } + + var times int + var t = time.NewTicker(time.Second * 1) + defer t.Stop() + + for { + select { + case <-graceful.GetManager().HammerContext().Done(): + log.Warn("exit archive download because system stop") + return + case <-t.C: + if times > 20 { + ctx.ServerError("wait download timeout", nil) + return + } + times++ + archiver, err = models.GetRepoArchiver(models.DefaultDBContext(), aReq.RepoID, aReq.Type, aReq.CommitID) + if err != nil { + ctx.ServerError("archiver_service.StartArchive", err) + return + } + if archiver != nil && archiver.Status == models.RepoArchiverReady { + download(ctx, aReq.GetArchiveName(), archiver) + return + } + } + } +} + +func download(ctx *context.Context, archiveName string, archiver *models.RepoArchiver) { + downloadName := ctx.Repo.Repository.Name + "-" + archiveName + + rPath, err := archiver.RelativePath() + if err != nil { + ctx.ServerError("archiver.RelativePath", err) + return + } + + if setting.RepoArchive.ServeDirect { + //If we have a signed url (S3, object storage), redirect to this directly. + u, err := storage.RepoArchives.URL(rPath, downloadName) + if u != nil && err == nil { + ctx.Redirect(u.String()) + return + } + } + + //If we have matched and access to release or issue + fr, err := storage.RepoArchives.Open(rPath) + if err != nil { + ctx.ServerError("Open", err) + return + } + defer fr.Close() + ctx.ServeStream(fr, downloadName) +} + +// InitiateDownload will enqueue an archival request, as needed. It may submit +// a request that's already in-progress, but the archiver service will just +// kind of drop it on the floor if this is the case. +func InitiateDownload(ctx *context.Context) { + uri := ctx.Params("*") + aReq, err := archiver_service.NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, uri) + if err != nil { + ctx.ServerError("archiver_service.NewRequest", err) + return + } + if aReq == nil { + ctx.Error(http.StatusNotFound) + return + } + + archiver, err := models.GetRepoArchiver(models.DefaultDBContext(), aReq.RepoID, aReq.Type, aReq.CommitID) + if err != nil { + ctx.ServerError("archiver_service.StartArchive", err) + return + } + if archiver == nil || archiver.Status != models.RepoArchiverReady { + if err := archiver_service.StartArchive(aReq); err != nil { + ctx.ServerError("archiver_service.StartArchive", err) + return + } + } + + var completed bool + if archiver != nil && archiver.Status == models.RepoArchiverReady { + completed = true } ctx.JSON(http.StatusOK, map[string]interface{}{ - "complete": complete, + "complete": completed, }) } diff --git a/routers/web/web.go b/routers/web/web.go index 2c8a6411a..883213479 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -22,7 +22,6 @@ import ( "code.gitea.io/gitea/modules/validation" "code.gitea.io/gitea/modules/web" "code.gitea.io/gitea/routers/api/v1/misc" - "code.gitea.io/gitea/routers/common" "code.gitea.io/gitea/routers/web/admin" "code.gitea.io/gitea/routers/web/dev" "code.gitea.io/gitea/routers/web/events" @@ -888,7 +887,7 @@ func RegisterRoutes(m *web.Route) { }, context.RepoRef(), repo.MustBeNotEmpty, context.RequireRepoReaderOr(models.UnitTypeCode)) m.Group("/archive", func() { - m.Get("/*", common.Download) + m.Get("/*", repo.Download) m.Post("/*", repo.InitiateDownload) }, repo.MustBeNotEmpty, reqRepoCodeReader) diff --git a/services/archiver/archiver.go b/services/archiver/archiver.go index dfa6334d9..00c028130 100644 --- a/services/archiver/archiver.go +++ b/services/archiver/archiver.go @@ -6,22 +6,20 @@ package archiver import ( + "errors" + "fmt" "io" - "io/ioutil" "os" - "path" "regexp" "strings" - "sync" - "time" - "code.gitea.io/gitea/modules/base" - "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/modules/storage" ) // ArchiveRequest defines the parameters of an archive request, which notably @@ -30,223 +28,174 @@ import ( // This is entirely opaque to external entities, though, and mostly used as a // handle elsewhere. type ArchiveRequest struct { - uri string - repo *git.Repository - refName string - ext string - archivePath string - archiveType git.ArchiveType - archiveComplete bool - commit *git.Commit - cchan chan struct{} + RepoID int64 + refName string + Type git.ArchiveType + CommitID string } -var archiveInProgress []*ArchiveRequest -var archiveMutex sync.Mutex - // SHA1 hashes will only go up to 40 characters, but SHA256 hashes will go all // the way to 64. var shaRegex = regexp.MustCompile(`^[0-9a-f]{4,64}$`) -// These facilitate testing, by allowing the unit tests to control (to some extent) -// the goroutine used for processing the queue. -var archiveQueueMutex *sync.Mutex -var archiveQueueStartCond *sync.Cond -var archiveQueueReleaseCond *sync.Cond +// NewRequest creates an archival request, based on the URI. The +// resulting ArchiveRequest is suitable for being passed to ArchiveRepository() +// if it's determined that the request still needs to be satisfied. +func NewRequest(repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) { + r := &ArchiveRequest{ + RepoID: repoID, + } -// GetArchivePath returns the path from which we can serve this archive. -func (aReq *ArchiveRequest) GetArchivePath() string { - return aReq.archivePath + var ext string + switch { + case strings.HasSuffix(uri, ".zip"): + ext = ".zip" + r.Type = git.ZIP + case strings.HasSuffix(uri, ".tar.gz"): + ext = ".tar.gz" + r.Type = git.TARGZ + default: + return nil, fmt.Errorf("Unknown format: %s", uri) + } + + r.refName = strings.TrimSuffix(uri, ext) + + var err error + // Get corresponding commit. + if repo.IsBranchExist(r.refName) { + r.CommitID, err = repo.GetBranchCommitID(r.refName) + if err != nil { + return nil, err + } + } else if repo.IsTagExist(r.refName) { + r.CommitID, err = repo.GetTagCommitID(r.refName) + if err != nil { + return nil, err + } + } else if shaRegex.MatchString(r.refName) { + if repo.IsCommitExist(r.refName) { + r.CommitID = r.refName + } else { + return nil, git.ErrNotExist{ + ID: r.refName, + } + } + } else { + return nil, fmt.Errorf("Unknow ref %s type", r.refName) + } + + return r, nil } // GetArchiveName returns the name of the caller, based on the ref used by the // caller to create this request. func (aReq *ArchiveRequest) GetArchiveName() string { - return aReq.refName + aReq.ext + return strings.ReplaceAll(aReq.refName, "/", "-") + "." + aReq.Type.String() } -// IsComplete returns the completion status of this request. -func (aReq *ArchiveRequest) IsComplete() bool { - return aReq.archiveComplete -} - -// WaitForCompletion will wait for this request to complete, with no timeout. -// It returns whether the archive was actually completed, as the channel could -// have also been closed due to an error. -func (aReq *ArchiveRequest) WaitForCompletion(ctx *context.Context) bool { - select { - case <-aReq.cchan: - case <-ctx.Done(): - } - - return aReq.IsComplete() -} - -// TimedWaitForCompletion will wait for this request to complete, with timeout -// happening after the specified Duration. It returns whether the archive is -// now complete and whether we hit the timeout or not. The latter may not be -// useful if the request is complete or we started to shutdown. -func (aReq *ArchiveRequest) TimedWaitForCompletion(ctx *context.Context, dur time.Duration) (bool, bool) { - timeout := false - select { - case <-time.After(dur): - timeout = true - case <-aReq.cchan: - case <-ctx.Done(): - } - - return aReq.IsComplete(), timeout -} - -// The caller must hold the archiveMutex across calls to getArchiveRequest. -func getArchiveRequest(repo *git.Repository, commit *git.Commit, archiveType git.ArchiveType) *ArchiveRequest { - for _, r := range archiveInProgress { - // Need to be referring to the same repository. - if r.repo.Path == repo.Path && r.commit.ID == commit.ID && r.archiveType == archiveType { - return r - } - } - return nil -} - -// DeriveRequestFrom creates an archival request, based on the URI. The -// resulting ArchiveRequest is suitable for being passed to ArchiveRepository() -// if it's determined that the request still needs to be satisfied. -func DeriveRequestFrom(ctx *context.Context, uri string) *ArchiveRequest { - if ctx.Repo == nil || ctx.Repo.GitRepo == nil { - log.Trace("Repo not initialized") - return nil - } - r := &ArchiveRequest{ - uri: uri, - repo: ctx.Repo.GitRepo, - } - - switch { - case strings.HasSuffix(uri, ".zip"): - r.ext = ".zip" - r.archivePath = path.Join(r.repo.Path, "archives/zip") - r.archiveType = git.ZIP - case strings.HasSuffix(uri, ".tar.gz"): - r.ext = ".tar.gz" - r.archivePath = path.Join(r.repo.Path, "archives/targz") - r.archiveType = git.TARGZ - default: - log.Trace("Unknown format: %s", uri) - return nil - } - - r.refName = strings.TrimSuffix(r.uri, r.ext) - isDir, err := util.IsDir(r.archivePath) +func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { + ctx, commiter, err := models.TxDBContext() if err != nil { - ctx.ServerError("Download -> util.IsDir(archivePath)", err) - return nil + return nil, err } - if !isDir { - if err := os.MkdirAll(r.archivePath, os.ModePerm); err != nil { - ctx.ServerError("Download -> os.MkdirAll(archivePath)", err) - return nil - } + defer commiter.Close() + + archiver, err := models.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID) + if err != nil { + return nil, err } - // Get corresponding commit. - if r.repo.IsBranchExist(r.refName) { - r.commit, err = r.repo.GetBranchCommit(r.refName) - if err != nil { - ctx.ServerError("GetBranchCommit", err) - return nil - } - } else if r.repo.IsTagExist(r.refName) { - r.commit, err = r.repo.GetTagCommit(r.refName) - if err != nil { - ctx.ServerError("GetTagCommit", err) - return nil - } - } else if shaRegex.MatchString(r.refName) { - r.commit, err = r.repo.GetCommit(r.refName) - if err != nil { - ctx.NotFound("GetCommit", nil) - return nil + if archiver != nil { + // FIXME: If another process are generating it, we think it's not ready and just return + // Or we should wait until the archive generated. + if archiver.Status == models.RepoArchiverGenerating { + return nil, nil } } else { - ctx.NotFound("DeriveRequestFrom", nil) - return nil + archiver = &models.RepoArchiver{ + RepoID: r.RepoID, + Type: r.Type, + CommitID: r.CommitID, + Status: models.RepoArchiverGenerating, + } + if err := models.AddRepoArchiver(ctx, archiver); err != nil { + return nil, err + } } - archiveMutex.Lock() - defer archiveMutex.Unlock() - if rExisting := getArchiveRequest(r.repo, r.commit, r.archiveType); rExisting != nil { - return rExisting - } - - r.archivePath = path.Join(r.archivePath, base.ShortSha(r.commit.ID.String())+r.ext) - r.archiveComplete, err = util.IsFile(r.archivePath) + rPath, err := archiver.RelativePath() if err != nil { - ctx.ServerError("util.IsFile", err) - return nil - } - return r -} - -func doArchive(r *ArchiveRequest) { - var ( - err error - tmpArchive *os.File - destArchive *os.File - ) - - // Close the channel to indicate to potential waiters that this request - // has finished. - defer close(r.cchan) - - // It could have happened that we enqueued two archival requests, due to - // race conditions and difficulties in locking. Do one last check that - // the archive we're referring to doesn't already exist. If it does exist, - // then just mark the request as complete and move on. - isFile, err := util.IsFile(r.archivePath) - if err != nil { - log.Error("Unable to check if %s util.IsFile: %v. Will ignore and recreate.", r.archivePath, err) - } - if isFile { - r.archiveComplete = true - return + return nil, err } - // Create a temporary file to use while the archive is being built. We - // will then copy it into place (r.archivePath) once it's fully - // constructed. - tmpArchive, err = ioutil.TempFile("", "archive") - if err != nil { - log.Error("Unable to create a temporary archive file! Error: %v", err) - return + _, err = storage.RepoArchives.Stat(rPath) + if err == nil { + if archiver.Status == models.RepoArchiverGenerating { + archiver.Status = models.RepoArchiverReady + return archiver, models.UpdateRepoArchiverStatus(ctx, archiver) + } + return archiver, nil } + + if !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("unable to stat archive: %v", err) + } + + rd, w := io.Pipe() defer func() { - tmpArchive.Close() - os.Remove(tmpArchive.Name()) + w.Close() + rd.Close() }() - - if err = r.commit.CreateArchive(graceful.GetManager().ShutdownContext(), tmpArchive.Name(), git.CreateArchiveOpts{ - Format: r.archiveType, - Prefix: setting.Repository.PrefixArchiveFiles, - }); err != nil { - log.Error("Download -> CreateArchive "+tmpArchive.Name(), err) - return - } - - // Now we copy it into place - if destArchive, err = os.Create(r.archivePath); err != nil { - log.Error("Unable to open archive " + r.archivePath) - return - } - _, err = io.Copy(destArchive, tmpArchive) - destArchive.Close() + var done = make(chan error) + repo, err := archiver.LoadRepo() if err != nil { - log.Error("Unable to write archive " + r.archivePath) - return + return nil, fmt.Errorf("archiver.LoadRepo failed: %v", err) } - // Block any attempt to finalize creating a new request if we're marking - r.archiveComplete = true + gitRepo, err := git.OpenRepository(repo.RepoPath()) + if err != nil { + return nil, err + } + defer gitRepo.Close() + + go func(done chan error, w *io.PipeWriter, archiver *models.RepoArchiver, gitRepo *git.Repository) { + defer func() { + if r := recover(); r != nil { + done <- fmt.Errorf("%v", r) + } + }() + + err = gitRepo.CreateArchive( + graceful.GetManager().ShutdownContext(), + archiver.Type, + w, + setting.Repository.PrefixArchiveFiles, + archiver.CommitID, + ) + _ = w.CloseWithError(err) + done <- err + }(done, w, archiver, gitRepo) + + // TODO: add lfs data to zip + // TODO: add submodule data to zip + + if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil { + return nil, fmt.Errorf("unable to write archive: %v", err) + } + + err = <-done + if err != nil { + return nil, err + } + + if archiver.Status == models.RepoArchiverGenerating { + archiver.Status = models.RepoArchiverReady + if err = models.UpdateRepoArchiverStatus(ctx, archiver); err != nil { + return nil, err + } + } + + return archiver, commiter.Commit() } // ArchiveRepository satisfies the ArchiveRequest being passed in. Processing @@ -255,65 +204,46 @@ func doArchive(r *ArchiveRequest) { // anything. In all cases, the caller should be examining the *ArchiveRequest // being returned for completion, as it may be different than the one they passed // in. -func ArchiveRepository(request *ArchiveRequest) *ArchiveRequest { - // We'll return the request that's already been enqueued if it has been - // enqueued, or we'll immediately enqueue it if it has not been enqueued - // and it is not marked complete. - archiveMutex.Lock() - defer archiveMutex.Unlock() - if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil { - return rExisting - } - if request.archiveComplete { - return request - } +func ArchiveRepository(request *ArchiveRequest) (*models.RepoArchiver, error) { + return doArchive(request) +} - request.cchan = make(chan struct{}) - archiveInProgress = append(archiveInProgress, request) - go func() { - // Wait to start, if we have the Cond for it. This is currently only - // useful for testing, so that the start and release of queued entries - // can be controlled to examine the queue. - if archiveQueueStartCond != nil { - archiveQueueMutex.Lock() - archiveQueueStartCond.Wait() - archiveQueueMutex.Unlock() - } +var archiverQueue queue.UniqueQueue - // Drop the mutex while we process the request. This may take a long - // time, and it's not necessary now that we've added the reequest to - // archiveInProgress. - doArchive(request) - - if archiveQueueReleaseCond != nil { - archiveQueueMutex.Lock() - archiveQueueReleaseCond.Wait() - archiveQueueMutex.Unlock() - } - - // Purge this request from the list. To do so, we'll just take the - // index at which we ended up at and swap the final element into that - // position, then chop off the now-redundant final element. The slice - // may have change in between these two segments and we may have moved, - // so we search for it here. We could perhaps avoid this search - // entirely if len(archiveInProgress) == 1, but we should verify - // correctness. - archiveMutex.Lock() - defer archiveMutex.Unlock() - - idx := -1 - for _idx, req := range archiveInProgress { - if req == request { - idx = _idx - break +// Init initlize archive +func Init() error { + handler := func(data ...queue.Data) { + for _, datum := range data { + archiveReq, ok := datum.(*ArchiveRequest) + if !ok { + log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) + continue + } + log.Trace("ArchiverData Process: %#v", archiveReq) + if _, err := doArchive(archiveReq); err != nil { + log.Error("Archive %v faild: %v", datum, err) } } - if idx == -1 { - log.Error("ArchiveRepository: Failed to find request for removal.") - return - } - archiveInProgress = append(archiveInProgress[:idx], archiveInProgress[idx+1:]...) - }() + } - return request + archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest)) + if archiverQueue == nil { + return errors.New("unable to create codes indexer queue") + } + + go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run) + + return nil +} + +// StartArchive push the archive request to the queue +func StartArchive(request *ArchiveRequest) error { + has, err := archiverQueue.Has(request) + if err != nil { + return err + } + if has { + return nil + } + return archiverQueue.Push(request) } diff --git a/services/archiver/archiver_test.go b/services/archiver/archiver_test.go index 6dcd942bf..3f3f36998 100644 --- a/services/archiver/archiver_test.go +++ b/services/archiver/archiver_test.go @@ -6,108 +6,75 @@ package archiver import ( "path/filepath" - "sync" "testing" "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/test" - "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" ) -var queueMutex sync.Mutex - func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..")) } func waitForCount(t *testing.T, num int) { - var numQueued int - // Wait for up to 10 seconds for the queue to be impacted. - timeout := time.Now().Add(10 * time.Second) - for { - numQueued = len(archiveInProgress) - if numQueued == num || time.Now().After(timeout) { - break - } - } - - assert.Len(t, archiveInProgress, num) -} - -func releaseOneEntry(t *testing.T, inFlight []*ArchiveRequest) { - var nowQueued, numQueued int - - numQueued = len(archiveInProgress) - - // Release one, then wait up to 10 seconds for it to complete. - queueMutex.Lock() - archiveQueueReleaseCond.Signal() - queueMutex.Unlock() - timeout := time.Now().Add(10 * time.Second) - for { - nowQueued = len(archiveInProgress) - if nowQueued != numQueued || time.Now().After(timeout) { - break - } - } - - // Make sure we didn't just timeout. - assert.NotEqual(t, numQueued, nowQueued) - - // Also make sure that we released only one. - assert.Equal(t, numQueued-1, nowQueued) } func TestArchive_Basic(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) - archiveQueueMutex = &queueMutex - archiveQueueStartCond = sync.NewCond(&queueMutex) - archiveQueueReleaseCond = sync.NewCond(&queueMutex) - defer func() { - archiveQueueMutex = nil - archiveQueueStartCond = nil - archiveQueueReleaseCond = nil - }() - ctx := test.MockContext(t, "user27/repo49") firstCommit, secondCommit := "51f84af23134", "aacbdfe9e1c4" - bogusReq := DeriveRequestFrom(ctx, firstCommit+".zip") - assert.Nil(t, bogusReq) - test.LoadRepo(t, ctx, 49) - bogusReq = DeriveRequestFrom(ctx, firstCommit+".zip") - assert.Nil(t, bogusReq) - test.LoadGitRepo(t, ctx) defer ctx.Repo.GitRepo.Close() + bogusReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) + assert.NotNil(t, bogusReq) + assert.EqualValues(t, firstCommit+".zip", bogusReq.GetArchiveName()) + // Check a series of bogus requests. // Step 1, valid commit with a bad extension. - bogusReq = DeriveRequestFrom(ctx, firstCommit+".dilbert") + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".dilbert") + assert.Error(t, err) assert.Nil(t, bogusReq) // Step 2, missing commit. - bogusReq = DeriveRequestFrom(ctx, "dbffff.zip") + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "dbffff.zip") + assert.Error(t, err) assert.Nil(t, bogusReq) // Step 3, doesn't look like branch/tag/commit. - bogusReq = DeriveRequestFrom(ctx, "db.zip") + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "db.zip") + assert.Error(t, err) assert.Nil(t, bogusReq) + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "master.zip") + assert.NoError(t, err) + assert.NotNil(t, bogusReq) + assert.EqualValues(t, "master.zip", bogusReq.GetArchiveName()) + + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "test/archive.zip") + assert.NoError(t, err) + assert.NotNil(t, bogusReq) + assert.EqualValues(t, "test-archive.zip", bogusReq.GetArchiveName()) + // Now two valid requests, firstCommit with valid extensions. - zipReq := DeriveRequestFrom(ctx, firstCommit+".zip") + zipReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) assert.NotNil(t, zipReq) - tgzReq := DeriveRequestFrom(ctx, firstCommit+".tar.gz") + tgzReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".tar.gz") + assert.NoError(t, err) assert.NotNil(t, tgzReq) - secondReq := DeriveRequestFrom(ctx, secondCommit+".zip") + secondReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".zip") + assert.NoError(t, err) assert.NotNil(t, secondReq) inFlight := make([]*ArchiveRequest, 3) @@ -128,41 +95,9 @@ func TestArchive_Basic(t *testing.T) { // Sleep two seconds to make sure the queue doesn't change. time.Sleep(2 * time.Second) - assert.Len(t, archiveInProgress, 3) - // Release them all, they'll then stall at the archiveQueueReleaseCond while - // we examine the queue state. - queueMutex.Lock() - archiveQueueStartCond.Broadcast() - queueMutex.Unlock() - - // Iterate through all of the in-flight requests and wait for their - // completion. - for _, req := range inFlight { - req.WaitForCompletion(ctx) - } - - for _, req := range inFlight { - assert.True(t, req.IsComplete()) - exist, err := util.IsExist(req.GetArchivePath()) - assert.NoError(t, err) - assert.True(t, exist) - } - - arbitraryReq := inFlight[0] - // Reopen the channel so we don't double-close, mark it incomplete. We're - // going to run it back through the archiver, and it should get marked - // complete again. - arbitraryReq.cchan = make(chan struct{}) - arbitraryReq.archiveComplete = false - doArchive(arbitraryReq) - assert.True(t, arbitraryReq.IsComplete()) - - // Queues should not have drained yet, because we haven't released them. - // Do so now. - assert.Len(t, archiveInProgress, 3) - - zipReq2 := DeriveRequestFrom(ctx, firstCommit+".zip") + zipReq2, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) // This zipReq should match what's sitting in the queue, as we haven't // let it release yet. From the consumer's point of view, this looks like // a long-running archive task. @@ -173,46 +108,22 @@ func TestArchive_Basic(t *testing.T) { // predecessor has cleared out of the queue. ArchiveRepository(zipReq2) - // Make sure the queue hasn't grown any. - assert.Len(t, archiveInProgress, 3) - - // Make sure the queue drains properly - releaseOneEntry(t, inFlight) - assert.Len(t, archiveInProgress, 2) - releaseOneEntry(t, inFlight) - assert.Len(t, archiveInProgress, 1) - releaseOneEntry(t, inFlight) - assert.Empty(t, archiveInProgress) - // Now we'll submit a request and TimedWaitForCompletion twice, before and // after we release it. We should trigger both the timeout and non-timeout // cases. - var completed, timedout bool - timedReq := DeriveRequestFrom(ctx, secondCommit+".tar.gz") + timedReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".tar.gz") + assert.NoError(t, err) assert.NotNil(t, timedReq) ArchiveRepository(timedReq) - // Guaranteed to timeout; we haven't signalled the request to start.. - completed, timedout = timedReq.TimedWaitForCompletion(ctx, 2*time.Second) - assert.False(t, completed) - assert.True(t, timedout) - - queueMutex.Lock() - archiveQueueStartCond.Broadcast() - queueMutex.Unlock() - - // Shouldn't timeout, we've now signalled it and it's a small request. - completed, timedout = timedReq.TimedWaitForCompletion(ctx, 15*time.Second) - assert.True(t, completed) - assert.False(t, timedout) - - zipReq2 = DeriveRequestFrom(ctx, firstCommit+".zip") + zipReq2, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) // Now, we're guaranteed to have released the original zipReq from the queue. // Ensure that we don't get handed back the released entry somehow, but they // should remain functionally equivalent in all fields. The exception here // is zipReq.cchan, which will be non-nil because it's a completed request. // It's fine to go ahead and set it to nil now. - zipReq.cchan = nil + assert.Equal(t, zipReq, zipReq2) assert.False(t, zipReq == zipReq2)