tar-stream/extract.js
2023-06-17 21:47:29 +02:00

407 lines
9 KiB
JavaScript

const { Writable, Readable, getStreamError } = require('streamx')
const FIFO = require('fast-fifo')
const b4a = require('b4a')
const headers = require('./headers')
// Shared zero-length buffer; BufferList#shift returns it for zero-byte reads
// so no new buffer is allocated for them.
const EMPTY = b4a.alloc(0)
/**
 * FIFO queue of byte chunks that can be consumed in arbitrarily sized pieces.
 *
 * Public counters:
 *  - `buffered`: number of bytes currently queued and not yet handed out.
 *  - `shifted`:  running total of bytes handed out so far.
 */
class BufferList {
  constructor () {
    this.buffered = 0
    this.shifted = 0
    this.queue = new FIFO()
    // Read position inside the chunk at the head of the queue.
    this._offset = 0
  }

  /**
   * Appends a chunk to the end of the queue.
   * @param {Uint8Array} buffer Chunk to enqueue; only `byteLength` and
   *   `subarray` are used on it by this class.
   */
  push (buffer) {
    this.buffered += buffer.byteLength
    this.queue.push(buffer)
  }

  /**
   * Returns at most `size` bytes taken from the head chunk only (never
   * concatenates across chunks), or `null` when nothing is buffered.
   * @param {number} size Upper bound on the number of bytes to return.
   * @returns {Uint8Array|null}
   */
  shiftFirst (size) {
    // Must test the public `buffered` counter. The previous code tested
    // `this._buffered`, which is never assigned, so the guard never fired and
    // `_next` crashed on an empty queue.
    return this.buffered === 0 ? null : this._next(size)
  }

  /**
   * Returns exactly `size` bytes, concatenating chunks when required.
   * @param {number} size Number of bytes wanted.
   * @returns {Uint8Array|null} `null` when fewer than `size` bytes are
   *   buffered; the shared `EMPTY` buffer when `size` is 0.
   */
  shift (size) {
    if (size > this.buffered) return null
    if (size === 0) return EMPTY

    let chunk = this._next(size)
    if (size === chunk.byteLength) return chunk // likely case

    // The request spans several queued chunks: collect pieces until satisfied.
    const chunks = [chunk]
    while ((size -= chunk.byteLength) > 0) {
      chunk = this._next(size)
      chunks.push(chunk)
    }

    return b4a.concat(chunks)
  }

  /**
   * Takes up to `size` bytes from the head chunk and updates the counters.
   * Callers must ensure the queue is non-empty.
   * @param {number} size Upper bound on the number of bytes to take.
   * @returns {Uint8Array} A view onto the head chunk (no copy is made).
   */
  _next (size) {
    const buf = this.queue.peek()
    const rem = buf.byteLength - this._offset

    if (size >= rem) {
      // The rest of the head chunk is consumed entirely: drop it from the queue.
      const sub = this._offset ? buf.subarray(this._offset, buf.byteLength) : buf
      this.queue.shift()
      this._offset = 0
      this.buffered -= rem
      this.shifted += rem
      return sub
    }

    // Partial read: keep the chunk queued and just advance the read offset.
    this.buffered -= size
    this.shifted += size
    return buf.subarray(this._offset, (this._offset += size))
  }
}
/**
 * Readable stream carrying the body of a single tar entry.
 *
 * Each instance keeps a reference to the Extract stream that created it and
 * drives that parser forward whenever it is the parser's active entry stream.
 */
class Source extends Readable {
  /**
   * @param {Extract} parent Extract stream that owns this entry.
   * @param {object} header Decoded tar header for the entry.
   * @param {number} offset Value the parser recorded when it read the header.
   */
  constructor (parent, header, offset) {
    super()
    this.header = header
    this.offset = offset
    this._parent = parent
  }

  /**
   * Read hook: ends zero-size entries straight away and, when this is the
   * parser's active stream, asks the parser to continue feeding data.
   */
  _read (cb) {
    const parent = this._parent
    const isEmptyEntry = this.header.size === 0

    if (isEmptyEntry) this.push(null)
    if (parent._stream === this) parent._update()

    cb(null)
  }

