[archiver] Add initial support for compression cache.

Ciprian Dorin Craciun 2021-12-14 21:24:51 +02:00
parent d9d950e182
commit b7cc47166b
5 changed files with 177 additions and 47 deletions
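
The gist of the change: the archiver now memoizes compression results in a small bbolt key/value file. Before compressing a file's content, it looks the content fingerprint up in a bucket named after the compression algorithm; only on a miss does it run the (potentially expensive) compressor, and accepted results are written back into the cache (see `prepareDataContent` below). This mostly pays off when re-archiving a largely unchanged site with a slow compressor such as zopfli or brotli.

A minimal, self-contained sketch of the same scheme -- illustrative only: `cachedCompress` is not part of the codebase, the commit itself uses explicit `Begin`/`Rollback`/`Commit` transactions rather than bbolt's `View`/`Update` helpers, and it keys entries by the archiver's own content fingerprint rather than SHA-256:

package main

import "crypto/sha256"
import "go.etcd.io/bbolt"

// cachedCompress first consults the cache, and only compresses on a miss,
// storing the fresh result for the next run.
func cachedCompress (_cache *bbolt.DB, _algorithm string, _data []byte, _compress func ([]byte) ([]byte, error)) ([]byte, error) {
	_key := sha256.Sum256 (_data)
	var _compressed []byte
	if _error := _cache.View (func (_txn *bbolt.Tx) (error) {
			if _bucket := _txn.Bucket ([]byte (_algorithm)); _bucket != nil {
				if _value := _bucket.Get (_key[:]); _value != nil {
					// bbolt values are only valid while the transaction is open, so copy
					_compressed = append ([]byte (nil), _value...)
				}
			}
			return nil
		}); _error != nil {
		return nil, _error
	}
	if _compressed != nil {
		return _compressed, nil
	}
	if _compressed_0, _error := _compress (_data); _error == nil {
		_compressed = _compressed_0
	} else {
		return nil, _error
	}
	return _compressed, _cache.Update (func (_txn *bbolt.Tx) (error) {
			_bucket, _error := _txn.CreateBucketIfNotExists ([]byte (_algorithm))
			if _error != nil {
				return _error
			}
			return _bucket.Put (_key[:], _compressed)
		})
}

func main () {
	_cache, _error := bbolt.Open ("./compress-cache.db", 0600, nil)
	if _error != nil {
		panic (_error)
	}
	defer _cache.Close ()
	// identity "compressor" stands in for gzip / zopfli / brotli
	if _, _error := cachedCompress (_cache, "identity", []byte ("hello world"), func (_data []byte) ([]byte, error) { return _data, nil }); _error != nil {
		panic (_error)
	}
}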

View file

@@ -594,6 +594,7 @@
 	"${ZRUN[@]}" ':: execute / archiver / debug' \
 			--sources ./examples/hello-world \
 			--archive ./examples/hello-world.cdb \
+			--progress \
 			--debug \
 			"${@}" \
 	#
@@ -620,6 +621,32 @@
+<< examples / huge / archive
+	exec -- "${ZRUN[@]}" ':: execute / archiver / debug' \
+			--sources ./.outputs/examples/huge \
+			--archive ./.outputs/examples/huge.cdb \
+			--compress gzip \
+			--compress-cache ./.outputs/examples/huge-cache.db \
+			--progress \
+			"${@}" \
+	#
+!!
+<< examples / huge / serve
+	exec -- "${ZRUN[@]}" ':: execute / server / debug' \
+			--bind 127.153.215.30:8080 \
+			--archive ./.outputs/examples/huge.cdb \
+			--archive-mmap \
+			--processes 1 \
+			--threads 1 \
+			--debug \
+			"${@}" \
+	#
+!!
 << benchmark / dummy
 	test "${#}" -eq 0
 	_outputs="$( exec -- readlink -e -- ./.outputs )"
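
A short note on the new recipes above: the `examples / huge / archive` entry exercises the flag end-to-end, with the cache living in its own bbolt file (`huge-cache.db`) alongside the archive. Re-running the archiver over a mostly unchanged tree should then reuse previously compressed content instead of recompressing it; and since the cache is purely derived data, deleting the file should only cost one full recompression on the next run.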

View file

@@ -20,6 +20,7 @@ import "syscall"
 import "time"
 import "github.com/colinmarc/cdb"
+import "go.etcd.io/bbolt"
 import . "github.com/volution/kawipiko/lib/common"
 import . "github.com/volution/kawipiko/lib/archiver"
@@ -40,6 +41,7 @@ type context struct {
 	storedFiles map[[2]uint64][2]string
 	archivedReferences uint
 	compress string
+	compressCache *bbolt.DB
 	dataUncompressedCount int
 	dataUncompressedSize int
 	dataCompressedCount int
@@ -414,40 +416,94 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive
 	_dataEncoding := "identity"
 	_dataUncompressedSize := len (_dataContent)
 	_dataSize := _dataUncompressedSize
-	if _dataSize > 512 {
-		if _dataContent_0, _dataEncoding_0, _error := Compress (_dataContent, _context.compress); _error == nil {
-			if _dataEncoding_0 != "identity" {
-				_dataCompressedSize := len (_dataContent_0)
-				_dataCompressedDelta := _dataUncompressedSize - _dataCompressedSize
-				_dataCompressedRatio := (_dataCompressedDelta * 100) / _dataUncompressedSize
-				_accepted := false
-				_accepted = _accepted || ((_dataUncompressedSize > (1024 * 1024)) && (_dataCompressedRatio >= 5))
-				_accepted = _accepted || ((_dataUncompressedSize > (64 * 1024)) && (_dataCompressedRatio >= 10))
-				_accepted = _accepted || ((_dataUncompressedSize > (32 * 1024)) && (_dataCompressedRatio >= 15))
-				_accepted = _accepted || ((_dataUncompressedSize > (16 * 1024)) && (_dataCompressedRatio >= 20))
-				_accepted = _accepted || ((_dataUncompressedSize > (8 * 1024)) && (_dataCompressedRatio >= 25))
-				_accepted = _accepted || ((_dataUncompressedSize > (4 * 1024)) && (_dataCompressedRatio >= 30))
-				_accepted = _accepted || ((_dataUncompressedSize > (2 * 1024)) && (_dataCompressedRatio >= 35))
-				_accepted = _accepted || ((_dataUncompressedSize > (1 * 1024)) && (_dataCompressedRatio >= 40))
-				_accepted = _accepted || (_dataCompressedRatio >= 90)
-				if _accepted {
-					_dataContent = _dataContent_0
-					_dataEncoding = _dataEncoding_0
-					_dataSize = _dataCompressedSize
-				}
-				if _dataSize < _dataUncompressedSize {
-					if _context.debug {
-						log.Printf ("[ ] compress %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive)
-					}
-				} else {
-					if _context.debug {
-						log.Printf ("[ ] compress-NOK %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive)
-					}
-				}
-			}
-		} else {
-			return "", nil, nil, _error
-		}
+	var _compressAlgorithm string
+	var _compressEncoding string
+	if _algorithm_0, _encoding_0, _error := CompressEncoding (_context.compress); _error == nil {
+		_compressAlgorithm = _algorithm_0
+		_compressEncoding = _encoding_0
+	} else {
+		return "", nil, nil, _error
+	}
+	if (_dataSize > 512) && (_compressAlgorithm != "identity") {
+		var _dataCompressed []byte
+		var _dataCompressedCached bool
+		if _context.compressCache != nil {
+			_cacheTxn, _error := _context.compressCache.Begin (false)
+			if _error != nil {
+				AbortError (_error, "[91a5b78a] unexpected compression cache error!")
+			}
+			_cacheBucket := _cacheTxn.Bucket ([]byte (_compressAlgorithm))
+			if _cacheBucket != nil {
+				_dataCompressed = _cacheBucket.Get (_fingerprintContentRaw[:])
+				_dataCompressedCached = _dataCompressed != nil
+			}
+			if _error := _cacheTxn.Rollback (); _error != nil {
+				AbortError (_error, "[a06cfe46] unexpected compression cache error!")
+			}
+		}
+		if _dataCompressed == nil {
+			if _data_0, _error := Compress (_dataContent, _compressAlgorithm); _error == nil {
+				_dataCompressed = _data_0
+			} else {
+				return "", nil, nil, _error
+			}
+		}
+		_dataCompressedSize := len (_dataCompressed)
+		_dataCompressedDelta := _dataUncompressedSize - _dataCompressedSize
+		_dataCompressedRatio := (_dataCompressedDelta * 100) / _dataUncompressedSize
+		_accepted := false
+		_accepted = _accepted || ((_dataUncompressedSize > (1024 * 1024)) && (_dataCompressedRatio >= 5))
+		_accepted = _accepted || ((_dataUncompressedSize > (64 * 1024)) && (_dataCompressedRatio >= 10))
+		_accepted = _accepted || ((_dataUncompressedSize > (32 * 1024)) && (_dataCompressedRatio >= 15))
+		_accepted = _accepted || ((_dataUncompressedSize > (16 * 1024)) && (_dataCompressedRatio >= 20))
+		_accepted = _accepted || ((_dataUncompressedSize > (8 * 1024)) && (_dataCompressedRatio >= 25))
+		_accepted = _accepted || ((_dataUncompressedSize > (4 * 1024)) && (_dataCompressedRatio >= 30))
+		_accepted = _accepted || ((_dataUncompressedSize > (2 * 1024)) && (_dataCompressedRatio >= 35))
+		_accepted = _accepted || ((_dataUncompressedSize > (1 * 1024)) && (_dataCompressedRatio >= 40))
+		_accepted = _accepted || (_dataCompressedRatio >= 90)
+		if _accepted {
+			_dataContent = _dataCompressed
+			_dataEncoding = _compressEncoding
+			_dataSize = _dataCompressedSize
+		}
+		if (_context.compressCache != nil) && !_dataCompressedCached && _accepted {
+			_cacheTxn, _error := _context.compressCache.Begin (true)
+			if _error != nil {
+				AbortError (_error, "[ddbe6a70] unexpected compression cache error!")
+			}
+			_cacheBucket := _cacheTxn.Bucket ([]byte (_compressAlgorithm))
+			if _cacheBucket == nil {
+				if _bucket_0, _error := _cacheTxn.CreateBucket ([]byte (_compressAlgorithm)); _error == nil {
+					_cacheBucket = _bucket_0
+				} else {
+					AbortError (_error, "[b7766792] unexpected compression cache error!")
+				}
+			}
+			if _error := _cacheBucket.Put (_fingerprintContentRaw[:], _dataCompressed); _error != nil {
+				AbortError (_error, "[51d57220] unexpected compression cache error!")
+			}
+			if _error := _cacheTxn.Commit (); _error != nil {
+				AbortError (_error, "[a47c7c10] unexpected compression cache error!")
+			}
+		}
+		if _dataSize < _dataUncompressedSize {
+			if _context.debug {
+				log.Printf ("[ ] compress %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive)
+			}
+		} else {
+			if _context.debug {
+				log.Printf ("[ ] compress-NOK %02d %8d %8d `%s`\n", _dataCompressedRatio, _dataUncompressedSize, _dataCompressedDelta, _pathInArchive)
+			}
+		}
 	} else {
 		if _context.debug && (_context.compress != "identity") {
 			log.Printf ("[ ] compress-NOK %8d `%s`\n", _dataUncompressedSize, _pathInArchive)
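
For reference, the acceptance ladder above (unchanged by this commit, only re-indented) trades compression ratio against size: big files are kept compressed for modest savings, small files only for drastic ones. For example, a 100 KiB asset must shrink by at least 10%, a file of 1 KiB or less is kept only at 90% savings, and anything of 512 bytes or less is never compressed at all. Restated as a standalone predicate -- the helper name is illustrative and does not exist in the codebase:

// compressionAccepted restates the acceptance ladder from prepareDataContent;
// note that the caller additionally skips anything of 512 bytes or less.
func compressionAccepted (_uncompressedSize int, _compressedSize int) (bool) {
	_ratio := ((_uncompressedSize - _compressedSize) * 100) / _uncompressedSize
	switch {
		case (_uncompressedSize > (1024 * 1024)) && (_ratio >= 5) :
			return true
		case (_uncompressedSize > (64 * 1024)) && (_ratio >= 10) :
			return true
		case (_uncompressedSize > (32 * 1024)) && (_ratio >= 15) :
			return true
		case (_uncompressedSize > (16 * 1024)) && (_ratio >= 20) :
			return true
		case (_uncompressedSize > (8 * 1024)) && (_ratio >= 25) :
			return true
		case (_uncompressedSize > (4 * 1024)) && (_ratio >= 30) :
			return true
		case (_uncompressedSize > (2 * 1024)) && (_ratio >= 35) :
			return true
		case (_uncompressedSize > (1 * 1024)) && (_ratio >= 40) :
			return true
		default :
			// regardless of size, a 90% or better reduction is always accepted
			return _ratio >= 90
	}
}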
@@ -689,6 +745,7 @@ func main_0 () (error) {
 	var _sourcesFolder string
 	var _archiveFile string
 	var _compress string
+	var _compressCache string
 	var _includeIndex bool
 	var _includeStripped bool
 	var _includeCache bool
@@ -720,7 +777,9 @@
 	--sources <path>
 	--archive <path>
 	--compress <gzip | zopfli | brotli | identity>
+	--compress-cache <path>
 	--exclude-index
 	--exclude-strip
@@ -742,6 +801,7 @@
 	_sourcesFolder_0 := _flags.String ("sources", "", "")
 	_archiveFile_0 := _flags.String ("archive", "", "")
 	_compress_0 := _flags.String ("compress", "", "")
+	_compressCache_0 := _flags.String ("compress-cache", "", "")
 	_excludeIndex_0 := _flags.Bool ("exclude-index", false, "")
 	_excludeStripped_0 := _flags.Bool ("exclude-strip", false, "")
 	_excludeCache_0 := _flags.Bool ("exclude-cache", false, "")
@@ -756,6 +816,7 @@
 	_sourcesFolder = *_sourcesFolder_0
 	_archiveFile = *_archiveFile_0
 	_compress = *_compress_0
+	_compressCache = *_compressCache_0
 	_includeIndex = ! *_excludeIndex_0
 	_includeStripped = ! *_excludeStripped_0
 	_includeCache = ! *_excludeCache_0
@@ -775,12 +836,27 @@
 	var _cdbWriter *cdb.Writer
-	if _cdbWriter_0, _error := cdb.Create (_archiveFile); _error == nil {
-		_cdbWriter = _cdbWriter_0
+	if _db_0, _error := cdb.Create (_archiveFile); _error == nil {
+		_cdbWriter = _db_0
 	} else {
 		AbortError (_error, "[85234ba0] failed creating archive (while opening)!")
 	}
+	var _compressCacheDb *bbolt.DB
+	if _compressCache != "" {
+		_options := bbolt.Options {
+				PageSize : 16 * 1024,
+				InitialMmapSize : 1 * 1024 * 1024,
+				NoFreelistSync : true,
+				NoSync : true,
+			}
+		if _db_0, _error := bbolt.Open (_compressCache, 0600, &_options); _error == nil {
+			_compressCacheDb = _db_0
+		} else {
+			AbortError (_error, "[eaff07f6] failed opening compression cache!")
+		}
+	}
 	_context := & context {
 			cdbWriter : _cdbWriter,
 			storedFilePaths : make ([]string, 0, 16 * 1024),
@@ -790,6 +866,7 @@
 			storedDataContentMeta : make (map[string]map[string]string, 16 * 1024),
 			storedFiles : make (map[[2]uint64][2]string, 16 * 1024),
 			compress : _compress,
+			compressCache : _compressCacheDb,
 			includeIndex : _includeIndex,
 			includeStripped : _includeStripped,
 			includeCache : _includeCache,
@@ -846,6 +923,12 @@
 	}
 	_context.cdbWriter = nil
+	if _context.compressCache != nil {
+		if _error := _context.compressCache.Close (); _error != nil {
+			AbortError (_error, "[53cbe28d] failed closing compression cache!")
+		}
+	}
 	if true {
 		log.Printf ("[ ] completed -- %0.2f minutes -- %d files, %d folders, %0.2f MiB (%0.2f MiB/s) -- %d compressed (%0.2f MiB, %0.2f%%) -- %d records (%0.2f MiB)\n",
 			time.Since (_context.progressStarted) .Minutes (),
View file

@@ -16,6 +16,7 @@ require (
 	github.com/stretchr/testify v1.7.0 // indirect
 	github.com/valyala/bytebufferpool v1.0.0 // indirect
 	github.com/valyala/tcplisten v1.0.0 // indirect
+	go.etcd.io/bbolt v1.3.6 // indirect
 	golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1 // indirect
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )

View file

@@ -25,9 +25,12 @@ github.com/valyala/fasthttp v1.31.0 h1:lrauRLII19afgCs2fnWRJ4M5IkV0lo2FqA61uGkNB
 github.com/valyala/fasthttp v1.31.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
 github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
 github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
+go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
+go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View file

@@ -14,7 +14,7 @@ import "github.com/andybalholm/brotli"
-func Compress (_data []byte, _algorithm string) ([]byte, string, error) {
+func Compress (_data []byte, _algorithm string) ([]byte, error) {
 	switch _algorithm {
 		case "gz", "gzip" :
 			return CompressGzip (_data)
@@ -23,16 +23,32 @@ func Compress (_data []byte, _algorithm string) ([]byte, string, error) {
 		case "br", "brotli" :
 			return CompressBrotli (_data)
 		case "", "none", "identity" :
-			return _data, "identity", nil
+			return _data, nil
 		default :
-			return nil, "", fmt.Errorf ("[ea23f966] invalid compression algorithm `%s`", _algorithm)
+			return nil, fmt.Errorf ("[ea23f966] invalid compression algorithm `%s`", _algorithm)
 	}
 }
+func CompressEncoding (_algorithm string) (string, string, error) {
+	switch _algorithm {
+		case "gz", "gzip" :
+			return "gzip", "gzip", nil
+		case "zopfli" :
+			return "zopfli", "gzip", nil
+		case "br", "brotli" :
+			return "brotli", "br", nil
+		case "", "none", "identity" :
+			return "identity", "identity", nil
+		default :
+			return "", "", fmt.Errorf ("[6026a403] invalid compression algorithm `%s`", _algorithm)
+	}
+}
-func CompressGzip (_data []byte) ([]byte, string, error) {
+func CompressGzip (_data []byte) ([]byte, error) {
 	_buffer := & bytes.Buffer {}
@@ -40,24 +56,24 @@ func CompressGzip (_data []byte) ([]byte, string, error) {
 	if _encoder_0, _error := gzip.NewWriterLevel (_buffer, gzip.BestCompression); _error == nil {
 		_encoder = _encoder_0
 	} else {
-		return nil, "", _error
+		return nil, _error
 	}
 	if _, _error := _encoder.Write (_data); _error != nil {
-		return nil, "", _error
+		return nil, _error
 	}
 	if _error := _encoder.Close (); _error != nil {
-		return nil, "", _error
+		return nil, _error
 	}
 	_data = _buffer.Bytes ()
-	return _data, "gzip", nil
+	return _data, nil
 }
-func CompressZopfli (_data []byte) ([]byte, string, error) {
+func CompressZopfli (_data []byte) ([]byte, error) {
 	_buffer := & bytes.Buffer {}
@@ -69,17 +85,17 @@ func CompressZopfli (_data []byte) ([]byte, string, error) {
 	_options.BlockType = zopfli.DYNAMIC_BLOCK
 	if _error := zopfli.GzipCompress (&_options, _data, _buffer); _error != nil {
-		return nil, "", _error
+		return nil, _error
 	}
 	_data = _buffer.Bytes ()
-	return _data, "gzip", nil
+	return _data, nil
 }
-func CompressBrotli (_data []byte) ([]byte, string, error) {
+func CompressBrotli (_data []byte) ([]byte, error) {
 	_buffer := & bytes.Buffer {}
@@ -88,13 +104,13 @@ func CompressBrotli (_data []byte) ([]byte, string, error) {
 	_encoder := brotli.NewWriterOptions (_buffer, _options)
 	if _, _error := _encoder.Write (_data); _error != nil {
-		return nil, "", _error
+		return nil, _error
 	}
 	if _error := _encoder.Close (); _error != nil {
-		return nil, "", _error
+		return nil, _error
 	}
 	_data = _buffer.Bytes ()
-	return _data, "br", nil
+	return _data, nil
 }
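
Finally, the refactoring above splits two concerns the old `Compress` API conflated: which encoder runs (the algorithm) and which `Content-Encoding` token the result is served with. The distinction matters for zopfli, which is a slower, stronger encoder that nevertheless emits standard gzip streams; keying the cache by algorithm while recording the wire encoding separately keeps zopfli and gzip output from sharing cache entries. Illustrative use of the split API (error handling elided):

_algorithm, _encoding, _ := CompressEncoding ("zopfli")
// _algorithm == "zopfli" -- selects the encoder and names the cache bucket
// _encoding == "gzip"    -- recorded in the archive and sent as `Content-Encoding`
_compressed, _ := Compress (_data, _algorithm)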