From b7cc47166b2cce9e4032e260416c661a8dbd4229 Mon Sep 17 00:00:00 2001 From: Ciprian Dorin Craciun Date: Tue, 14 Dec 2021 21:24:51 +0200 Subject: [PATCH] [archiver] Add initial support for compression cache. --- scripts/z-run | 27 ++++++ sources/cmd/archiver/archiver.go | 147 ++++++++++++++++++++++++------- sources/go.mod | 1 + sources/go.sum | 3 + sources/lib/archiver/compress.go | 46 ++++++---- 5 files changed, 177 insertions(+), 47 deletions(-) diff --git a/scripts/z-run b/scripts/z-run index 7d2ec15..c34456c 100644 --- a/scripts/z-run +++ b/scripts/z-run @@ -594,6 +594,7 @@ "${ZRUN[@]}" ':: execute / archiver / debug' \ --sources ./examples/hello-world \ --archive ./examples/hello-world.cdb \ + --progress \ --debug \ "${@}" \ # @@ -620,6 +621,32 @@ +<< examples / huge / archive + exec -- "${ZRUN[@]}" ':: execute / archiver / debug' \ + --sources ./.outputs/examples/huge \ + --archive ./.outputs/examples/huge.cdb \ + --compress gzip \ + --compress-cache ./.outputs/examples/huge-cache.db \ + --progress \ + "${@}" \ + # +!! + +<< examples / huge / serve + exec -- "${ZRUN[@]}" ':: execute / server / debug' \ + --bind 127.153.215.30:8080 \ + --archive ./.outputs/examples/huge.cdb \ + --archive-mmap \ + --processes 1 \ + --threads 1 \ + --debug \ + "${@}" \ + # +!! + + + + << benchmark / dummy test "${#}" -eq 0 _outputs="$( exec -- readlink -e -- ./.outputs )" diff --git a/sources/cmd/archiver/archiver.go b/sources/cmd/archiver/archiver.go index a79bfc4..8d2408e 100644 --- a/sources/cmd/archiver/archiver.go +++ b/sources/cmd/archiver/archiver.go @@ -20,6 +20,7 @@ import "syscall" import "time" import "github.com/colinmarc/cdb" +import "go.etcd.io/bbolt" import . "github.com/volution/kawipiko/lib/common" import . "github.com/volution/kawipiko/lib/archiver" @@ -40,6 +41,7 @@ type context struct { storedFiles map[[2]uint64][2]string archivedReferences uint compress string + compressCache *bbolt.DB dataUncompressedCount int dataUncompressedSize int dataCompressedCount int @@ -414,40 +416,94 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive _dataEncoding := "identity" _dataUncompressedSize := len (_dataContent) _dataSize := _dataUncompressedSize - if _dataSize > 512 { - if _dataContent_0, _dataEncoding_0, _error := Compress (_dataContent, _context.compress); _error == nil { - if _dataEncoding_0 != "identity" { - _dataCompressedSize := len (_dataContent_0) - _dataCompressedDelta := _dataUncompressedSize - _dataCompressedSize - _dataCompressedRatio := (_dataCompressedDelta * 100) / _dataUncompressedSize - _accepted := false - _accepted = _accepted || ((_dataUncompressedSize > (1024 * 1024)) && (_dataCompressedRatio >= 5)) - _accepted = _accepted || ((_dataUncompressedSize > (64 * 1024)) && (_dataCompressedRatio >= 10)) - _accepted = _accepted || ((_dataUncompressedSize > (32 * 1024)) && (_dataCompressedRatio >= 15)) - _accepted = _accepted || ((_dataUncompressedSize > (16 * 1024)) && (_dataCompressedRatio >= 20)) - _accepted = _accepted || ((_dataUncompressedSize > (8 * 1024)) && (_dataCompressedRatio >= 25)) - _accepted = _accepted || ((_dataUncompressedSize > (4 * 1024)) && (_dataCompressedRatio >= 30)) - _accepted = _accepted || ((_dataUncompressedSize > (2 * 1024)) && (_dataCompressedRatio >= 35)) - _accepted = _accepted || ((_dataUncompressedSize > (1 * 1024)) && (_dataCompressedRatio >= 40)) - _accepted = _accepted || (_dataCompressedRatio >= 90) - if _accepted { - _dataContent = _dataContent_0 - _dataEncoding = _dataEncoding_0 - _dataSize = _dataCompressedSize - } - if _dataSize < _dataUncompressedSize { - if _context.debug { - log.Printf ("[ ] compress %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive) - } + + var _compressAlgorithm string + var _compressEncoding string + if _algorithm_0, _encoding_0, _error := CompressEncoding (_context.compress); _error == nil { + _compressAlgorithm = _algorithm_0 + _compressEncoding = _encoding_0 + } else { + return "", nil, nil, _error + } + + if (_dataSize > 512) && (_compressAlgorithm != "identity") { + + var _dataCompressed []byte + var _dataCompressedCached bool + + if _context.compressCache != nil { + _cacheTxn, _error := _context.compressCache.Begin (false) + if _error != nil { + AbortError (_error, "[91a5b78a] unexpected compression cache error!") + } + _cacheBucket := _cacheTxn.Bucket ([]byte (_compressAlgorithm)) + if _cacheBucket != nil { + _dataCompressed = _cacheBucket.Get (_fingerprintContentRaw[:]) + _dataCompressedCached = _dataCompressed != nil + } + if _error := _cacheTxn.Rollback (); _error != nil { + AbortError (_error, "[a06cfe46] unexpected compression cache error!") + } + } + + if _dataCompressed == nil { + if _data_0, _error := Compress (_dataContent, _compressAlgorithm); _error == nil { + _dataCompressed = _data_0 + } else { + return "", nil, nil, _error + } + } + + _dataCompressedSize := len (_dataCompressed) + _dataCompressedDelta := _dataUncompressedSize - _dataCompressedSize + _dataCompressedRatio := (_dataCompressedDelta * 100) / _dataUncompressedSize + _accepted := false + _accepted = _accepted || ((_dataUncompressedSize > (1024 * 1024)) && (_dataCompressedRatio >= 5)) + _accepted = _accepted || ((_dataUncompressedSize > (64 * 1024)) && (_dataCompressedRatio >= 10)) + _accepted = _accepted || ((_dataUncompressedSize > (32 * 1024)) && (_dataCompressedRatio >= 15)) + _accepted = _accepted || ((_dataUncompressedSize > (16 * 1024)) && (_dataCompressedRatio >= 20)) + _accepted = _accepted || ((_dataUncompressedSize > (8 * 1024)) && (_dataCompressedRatio >= 25)) + _accepted = _accepted || ((_dataUncompressedSize > (4 * 1024)) && (_dataCompressedRatio >= 30)) + _accepted = _accepted || ((_dataUncompressedSize > (2 * 1024)) && (_dataCompressedRatio >= 35)) + _accepted = _accepted || ((_dataUncompressedSize > (1 * 1024)) && (_dataCompressedRatio >= 40)) + _accepted = _accepted || (_dataCompressedRatio >= 90) + if _accepted { + _dataContent = _dataCompressed + _dataEncoding = _compressEncoding + _dataSize = _dataCompressedSize + } + + if (_context.compressCache != nil) && !_dataCompressedCached && _accepted { + _cacheTxn, _error := _context.compressCache.Begin (true) + if _error != nil { + AbortError (_error, "[ddbe6a70] unexpected compression cache error!") + } + _cacheBucket := _cacheTxn.Bucket ([]byte (_compressAlgorithm)) + if _cacheBucket == nil { + if _bucket_0, _error := _cacheTxn.CreateBucket ([]byte (_compressAlgorithm)); _error == nil { + _cacheBucket = _bucket_0 } else { - if _context.debug { - log.Printf ("[ ] compress-NOK %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive) - } + AbortError (_error, "[b7766792] unexpected compression cache error!") } } - } else { - return "", nil, nil, _error + if _error := _cacheBucket.Put (_fingerprintContentRaw[:], _dataCompressed); _error != nil { + AbortError (_error, "[51d57220] unexpected compression cache error!") + } + if _error := _cacheTxn.Commit (); _error != nil { + AbortError (_error, "[a47c7c10] unexpected compression cache error!") + } } + + if _dataSize < _dataUncompressedSize { + if _context.debug { + log.Printf ("[ ] compress %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive) + } + } else { + if _context.debug { + log.Printf ("[ ] compress-NOK %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive) + } + } + } else { if _context.debug && (_context.compress != "identity") { log.Printf ("[ ] compress-NOK %8d `%s`\n", _dataUncompressedSize, _pathInArchive) @@ -689,6 +745,7 @@ func main_0 () (error) { var _sourcesFolder string var _archiveFile string var _compress string + var _compressCache string var _includeIndex bool var _includeStripped bool var _includeCache bool @@ -720,7 +777,9 @@ func main_0 () (error) { --sources --archive + --compress + --compress-cache --exclude-index --exclude-strip @@ -742,6 +801,7 @@ func main_0 () (error) { _sourcesFolder_0 := _flags.String ("sources", "", "") _archiveFile_0 := _flags.String ("archive", "", "") _compress_0 := _flags.String ("compress", "", "") + _compressCache_0 := _flags.String ("compress-cache", "", "") _excludeIndex_0 := _flags.Bool ("exclude-index", false, "") _excludeStripped_0 := _flags.Bool ("exclude-strip", false, "") _excludeCache_0 := _flags.Bool ("exclude-cache", false, "") @@ -756,6 +816,7 @@ func main_0 () (error) { _sourcesFolder = *_sourcesFolder_0 _archiveFile = *_archiveFile_0 _compress = *_compress_0 + _compressCache = *_compressCache_0 _includeIndex = ! *_excludeIndex_0 _includeStripped = ! *_excludeStripped_0 _includeCache = ! *_excludeCache_0 @@ -775,12 +836,27 @@ func main_0 () (error) { var _cdbWriter *cdb.Writer - if _cdbWriter_0, _error := cdb.Create (_archiveFile); _error == nil { - _cdbWriter = _cdbWriter_0 + if _db_0, _error := cdb.Create (_archiveFile); _error == nil { + _cdbWriter = _db_0 } else { AbortError (_error, "[85234ba0] failed creating archive (while opening)!") } + var _compressCacheDb *bbolt.DB + if _compressCache != "" { + _options := bbolt.Options { + PageSize : 16 * 1024, + InitialMmapSize : 1 * 1024 * 1024, + NoFreelistSync : true, + NoSync : true, + } + if _db_0, _error := bbolt.Open (_compressCache, 0600, &_options); _error == nil { + _compressCacheDb = _db_0 + } else { + AbortError (_error, "[eaff07f6] failed opening compression cache!") + } + } + _context := & context { cdbWriter : _cdbWriter, storedFilePaths : make ([]string, 0, 16 * 1024), @@ -790,6 +866,7 @@ func main_0 () (error) { storedDataContentMeta : make (map[string]map[string]string, 16 * 1024), storedFiles : make (map[[2]uint64][2]string, 16 * 1024), compress : _compress, + compressCache : _compressCacheDb, includeIndex : _includeIndex, includeStripped : _includeStripped, includeCache : _includeCache, @@ -846,6 +923,12 @@ func main_0 () (error) { } _context.cdbWriter = nil + if _context.compressCache != nil { + if _error := _context.compressCache.Close (); _error != nil { + AbortError (_error, "[53cbe28d] failed closing compression cache!") + } + } + if true { log.Printf ("[ ] completed -- %0.2f minutes -- %d files, %d folders, %0.2f MiB (%0.2f MiB/s) -- %d compressed (%0.2f MiB, %0.2f%%) -- %d records (%0.2f MiB)\n", time.Since (_context.progressStarted) .Minutes (), diff --git a/sources/go.mod b/sources/go.mod index 60afe3f..8913bf9 100644 --- a/sources/go.mod +++ b/sources/go.mod @@ -16,6 +16,7 @@ require ( github.com/stretchr/testify v1.7.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect + go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/sources/go.sum b/sources/go.sum index aced853..a146bee 100644 --- a/sources/go.sum +++ b/sources/go.sum @@ -25,9 +25,12 @@ github.com/valyala/fasthttp v1.31.0 h1:lrauRLII19afgCs2fnWRJ4M5IkV0lo2FqA61uGkNB github.com/valyala/fasthttp v1.31.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= +go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/sources/lib/archiver/compress.go b/sources/lib/archiver/compress.go index dd88923..8151bf3 100644 --- a/sources/lib/archiver/compress.go +++ b/sources/lib/archiver/compress.go @@ -14,7 +14,7 @@ import "github.com/andybalholm/brotli" -func Compress (_data []byte, _algorithm string) ([]byte, string, error) { +func Compress (_data []byte, _algorithm string) ([]byte, error) { switch _algorithm { case "gz", "gzip" : return CompressGzip (_data) @@ -23,16 +23,32 @@ func Compress (_data []byte, _algorithm string) ([]byte, string, error) { case "br", "brotli" : return CompressBrotli (_data) case "", "none", "identity" : - return _data, "identity", nil + return _data, nil default : - return nil, "", fmt.Errorf ("[ea23f966] invalid compression algorithm `%s`", _algorithm) + return nil, fmt.Errorf ("[ea23f966] invalid compression algorithm `%s`", _algorithm) + } +} + + +func CompressEncoding (_algorithm string) (string, string, error) { + switch _algorithm { + case "gz", "gzip" : + return "gzip", "gzip", nil + case "zopfli" : + return "zopfli", "gzip", nil + case "br", "brotli" : + return "brotli", "br", nil + case "", "none", "identity" : + return "identity", "identity", nil + default : + return "", "", fmt.Errorf ("[6026a403] invalid compression algorithm `%s`", _algorithm) } } -func CompressGzip (_data []byte) ([]byte, string, error) { +func CompressGzip (_data []byte) ([]byte, error) { _buffer := & bytes.Buffer {} @@ -40,24 +56,24 @@ func CompressGzip (_data []byte) ([]byte, string, error) { if _encoder_0, _error := gzip.NewWriterLevel (_buffer, gzip.BestCompression); _error == nil { _encoder = _encoder_0 } else { - return nil, "", _error + return nil, _error } if _, _error := _encoder.Write (_data); _error != nil { - return nil, "", _error + return nil, _error } if _error := _encoder.Close (); _error != nil { - return nil, "", _error + return nil, _error } _data = _buffer.Bytes () - return _data, "gzip", nil + return _data, nil } -func CompressZopfli (_data []byte) ([]byte, string, error) { +func CompressZopfli (_data []byte) ([]byte, error) { _buffer := & bytes.Buffer {} @@ -69,17 +85,17 @@ func CompressZopfli (_data []byte) ([]byte, string, error) { _options.BlockType = zopfli.DYNAMIC_BLOCK if _error := zopfli.GzipCompress (&_options, _data, _buffer); _error != nil { - return nil, "", _error + return nil, _error } _data = _buffer.Bytes () - return _data, "gzip", nil + return _data, nil } -func CompressBrotli (_data []byte) ([]byte, string, error) { +func CompressBrotli (_data []byte) ([]byte, error) { _buffer := & bytes.Buffer {} @@ -88,13 +104,13 @@ func CompressBrotli (_data []byte) ([]byte, string, error) { _encoder := brotli.NewWriterOptions (_buffer, _options) if _, _error := _encoder.Write (_data); _error != nil { - return nil, "", _error + return nil, _error } if _error := _encoder.Close (); _error != nil { - return nil, "", _error + return nil, _error } _data = _buffer.Bytes () - return _data, "br", nil + return _data, nil }