  /** Propagates this stream's destruction (and its error, if any) to the parser. */
  _predestroy () {
    const reason = getStreamError(this)
    this._parent.destroy(reason)
  }

  /**
   * Releases the parser from this stream if it is still the active one, sets
   * the parser's remaining byte count to the entry's 512-byte block padding,
   * and resumes parsing.
   */
  _detach () {
    const parent = this._parent
    if (parent._stream !== this) return

    parent._stream = null
    parent._missing = overflow(this.header.size)
    parent._update()
  }

  /** Destroy hook: detaches from the parser before signalling completion. */
  _destroy (cb) {
    this._detach()
    cb(null)
  }
}
/**
 * Writable stream that parses tar data written to it. For every archive member
 * it emits an 'entry' event with (header, stream, callback): `stream` is a
 * Source carrying the member's body and `callback` (this._unlock) must be
 * invoked by the listener before the next header is parsed.
 */
class Extract extends Writable {
/**
 * @param {object} [opts] Forwarded to the Writable constructor. Also read here:
 * `filenameEncoding` (falls back to 'utf-8') and `allowUnknownFormat`
 * (coerced to a boolean); both are later passed to headers.decode.
 */
constructor (opts) {
super(opts)
if (!opts) opts = {}
// Pending input bytes not yet parsed.
this._buffer = new BufferList()
// Snapshot of _buffer.shifted taken just before the current header was read.
this._offset = 0
// Result of the most recent headers.decode call.
this._header = null
// Source currently receiving body bytes, or null when no body is in flight.
this._stream = null
// Bytes still to be consumed for the current body, long header or padding.
this._missing = 0
// True while the payload of a gnu-long-* / pax header is awaited.
this._longHeader = false
// Callback of the in-flight _write call; noop when there is none.
this._callback = noop
// True from the moment 'entry' is emitted until its callback is invoked.
this._locked = false
// Set by _final: whether the input ended on a clean boundary.
this._finished = false
// Decoded long-header values waiting to be applied to the next real entry.
this._pax = null
this._paxGlobal = null
this._gnuLongPath = null
this._gnuLongLinkPath = null
this._filenameEncoding = opts.filenameEncoding || 'utf-8'
this._allowUnknownFormat = !!opts.allowUnknownFormat
// Pre-bound so the same function object is handed to every 'entry' listener.
this._unlockBound = this._unlock.bind(this)
}
/**
 * Callback handed to 'entry' listeners. Clears the entry lock; on error it
 * destroys this stream and completes the pending write with that error,
 * otherwise it resumes parsing.
 * @param {Error} [err]
 */
_unlock (err) {
this._locked = false
if (err) {
this.destroy(err)
this._continueWrite(err)
return
}
this._update()
}
/**
 * Decodes the next 512-byte header block and acts on it.
 * @returns {boolean} false when parsing must pause (an entry is still locked,
 * or decoding threw and the error was passed to the pending write callback);
 * true otherwise.
 */
_consumeHeader () {
if (this._locked) return false
this._offset = this._buffer.shifted
try {
this._header = headers.decode(this._buffer.shift(512), this._filenameEncoding, this._allowUnknownFormat)
} catch (err) {
this._continueWrite(err)
return false
}
// decode produced no header for this block: nothing to emit, keep parsing.
if (!this._header) return true
switch (this._header.type) {
case 'gnu-long-path':
case 'gnu-long-link-path':
case 'pax-global-header':
case 'pax-header':
// Metadata entries: their payload is collected and decoded internally
// (see _consumeLongHeader) instead of being emitted as an entry.
this._longHeader = true
this._missing = this._header.size
return true
}
this._locked = true
this._applyLongHeaders()
if (this._header.size === 0 || this._header.type === 'directory') {
// Emitted with a fresh stream that is NOT stored in this._stream, so no body
// bytes are routed to it.
this.emit('entry', this._header, this._createStream(), this._unlockBound)
return true
}
this._stream = this._createStream()
this._missing = this._header.size
this.emit('entry', this._header, this._stream, this._unlockBound)
return true
}
/**
 * Applies values collected from preceding gnu-long-* / pax headers to
 * this._header and clears them (the pax global header is kept). pax values
 * are applied after the GNU ones, so they win when both are present.
 */
_applyLongHeaders () {
if (this._gnuLongPath) {
this._header.name = this._gnuLongPath
this._gnuLongPath = null
}
if (this._gnuLongLinkPath) {
this._header.linkname = this._gnuLongLinkPath
this._gnuLongLinkPath = null
}
if (this._pax) {
if (this._pax.path) this._header.name = this._pax.path
if (this._pax.linkpath) this._header.linkname = this._pax.linkpath
if (this._pax.size) this._header.size = parseInt(this._pax.size, 10)
this._header.pax = this._pax
this._pax = null
}
}
/**
 * Decodes the payload of a long header according to this._header.type and
 * stores the result for the next entry.
 * @param {Uint8Array} buf Payload bytes of the long header.
 */
_decodeLongHeader (buf) {
switch (this._header.type) {
case 'gnu-long-path':
this._gnuLongPath = headers.decodeLongPath(buf, this._filenameEncoding)
break
case 'gnu-long-link-path':
this._gnuLongLinkPath = headers.decodeLongPath(buf, this._filenameEncoding)
break
case 'pax-global-header':
this._paxGlobal = headers.decodePax(buf)
break
case 'pax-header':
// Per-entry pax keys override those of the global pax header.
this._pax = this._paxGlobal === null
? headers.decodePax(buf)
: Object.assign({}, this._paxGlobal, headers.decodePax(buf))
break
}
}
/**
 * Consumes and decodes the payload of the current long header. _update only
 * calls this once at least `_header.size` bytes are buffered.
 * @returns {boolean} false if decoding threw (the error is passed to the
 * pending write callback), true otherwise.
 */
_consumeLongHeader () {
this._longHeader = false
// After the payload, only the padding up to the next 512-byte block remains.
this._missing = overflow(this._header.size)
const buf = this._buffer.shift(this._header.size)
try {
this._decodeLongHeader(buf)
} catch (err) {
this._continueWrite(err)
return false
}
return true
}
/**
 * Moves the next available piece of body data (at most _missing bytes, taken
 * from the head chunk only) into the active entry stream.
 * @returns {boolean} false when parsing should pause: nothing was available,
 * push() returned false, or the entry ended while still locked.
 */
_consumeStream () {
const buf = this._buffer.shiftFirst(this._missing)
if (buf === null) return false
this._missing -= buf.byteLength
const drained = this._stream.push(buf)
if (this._missing === 0) {
// Body complete: end the entry stream. It is detached here only when push()
// returned true; otherwise detaching is left to the stream itself.
this._stream.push(null)
if (drained) this._stream._detach()
return drained && this._locked === false
}
return drained
}
/** Creates the Source for the current header and recorded offset. */
_createStream () {
return new Source(this, this._header, this._offset)
}
/**
 * Main parse loop. Runs while input is buffered and the stream is not being
 * destroyed. Leaving the loop via `break` (or its condition) completes the
 * pending write so more data can arrive; leaving via `return` keeps the
 * write pending unless a callee already completed it with an error.
 */
_update () {
while (this._buffer.buffered > 0 && !this.destroying) {
if (this._missing > 0) {
if (this._stream !== null) {
if (this._consumeStream() === false) return
continue
}
if (this._longHeader === true) {
// Wait until the whole long-header payload has been buffered.
if (this._missing > this._buffer.buffered) break
// NOTE: returns false here whereas the other exits return undefined; no
// caller in this file uses _update's return value.
if (this._consumeLongHeader() === false) return false
continue
}
// No stream and no long header: the missing bytes are discarded.
const ignore = this._buffer.shiftFirst(this._missing)
if (ignore !== null) this._missing -= ignore.byteLength
continue
}
// A header needs a full 512-byte block.
if (this._buffer.buffered < 512) break
if (this._stream !== null || this._consumeHeader() === false) return
}
this._continueWrite(null)
}
/**
 * Invokes the pending write callback, resetting it to noop first so it is
 * called at most once.
 * @param {Error|null} err
 */
_continueWrite (err) {
const cb = this._callback
this._callback = noop
cb(err)
}
/**
 * Write hook: stores the callback, buffers the chunk and runs the parser.
 * The callback is invoked later through _continueWrite.
 */
_write (data, cb) {
this._callback = cb
this._buffer.push(data)
this._update()
}
/**
 * Final hook: the input ended cleanly only if no bytes are missing and
 * nothing is left in the buffer; otherwise reports 'Unexpected end of data'.
 */
_final (cb) {
this._finished = this._missing === 0 && this._buffer.buffered === 0
cb(this._finished ? null : new Error('Unexpected end of data'))
}
// Completes any pending write callback without an error.
// NOTE(review): presumably invoked by streamx when destruction starts - confirm.
_predestroy () {
this._continueWrite(null)
}
// Destroys the active entry stream (if any) with this stream's error.
_destroy (cb) {
if (this._stream) this._stream.destroy(getStreamError(this))
cb(null)
}
/**
 * Async iterator over entry streams. Requesting the next value invokes the
 * previous entry's callback, which lets the parser continue.
 */
[Symbol.asyncIterator] () {
// First error emitted by the extract stream, if any.
let error = null
// resolve/reject of a next() call that is waiting for an entry.
let promiseResolve = null
let promiseReject = null
// Entry that arrived while no next() call was waiting.
let entryStream = null
// Callback of the most recently emitted entry, not yet invoked.
let entryCallback = null
const extract = this
this.on('entry', onentry)
this.on('error', (err) => { error = err })
this.on('close', onclose)
// The helper functions used below are function declarations placed after
// this return statement; they are hoisted.
return {
[Symbol.asyncIterator] () {
return this
},
next () {
return new Promise(onnext)
},
return () {
return destroy(null)
},
throw (err) {
return destroy(err)
}
}
// Invokes the stored entry callback (at most once) with `err`.
function consumeCallback (err) {
if (!entryCallback) return
const cb = entryCallback
entryCallback = null
cb(err)
}
// Promise executor for next().
function onnext (resolve, reject) {
if (error) {
return reject(error)
}
if (entryStream) {
resolve({ value: entryStream, done: false })
entryStream = null
return
}
promiseResolve = resolve
promiseReject = reject
// Release the previous entry. If an 'entry' event fires synchronously during
// this call, onentry settles the promise and clears promiseResolve.
consumeCallback(null)
if (extract._finished && promiseResolve) {
promiseResolve({ value: undefined, done: true })
promiseResolve = promiseReject = null
}
}
// 'entry' listener: hands the stream to a waiting next() or stashes it.
function onentry (header, stream, callback) {
entryCallback = callback
stream.on('error', noop) // no way around this due to tick sillyness
if (promiseResolve) {
promiseResolve({ value: stream, done: false })
promiseResolve = promiseReject = null
} else {
entryStream = stream
}
}
// 'close' listener: releases the last entry and settles a waiting next().
function onclose () {
consumeCallback(error)
if (!promiseResolve) return
if (error) promiseReject(error)
else promiseResolve({ value: undefined, done: true })
promiseResolve = promiseReject = null
}
// Shared implementation of return()/throw(): destroys the extract stream and
// settles once it is closed (immediately if `destroyed` is already set).
function destroy (err) {
extract.destroy(err)
consumeCallback(err)
return new Promise((resolve, reject) => {
if (extract.destroyed) return resolve({ value: undefined, done: true })
extract.once('close', function () {
if (err) reject(err)
else resolve({ value: undefined, done: true })
})
})
}
}
}
module.exports = function extract (opts) {
return new Extract(opts)
}
// Does nothing. Used as the placeholder write callback and as a do-nothing
// 'error' listener on entry streams.
function noop () {}
/**
 * Number of padding bytes needed to round `size` up to a multiple of 512.
 * @param {number} size Byte length of an entry body or long header.
 * @returns {number} 0 when `size` is already a multiple of 512, otherwise
 *   512 minus the remainder.
 */
function overflow (size) {
  const remainder = size & 511 // low 9 bits, i.e. size modulo 512
  return remainder === 0 ? 0 : 512 - remainder
}