[archiver] Add support for sources cache.
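
The sources cache is a bbolt database keyed by a digest of each source file's device, inode, size and modification time; under that key it stores the previously computed content fingerprint, size and MIME type (the DataPrepared record in the diff), so unchanged files are not re-read and re-hashed on subsequent archiving runs. Below is a minimal, self-contained sketch of that scheme, not the archiver's actual code: the helper names (fileIdentity, lookup, store), the cache file name and the command-line handling are illustrative; the "prepare" bucket, the identity-string layout and the cached record mirror the diff, and Linux syscall.Stat_t fields are assumed, as in the diff.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"syscall"

	"go.etcd.io/bbolt"
)

// dataPrepared mirrors the per-file record the archiver caches: the content
// fingerprint, the uncompressed size, and the detected MIME type.
type dataPrepared struct {
	DataFingerprint string
	DataSize        int
	DataType        string
}

// fileIdentity derives the cache key from device, inode, size and mtime,
// using the same "%d.%d-%d-%d.%d" layout as the diff (Linux Stat_t fields).
func fileIdentity(path string) (string, error) {
	var stat syscall.Stat_t
	if err := syscall.Stat(path, &stat); err != nil {
		return "", err
	}
	text := fmt.Sprintf("%d.%d-%d-%d.%d",
		uint64(stat.Dev), uint64(stat.Ino), uint64(stat.Size),
		uint64(stat.Mtim.Sec), uint64(stat.Mtim.Nsec))
	raw := sha256.Sum256([]byte(text))
	return hex.EncodeToString(raw[:]), nil
}

// lookup returns the cached record from the "prepare" bucket, or nil on a miss.
func lookup(db *bbolt.DB, id string) (*dataPrepared, error) {
	var prepared *dataPrepared
	err := db.View(func(tx *bbolt.Tx) error {
		if bucket := tx.Bucket([]byte("prepare")); bucket != nil {
			if raw := bucket.Get([]byte(id)); raw != nil {
				prepared = &dataPrepared{}
				return json.Unmarshal(raw, prepared)
			}
		}
		return nil
	})
	return prepared, err
}

// store writes a freshly computed record under the identity key.
func store(db *bbolt.DB, id string, prepared *dataPrepared) error {
	return db.Update(func(tx *bbolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists([]byte("prepare"))
		if err != nil {
			return err
		}
		raw, err := json.Marshal(prepared)
		if err != nil {
			return err
		}
		return bucket.Put([]byte(id), raw)
	})
}

func main() {
	db, err := bbolt.Open("./sources-cache.db", 0o600, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()
	path := os.Args[1]
	id, err := fileIdentity(path)
	if err != nil {
		panic(err)
	}
	if cached, _ := lookup(db, id); cached != nil {
		fmt.Println("cache hit:", cached.DataFingerprint, cached.DataType)
		return
	}
	// Cache miss: read, fingerprint and type the file, then remember it.
	data, err := os.ReadFile(path)
	if err != nil {
		panic(err)
	}
	sum := sha256.Sum256(data)
	record := &dataPrepared{
		DataFingerprint: hex.EncodeToString(sum[:]),
		DataSize:        len(data),
		DataType:        http.DetectContentType(data),
	}
	if err := store(db, id, record); err != nil {
		panic(err)
	}
	fmt.Println("cache miss: stored", record.DataFingerprint)
}

Note that the diff opens the actual cache with NoSync and NoFreelistSync, trading durability for speed, presumably because the cache can always be rebuilt from the sources.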

This commit is contained in:
Ciprian Dorin Craciun 2021-12-15 17:42:26 +02:00
parent 12ba12fd9e
commit 2d988264f6

@@ -38,11 +38,12 @@ type context struct {
storedDataMeta map[string]bool
storedDataContent map[string]bool
storedDataContentMeta map[string]map[string]string
storedFiles map[[2]uint64][2]string
storedFiles map[string][2]string
storedKeys map[string]string
archivedReferences uint
compress string
compressCache *bbolt.DB
sourcesCache *bbolt.DB
dataUncompressedCount int
dataUncompressedSize int
dataCompressedCount int
@@ -73,16 +74,28 @@ func archiveFile (_context *context, _pathResolved string, _pathInArchive string
defer _file.Close ()
var _fileId [2]uint64
var _fileDev uint64
var _fileInode uint64
var _fileSize uint64
var _fileTimestamp [2]uint64
if _stat, _error := _file.Stat (); _error == nil {
_stat := _stat.Sys()
if _stat, _ok := _stat.(*syscall.Stat_t); _ok {
_fileId = [2]uint64 { uint64 (_stat.Dev), uint64 (_stat.Ino) }
_fileDev = uint64 (_stat.Dev)
_fileInode = uint64 (_stat.Ino)
_fileSize = uint64 (_stat.Size)
_fileTimestamp = [2]uint64 { uint64 (_stat.Mtim.Sec), uint64 (_stat.Mtim.Nsec) }
} else {
return fmt.Errorf ("[6578d2d7] failed `stat`-ing: `%s`!", _pathResolved)
}
} else {
return _error
}
_fileIdText := fmt.Sprintf ("%d.%d-%d-%d.%d", _fileDev, _fileInode, _fileSize, _fileTimestamp[0], _fileTimestamp[1])
_fileIdRaw := sha256.Sum256 ([]byte (_fileIdText))
_fileId := hex.EncodeToString (_fileIdRaw[:])
var _wasStored bool
var _fingerprintContent string
var _fingerprintMeta string
@@ -98,13 +111,35 @@ func archiveFile (_context *context, _pathResolved string, _pathInArchive string
if ! _wasStored {
if _dataContent_0, _error := ioutil.ReadAll (_file); _error == nil {
_dataContent = _dataContent_0
_dataContentRead := func () ([]byte, error) {
if _context.debug {
log.Printf ("[dd] [30ef6c2f] file <= `%s`\n", _pathInArchive)
}
var _data []byte
if _data_0, _error := ioutil.ReadAll (_file); _error == nil {
_data = _data_0
} else {
return _error
return nil, _error
}
if _stat, _error := _file.Stat (); _error == nil {
_stat := _stat.Sys()
if _stat, _ok := _stat.(*syscall.Stat_t); _ok {
if (
(_fileSize != uint64 (_stat.Size)) ||
(_fileTimestamp[0] != uint64 (_stat.Mtim.Sec)) ||
(_fileTimestamp[1] != uint64 (_stat.Mtim.Nsec))) {
return nil, fmt.Errorf ("[9689146e] file changed while reading: `%s`!", _pathResolved)
}
} else {
return nil, fmt.Errorf ("[523fa3d1] failed `stat`-ing: `%s`!", _pathResolved)
}
} else {
return nil, _error
}
return _data, nil
}
if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, _dataContent, ""); _error != nil {
if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, _fileId, _dataContentRead, ""); _error != nil {
return _error
} else {
_fingerprintContent = _fingerprintContent_0
@@ -251,7 +286,11 @@ func archiveReferenceAndData (_context *context, _namespace string, _pathResolve
var _dataMeta map[string]string
var _dataMetaRaw []byte
if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, _dataContent, _dataType); _error != nil {
_dataContentRead := func () ([]byte, error) {
return _dataContent, nil
}
if _fingerprintContent_0, _dataContent_0, _dataMeta_0, _error := prepareDataContent (_context, _pathResolved, _pathInArchive, _name, "", _dataContentRead, _dataType); _error != nil {
return "", "", _error
} else {
_fingerprintContent = _fingerprintContent_0
@@ -412,16 +451,60 @@ func archiveReference (_context *context, _namespace string, _pathInArchive stri
func prepareDataContent (_context *context, _pathResolved string, _pathInArchive string, _name string, _dataContent []byte, _dataType string) (string, []byte, map[string]string, error) {
func prepareDataContent (_context *context, _pathResolved string, _pathInArchive string, _name string, _dataContentId string, _dataContentRead func () ([]byte, error), _dataType string) (string, []byte, map[string]string, error) {
type DataPrepared struct {
DataFingerprint string
DataSize int
DataType string
}
var _dataPreparedCached bool
var _dataPrepared *DataPrepared
if (_context.sourcesCache != nil) && (_dataContentId != "") {
_cacheTxn, _error := _context.sourcesCache.Begin (false)
if _error != nil {
AbortError (_error, "[5fe9ada0] unexpected sources cache error!")
}
_cacheBucket := _cacheTxn.Bucket ([]byte ("prepare"))
if _cacheBucket != nil {
if _dataPreparedRaw := _cacheBucket.Get ([]byte (_dataContentId)); _dataPreparedRaw != nil {
if _error := json.Unmarshal (_dataPreparedRaw, &_dataPrepared); _error != nil {
AbortError (_error, "[6865d963] unexpected sources cache error!")
}
}
_dataPreparedCached = _dataPrepared != nil
}
if _error := _cacheTxn.Rollback (); _error != nil {
AbortError (_error, "[5137d84a] unexpected sources cache error!")
}
}
var _fingerprintContent string
var _dataContent []byte
var _dataSize int
if _dataPrepared != nil {
_fingerprintContent = _dataPrepared.DataFingerprint
_dataSize = _dataPrepared.DataSize
} else {
if _data_0, _error := _dataContentRead (); _error == nil {
_dataContent = _data_0
} else {
return "", nil, nil, _error
}
_fingerprintContentRaw := sha256.Sum256 (_dataContent)
_fingerprintContent := hex.EncodeToString (_fingerprintContentRaw[:])
_fingerprintContent = hex.EncodeToString (_fingerprintContentRaw[:])
_dataSize = len (_dataContent)
}
if _wasStored, _ := _context.storedDataContent[_fingerprintContent]; _wasStored {
_dataMeta := _context.storedDataContentMeta[_fingerprintContent]
return _fingerprintContent, nil, _dataMeta, nil
}
if (_dataType == "") && (_dataPrepared != nil) {
_dataType = _dataPrepared.DataType
}
if (_dataType == "") && (_name != "") {
_extension := filepath.Ext (_name)
if _extension != "" {
@@ -429,16 +512,47 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive
}
_dataType, _ = MimeTypesByExtension[_extension]
}
if _dataType == "" {
if (_dataType == "") {
_dataType = http.DetectContentType (_dataContent)
}
if _dataType == "" {
_dataType = MimeTypeRaw
}
if (_context.sourcesCache != nil) && (_dataContentId != "") && !_dataPreparedCached {
_dataPrepared = & DataPrepared {
DataFingerprint : _fingerprintContent,
DataSize : _dataSize,
DataType : _dataType,
}
_cacheTxn, _error := _context.sourcesCache.Begin (true)
if _error != nil {
AbortError (_error, "[acf09d20] unexpected sources cache error!")
}
_cacheBucket := _cacheTxn.Bucket ([]byte ("prepare"))
if _cacheBucket == nil {
if _bucket_0, _error := _cacheTxn.CreateBucket ([]byte ("prepare")); _error == nil {
_cacheBucket = _bucket_0
} else {
AbortError (_error, "[c21b0972] unexpected sources cache error!")
}
}
var _dataPreparedRaw []byte
if _data_0, _error := json.Marshal (_dataPrepared); _error == nil {
_dataPreparedRaw = _data_0
} else {
AbortError (_error, "[5538658b] unexpected sources cache error!")
}
if _error := _cacheBucket.Put ([]byte (_dataContentId), _dataPreparedRaw); _error != nil {
AbortError (_error, "[b4a6b6f9] unexpected sources cache error!")
}
if _error := _cacheTxn.Commit (); _error != nil {
AbortError (_error, "[5581f8ec] unexpected sources cache error!")
}
}
_dataEncoding := "identity"
_dataUncompressedSize := len (_dataContent)
_dataSize := _dataUncompressedSize
_dataUncompressedSize := _dataSize
var _compressAlgorithm string
var _compressEncoding string
@@ -461,7 +575,7 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive
}
_cacheBucket := _cacheTxn.Bucket ([]byte (_compressAlgorithm))
if _cacheBucket != nil {
_dataCompressed = _cacheBucket.Get (_fingerprintContentRaw[:])
_dataCompressed = _cacheBucket.Get ([]byte (_fingerprintContent))
_dataCompressedCached = _dataCompressed != nil
}
if _error := _cacheTxn.Rollback (); _error != nil {
@@ -470,6 +584,13 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive
}
if _dataCompressed == nil {
if _dataContent == nil {
if _data_0, _error := _dataContentRead (); _error == nil {
_dataContent = _data_0
} else {
return "", nil, nil, _error
}
}
if _data_0, _error := Compress (_dataContent, _compressAlgorithm); _error == nil {
_dataCompressed = _data_0
} else {
@@ -509,7 +630,7 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive
AbortError (_error, "[b7766792] unexpected compression cache error!")
}
}
if _error := _cacheBucket.Put (_fingerprintContentRaw[:], _dataCompressed); _error != nil {
if _error := _cacheBucket.Put ([]byte (_fingerprintContent), _dataCompressed); _error != nil {
AbortError (_error, "[51d57220] unexpected compression cache error!")
}
if _error := _cacheTxn.Commit (); _error != nil {
@@ -533,6 +654,14 @@ func prepareDataContent (_context *context, _pathResolved string, _pathInArchive
}
}
if _dataContent == nil {
if _data_0, _error := _dataContentRead (); _error == nil {
_dataContent = _data_0
} else {
return "", nil, nil, _error
}
}
_context.dataUncompressedSize += _dataUncompressedSize
_context.dataUncompressedCount += 1
_context.dataCompressedSize += _dataSize
@@ -783,6 +912,7 @@ func main_0 () (error) {
var _sourcesFolder string
var _sourcesCache string
var _archiveFile string
var _compress string
var _compressCache string
@@ -839,6 +969,7 @@
}
_sourcesFolder_0 := _flags.String ("sources", "", "")
_sourcesCache_0 := _flags.String ("sources-cache", "", "")
_archiveFile_0 := _flags.String ("archive", "", "")
_compress_0 := _flags.String ("compress", "", "")
_compressCache_0 := _flags.String ("compress-cache", "", "")
@@ -854,6 +985,7 @@
FlagsParse (_flags, 0, 0)
_sourcesFolder = *_sourcesFolder_0
_sourcesCache = *_sourcesCache_0
_archiveFile = *_archiveFile_0
_compress = *_compress_0
_compressCache = *_compressCache_0
@@ -897,6 +1029,21 @@ func main_0 () (error) {
}
}
var _sourcesCacheDb *bbolt.DB
if _sourcesCache != "" {
_options := bbolt.Options {
PageSize : 16 * 1024,
InitialMmapSize : 1 * 1024 * 1024,
NoFreelistSync : true,
NoSync : true,
}
if _db_0, _error := bbolt.Open (_sourcesCache, 0600, &_options); _error == nil {
_sourcesCacheDb = _db_0
} else {
AbortError (_error, "[17a308dc] failed opening sources cache!")
}
}
_context := & context {
cdbWriter : _cdbWriter,
storedFilePaths : make ([]string, 0, 16 * 1024),
@@ -904,10 +1051,11 @@ func main_0 () (error) {
storedDataMeta : make (map[string]bool, 16 * 1024),
storedDataContent : make (map[string]bool, 16 * 1024),
storedDataContentMeta : make (map[string]map[string]string, 16 * 1024),
storedFiles : make (map[[2]uint64][2]string, 16 * 1024),
storedFiles : make (map[string][2]string, 16 * 1024),
storedKeys : make (map[string]string, 16 * 1024),
compress : _compress,
compressCache : _compressCacheDb,
sourcesCache : _sourcesCacheDb,
includeIndex : _includeIndex,
includeStripped : _includeStripped,
includeCache : _includeCache,
@@ -970,6 +1118,12 @@ func main_0 () (error) {
}
}
if _context.sourcesCache != nil {
if _error := _context.sourcesCache.Close (); _error != nil {
AbortError (_error, "[7fe3692c] failed closing compression cache!")
}
}
if true {
log.Printf ("[ii] [56f63575] completed -- %0.2f minutes -- %d files, %d folders, %0.2f MiB (%0.2f MiB/s) -- %d compressed (%0.2f MiB, %0.2f%%) -- %d records (%0.2f MiB)\n",
time.Since (_context.progressStarted) .Minutes (),