From 2d988264f6cc872718bea3c5b792c4aa48779f57 Mon Sep 17 00:00:00 2001 From: Ciprian Dorin Craciun Date: Wed, 15 Dec 2021 17:42:26 +0200 Subject: [PATCH] [archiver] Add support for sources cache. --- sources/cmd/archiver/archiver.go | 192 ++++++++++++++++++++++++++++--- 1 file changed, 173 insertions(+), 19 deletions(-) diff --git a/sources/cmd/archiver/archiver.go b/sources/cmd/archiver/archiver.go index e1c0ea9..884d361 100644 --- a/sources/cmd/archiver/archiver.go +++ b/sources/cmd/archiver/archiver.go @@ -38,11 +38,12 @@ type context struct { storedDataMeta map[string]bool storedDataContent map[string]bool storedDataContentMeta map[string]map[string]string - storedFiles map[[2]uint64][2]string + storedFiles map[string][2]string storedKeys map[string]string archivedReferences uint compress string compressCache *bbolt.DB + sourcesCache *bbolt.DB dataUncompressedCount int dataUncompressedSize int dataCompressedCount int @@ -73,16 +74,28 @@ func archiveFile (_context *context, _pathResolved string, _pathInArchive string defer _file.Close () - var _fileId [2]uint64 + var _fileDev uint64 + var _fileInode uint64 + var _fileSize uint64 + var _fileTimestamp [2]uint64 if _stat, _error := _file.Stat (); _error == nil { _stat := _stat.Sys() if _stat, _ok := _stat.(*syscall.Stat_t); _ok { - _fileId = [2]uint64 { uint64 (_stat.Dev), uint64 (_stat.Ino) } + _fileDev = uint64 (_stat.Dev) + _fileInode = uint64 (_stat.Ino) + _fileSize = uint64 (_stat.Size) + _fileTimestamp = [2]uint64 { uint64 (_stat.Mtim.Sec), uint64 (_stat.Mtim.Nsec) } } else { return fmt.Errorf ("[6578d2d7] failed `stat`-ing: `%s`!", _pathResolved) } + } else { + return _error } + _fileIdText := fmt.Sprintf ("%d.%d-%d-%d.%d", _fileDev, _fileInode, _fileSize, _fileTimestamp[0], _fileTimestamp[1]) + _fileIdRaw := sha256.Sum256 ([]byte (_fileIdText)) + _fileId := hex.EncodeToString (_fileIdRaw[:]) + var _wasStored bool var _fingerprintContent string var _fingerprintMeta string @@ -98,13 +111,35 @@ func archiveFile (_context *context, _pathResolved string, _pathInArchive string if ! _wasStored { - if _dataContent_0, _error := ioutil.ReadAll (_file); _error == nil { - _dataContent = _dataContent_0 - } else { - return _error - } + _dataContentRead := func () ([]byte, error) { + if _context.debug { + log.Printf ("[dd] [30ef6c2f] file <= `%s`\n", _pathInArchive) + } + var _data []byte + if _data_0, _error := ioutil.ReadAll (_file); _error == nil { + _data = _data_0 + } else { + return nil, _error + } + if _stat, _error := _file.Stat (); _error == nil { + _stat := _stat.Sys() + if _stat, _ok := _stat.(*syscall.Stat_t); _ok { + if ( + (_fileSize != uint64 (_stat.Size)) || + (_fileTimestamp[0] != uint64 (_stat.Mtim.Sec)) || + (_fileTimestamp[1] != uint64 (_stat.Mtim.Nsec))) { + return nil, fmt.Errorf ("[9689146e] file changed while reading: `%s`!", _pathResolved) + } + } else { + return nil, fmt.Errorf ("[523fa3d1] failed `stat`-ing: `%s`!", _pathResolved) + } + } else { + return nil, _error + } + return _data, nil + } - if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, _dataContent, ""); _error != nil { + if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, _fileId, _dataContentRead, ""); _error != nil { return _error } else { _fingerprintContent = _fingerprintContent_0 @@ -251,7 +286,11 @@ func archiveReferenceAndData (_context *context, _namespace string, _pathResolve var _dataMeta map[string]string var _dataMetaRaw []byte - if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, _dataContent, _dataType); _error != nil { + _dataContentRead := func () ([]byte, error) { + return _dataContent, nil + } + + if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, "", _dataContentRead, _dataType); _error != nil { return "", "", _error } else { _fingerprintContent = _fingerprintContent_0 @@ -412,16 +451,60 @@ func archiveReference (_context *context, _namespace string, _pathInArchive stri -func prepareDataContent (_context *context, _pathResolved string, _pathInArchive string, _name string, _dataContent []byte, _dataType string) (string, []byte, map[string]string, error) { +func prepareDataContent (_context *context, _pathResolved string, _pathInArchive string, _name string, _dataContentId string, _dataContentRead func () ([]byte, error), _dataType string) (string, []byte, map[string]string, error) { - _fingerprintContentRaw := sha256.Sum256 (_dataContent) - _fingerprintContent := hex.EncodeToString (_fingerprintContentRaw[:]) + type DataPrepared struct { + DataFingerprint string + DataSize int + DataType string + } + + var _dataPreparedCached bool + var _dataPrepared *DataPrepared + if (_context.sourcesCache != nil) && (_dataContentId != "") { + _cacheTxn, _error := _context.sourcesCache.Begin (false) + if _error != nil { + AbortError (_error, "[5fe9ada0] unexpected sources cache error!") + } + _cacheBucket := _cacheTxn.Bucket ([]byte ("prepare")) + if _cacheBucket != nil { + if _dataPreparedRaw := _cacheBucket.Get ([]byte (_dataContentId)); _dataPreparedRaw != nil { + if _error := json.Unmarshal (_dataPreparedRaw, &_dataPrepared); _error != nil { + AbortError (_error, "[6865d963] unexpected sources cache error!") + } + } + _dataPreparedCached = _dataPrepared != nil + } + if _error := _cacheTxn.Rollback (); _error != nil { + AbortError (_error, "[5137d84a] unexpected sources cache error!") + } + } + + var _fingerprintContent string + var _dataContent []byte + var _dataSize int + if _dataPrepared != nil { + _fingerprintContent = _dataPrepared.DataFingerprint + _dataSize = _dataPrepared.DataSize + } else { + if _data_0, _error := _dataContentRead (); _error == nil { + _dataContent = _data_0 + } else { + return "", nil, nil, _error + } + _fingerprintContentRaw := sha256.Sum256 (_dataContent) + _fingerprintContent = hex.EncodeToString (_fingerprintContentRaw[:]) + _dataSize = len (_dataContent) + } if _wasStored, _ := _context.storedDataContent[_fingerprintContent]; _wasStored { _dataMeta := _context.storedDataContentMeta[_fingerprintContent] return _fingerprintContent, nil, _dataMeta, nil } + if (_dataType == "") && (_dataPrepared != nil) { + _dataType = _dataPrepared.DataType + } if (_dataType == "") && (_name != "") { _extension := filepath.Ext (_name) if _extension != "" { @@ -429,16 +512,47 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive } _dataType, _ = MimeTypesByExtension[_extension] } - if _dataType == "" { + if (_dataType == "") { _dataType = http.DetectContentType (_dataContent) } if _dataType == "" { _dataType = MimeTypeRaw } + if (_context.sourcesCache != nil) && (_dataContentId != "") && !_dataPreparedCached { + _dataPrepared = & DataPrepared { + DataFingerprint : _fingerprintContent, + DataSize : _dataSize, + DataType : _dataType, + } + _cacheTxn, _error := _context.sourcesCache.Begin (true) + if _error != nil { + AbortError (_error, "[acf09d20] unexpected sources cache error!") + } + _cacheBucket := _cacheTxn.Bucket ([]byte ("prepare")) + if _cacheBucket == nil { + if _bucket_0, _error := _cacheTxn.CreateBucket ([]byte ("prepare")); _error == nil { + _cacheBucket = _bucket_0 + } else { + AbortError (_error, "[c21b0972] unexpected sources cache error!") + } + } + var _dataPreparedRaw []byte + if _data_0, _error := json.Marshal (_dataPrepared); _error == nil { + _dataPreparedRaw = _data_0 + } else { + AbortError (_error, "[5538658b] unexpected sources cache error!") + } + if _error := _cacheBucket.Put ([]byte (_dataContentId), _dataPreparedRaw); _error != nil { + AbortError (_error, "[b4a6b6f9] unexpected sources cache error!") + } + if _error := _cacheTxn.Commit (); _error != nil { + AbortError (_error, "[5581f8ec] unexpected sources cache error!") + } + } + _dataEncoding := "identity" - _dataUncompressedSize := len (_dataContent) - _dataSize := _dataUncompressedSize + _dataUncompressedSize := _dataSize var _compressAlgorithm string var _compressEncoding string @@ -461,7 +575,7 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive } _cacheBucket := _cacheTxn.Bucket ([]byte (_compressAlgorithm)) if _cacheBucket != nil { - _dataCompressed = _cacheBucket.Get (_fingerprintContentRaw[:]) + _dataCompressed = _cacheBucket.Get ([]byte (_fingerprintContent)) _dataCompressedCached = _dataCompressed != nil } if _error := _cacheTxn.Rollback (); _error != nil { @@ -470,6 +584,13 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive } if _dataCompressed == nil { + if _dataContent == nil { + if _data_0, _error := _dataContentRead (); _error == nil { + _dataContent = _data_0 + } else { + return "", nil, nil, _error + } + } if _data_0, _error := Compress (_dataContent, _compressAlgorithm); _error == nil { _dataCompressed = _data_0 } else { @@ -509,7 +630,7 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive AbortError (_error, "[b7766792] unexpected compression cache error!") } } - if _error := _cacheBucket.Put (_fingerprintContentRaw[:], _dataCompressed); _error != nil { + if _error := _cacheBucket.Put ([]byte (_fingerprintContent), _dataCompressed); _error != nil { AbortError (_error, "[51d57220] unexpected compression cache error!") } if _error := _cacheTxn.Commit (); _error != nil { @@ -533,6 +654,14 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive } } + if _dataContent == nil { + if _data_0, _error := _dataContentRead (); _error == nil { + _dataContent = _data_0 + } else { + return "", nil, nil, _error + } + } + _context.dataUncompressedSize += _dataUncompressedSize _context.dataUncompressedCount += 1 _context.dataCompressedSize += _dataSize @@ -783,6 +912,7 @@ func main_0 () (error) { var _sourcesFolder string + var _sourcesCache string var _archiveFile string var _compress string var _compressCache string @@ -839,6 +969,7 @@ func main_0 () (error) { } _sourcesFolder_0 := _flags.String ("sources", "", "") + _sourcesCache_0 := _flags.String ("sources-cache", "", "") _archiveFile_0 := _flags.String ("archive", "", "") _compress_0 := _flags.String ("compress", "", "") _compressCache_0 := _flags.String ("compress-cache", "", "") @@ -854,6 +985,7 @@ func main_0 () (error) { FlagsParse (_flags, 0, 0) _sourcesFolder = *_sourcesFolder_0 + _sourcesCache = *_sourcesCache_0 _archiveFile = *_archiveFile_0 _compress = *_compress_0 _compressCache = *_compressCache_0 @@ -897,6 +1029,21 @@ func main_0 () (error) { } } + var _sourcesCacheDb *bbolt.DB + if _sourcesCache != "" { + _options := bbolt.Options { + PageSize : 16 * 1024, + InitialMmapSize : 1 * 1024 * 1024, + NoFreelistSync : true, + NoSync : true, + } + if _db_0, _error := bbolt.Open (_sourcesCache, 0600, &_options); _error == nil { + _sourcesCacheDb = _db_0 + } else { + AbortError (_error, "[17a308dc] failed opening sources cache!") + } + } + _context := & context { cdbWriter : _cdbWriter, storedFilePaths : make ([]string, 0, 16 * 1024), @@ -904,10 +1051,11 @@ func main_0 () (error) { storedDataMeta : make (map[string]bool, 16 * 1024), storedDataContent : make (map[string]bool, 16 * 1024), storedDataContentMeta : make (map[string]map[string]string, 16 * 1024), - storedFiles : make (map[[2]uint64][2]string, 16 * 1024), + storedFiles : make (map[string][2]string, 16 * 1024), storedKeys : make (map[string]string, 16 * 1024), compress : _compress, compressCache : _compressCacheDb, + sourcesCache : _sourcesCacheDb, includeIndex : _includeIndex, includeStripped : _includeStripped, includeCache : _includeCache, @@ -970,6 +1118,12 @@ func main_0 () (error) { } } + if _context.sourcesCache != nil { + if _error := _context.sourcesCache.Close (); _error != nil { + AbortError (_error, "[7fe3692c] failed closing compression cache!") + } + } + if true { log.Printf ("[ii] [56f63575] completed -- %0.2f minutes -- %d files, %d folders, %0.2f MiB (%0.2f MiB/s) -- %d compressed (%0.2f MiB, %0.2f%%) -- %d records (%0.2f MiB)\n", time.Since (_context.progressStarted) .Minutes (),