Improve streams (#150)

This commit is contained in:
Mathias Buus 2023-06-17 19:11:53 +02:00 committed by GitHub
parent ca0e270f11
commit 5f9ff0b594
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 925 additions and 580 deletions

View file

@ -27,14 +27,14 @@ If you want to pack/unpack directories on the file system check out [tar-fs](htt
To create a pack stream use `tar.pack()` and call `pack.entry(header, [callback])` to add tar entries.
``` js
var tar = require('tar-stream')
var pack = tar.pack() // pack is a stream
const tar = require('tar-stream')
const pack = tar.pack() // pack is a stream
// add a file called my-test.txt with the content "Hello World!"
pack.entry({ name: 'my-test.txt' }, 'Hello World!')
// add a file called my-stream-test.txt from a stream
var entry = pack.entry({ name: 'my-stream-test.txt', size: 11 }, function(err) {
const entry = pack.entry({ name: 'my-stream-test.txt', size: 11 }, function(err) {
// the stream was added
// no more entries
pack.finalize()
@ -54,21 +54,21 @@ pack.pipe(process.stdout)
To extract a stream use `tar.extract()` and listen for `extract.on('entry', (header, stream, next) )`
``` js
var extract = tar.extract()
const extract = tar.extract()
extract.on('entry', function(header, stream, next) {
extract.on('entry', function (header, stream, next) {
// header is the tar header
// stream is the content body (might be an empty stream)
// call next when you are done with this entry
stream.on('end', function() {
stream.on('end', function () {
next() // ready for next entry
})
stream.resume() // just auto drain the stream
})
extract.on('finish', function() {
extract.on('finish', function () {
// all entries read
})
@ -77,6 +77,21 @@ pack.pipe(extract)
The tar archive is streamed sequentially, meaning you **must** drain each entry's stream as you get them or else the main extract stream will receive backpressure and stop reading.
## Extracting as an async iterator
The extraction stream, in addition to being a writable stream, is also an async iterator.
``` js
const extract = tar.extract()
someStream.pipe(extract)
for await (const entry of extract) {
entry.header // the tar header
entry.resume() // the entry is the stream also
}
```
## Headers
The header object used in `entry` should contain the following properties.
@ -106,18 +121,18 @@ Most of these values can be found by stat'ing a file.
Using tar-stream it is easy to rewrite paths / change modes etc in an existing tarball.
``` js
var extract = tar.extract()
var pack = tar.pack()
var path = require('path')
const extract = tar.extract()
const pack = tar.pack()
const path = require('path')
extract.on('entry', function(header, stream, callback) {
extract.on('entry', function (header, stream, callback) {
// let's prefix all names with 'tmp'
header.name = path.join('tmp', header.name)
// write the new entry to the pack stream
stream.pipe(pack.entry(header, callback))
})
extract.on('finish', function() {
extract.on('finish', function () {
// all entries done - lets finalize it
pack.finalize()
})
@ -133,15 +148,15 @@ pack.pipe(newTarballStream)
``` js
var fs = require('fs')
var tar = require('tar-stream')
const fs = require('fs')
const tar = require('tar-stream')
var pack = tar.pack() // pack is a stream
var path = 'YourTarBall.tar'
var yourTarball = fs.createWriteStream(path)
const pack = tar.pack() // pack is a stream
const path = 'YourTarBall.tar'
const yourTarball = fs.createWriteStream(path)
// add a file called YourFile.txt with the content "Hello World!"
pack.entry({name: 'YourFile.txt'}, 'Hello World!', function (err) {
pack.entry({ name: 'YourFile.txt' }, 'Hello World!', function (err) {
if (err) throw err
pack.finalize()
})

12
constants.js Normal file
View file

@ -0,0 +1,12 @@
// File-type bit masks for stat modes, taken from Node's fs when available.
try {
module.exports = require('fs').constants
} catch {
module.exports = { // just for envs without fs
S_IFMT: 61440, // 0o170000 - file type mask
S_IFDIR: 16384, // 0o040000 - directory
S_IFCHR: 8192, // 0o020000 - character device
S_IFBLK: 24576, // 0o060000 - block device
S_IFIFO: 4096, // 0o010000 - fifo
S_IFLNK: 40960 // 0o120000 - symlink
}
}

View file

@ -1,37 +1,98 @@
const bl = require('bl')
const { Writable, PassThrough } = require('streamx')
const { Writable, Readable, getStreamError } = require('streamx')
const FIFO = require('fast-fifo')
const b4a = require('b4a')
const headers = require('./headers')
const noop = function () {}
const EMPTY = b4a.alloc(0)
const overflow = function (size) {
size &= 511
return size && 512 - size
// Minimal byte queue used by the extractor: chunks are pushed as they arrive
// and consumed in exact byte-sized slices, avoiding copies when possible.
class BufferList {
  constructor () {
    this.buffered = 0 // total bytes currently queued
    this.shifted = 0 // total bytes consumed so far (archive offset)
    this.queue = new FIFO()

    this._offset = 0 // read offset into the head chunk
  }

  push (buffer) {
    this.buffered += buffer.byteLength
    this.queue.push(buffer)
  }

  // Shift up to `size` bytes, taken from the head chunk only (no concat).
  // Returns null when nothing is buffered.
  // Fix: the guard previously read `this._buffered`, a property that is never
  // set, so it was always undefined and the empty-queue guard could never
  // fire (peeking an empty FIFO would then crash). It must be `this.buffered`.
  shiftFirst (size) {
    return this.buffered === 0 ? null : this._next(size)
  }

  // Shift exactly `size` bytes, concatenating across chunks when needed.
  // Returns null if fewer than `size` bytes are buffered.
  shift (size) {
    if (size > this.buffered) return null
    if (size === 0) return EMPTY

    let chunk = this._next(size)

    if (size === chunk.byteLength) return chunk // likely case

    const chunks = [chunk]

    while ((size -= chunk.byteLength) > 0) {
      chunk = this._next(size)
      chunks.push(chunk)
    }

    return b4a.concat(chunks)
  }

  // Pop at most `size` bytes from the head chunk, advancing the counters.
  _next (size) {
    const buf = this.queue.peek()
    const rem = buf.byteLength - this._offset

    if (size >= rem) { // head chunk fully consumed
      const sub = this._offset ? buf.subarray(this._offset, buf.byteLength) : buf
      this.queue.shift()
      this._offset = 0
      this.buffered -= rem
      this.shifted += rem
      return sub
    }

    this.buffered -= size
    this.shifted += size

    return buf.subarray(this._offset, (this._offset += size))
  }
}
// Create an entry stream that is immediately ended (entries with no body).
// NOTE(review): constructs Source with (self, offset) - confirm this matches
// the Source constructor signature in use.
const emptyStream = function (self, offset) {
const s = new Source(self, offset)
s.end()
return s
}
// Fold decoded pax extended-header values into a ustar header (in place):
// pax path/linkpath override name/linkname, pax size overrides the octal
// size field, and the raw pax record is attached as `header.pax`.
const mixinPax = function (header, pax) {
  const { path, linkpath, size } = pax

  if (path) header.name = path
  if (linkpath) header.linkname = linkpath
  if (size) header.size = parseInt(size, 10)

  header.pax = pax

  return header
}
class Source extends PassThrough {
constructor (self, offset) {
class Source extends Readable {
constructor (self, header, offset) {
super()
this._parent = self
this.header = header
this.offset = offset
this._parent = self
}
_read (cb) {
// downstream demand on the active entry lets the parser resume pumping
if (this._parent._stream === this) {
this._parent._update()
}
cb(null)
}
_predestroy () {
this._parent.destroy()
this._parent.destroy(getStreamError(this))
}
// Disconnect this entry from the parser once its body is fully read: the
// remaining bytes owed become the inter-block padding to skip.
_detach () {
if (this._parent._stream === this) {
this._parent._stream = null
this._parent._missing = overflow(this.header.size)
this._parent._update()
}
}
_destroy (cb) {
this._detach()
cb(null)
}
}
@ -39,208 +100,306 @@ class Extract extends Writable {
constructor (opts) {
super(opts)
opts = opts || {}
if (!opts) opts = {}
this._buffer = new BufferList()
this._offset = 0
this._buffer = bl()
this._missing = 0
this._partial = false
this._onparse = noop
this._header = null
this._stream = null
this._overflow = null
this._cb = null
this._missing = 0
this._longHeader = false
this._callback = noop
this._locked = false
this._finished = false
this._pax = null
this._paxGlobal = null
this._gnuLongPath = null
this._gnuLongLinkPath = null
const self = this
const b = self._buffer
const oncontinue = function () {
self._continue()
}
const onunlock = function (err) {
self._locked = false
if (err) return self.destroy(err)
if (!self._stream) oncontinue()
}
const onstreamend = function () {
self._stream = null
const drain = overflow(self._header.size)
if (drain) self._parse(drain, ondrain)
else self._parse(512, onheader)
if (!self._locked) oncontinue()
}
const ondrain = function () {
self._buffer.consume(overflow(self._header.size))
self._parse(512, onheader)
oncontinue()
}
const onpaxglobalheader = function () {
const size = self._header.size
self._paxGlobal = headers.decodePax(b.slice(0, size))
b.consume(size)
onstreamend()
}
const onpaxheader = function () {
const size = self._header.size
self._pax = headers.decodePax(b.slice(0, size))
if (self._paxGlobal) self._pax = Object.assign({}, self._paxGlobal, self._pax)
b.consume(size)
onstreamend()
}
const ongnulongpath = function () {
const size = self._header.size
this._gnuLongPath = headers.decodeLongPath(b.slice(0, size), opts.filenameEncoding)
b.consume(size)
onstreamend()
}
const ongnulonglinkpath = function () {
const size = self._header.size
this._gnuLongLinkPath = headers.decodeLongPath(b.slice(0, size), opts.filenameEncoding)
b.consume(size)
onstreamend()
}
const onheader = function () {
const offset = self._offset
let header
try {
header = self._header = headers.decode(b.slice(0, 512), opts.filenameEncoding, opts.allowUnknownFormat)
} catch (err) {
self.destroy(err)
}
b.consume(512)
if (!header) {
self._parse(512, onheader)
oncontinue()
return
}
if (header.type === 'gnu-long-path') {
self._parse(header.size, ongnulongpath)
oncontinue()
return
}
if (header.type === 'gnu-long-link-path') {
self._parse(header.size, ongnulonglinkpath)
oncontinue()
return
}
if (header.type === 'pax-global-header') {
self._parse(header.size, onpaxglobalheader)
oncontinue()
return
}
if (header.type === 'pax-header') {
self._parse(header.size, onpaxheader)
oncontinue()
return
}
if (self._gnuLongPath) {
header.name = self._gnuLongPath
self._gnuLongPath = null
}
if (self._gnuLongLinkPath) {
header.linkname = self._gnuLongLinkPath
self._gnuLongLinkPath = null
}
if (self._pax) {
self._header = header = mixinPax(header, self._pax)
self._pax = null
}
self._locked = true
if (!header.size || header.type === 'directory') {
self._parse(512, onheader)
self.emit('entry', header, emptyStream(self, offset), onunlock)
return
}
self._stream = new Source(self, offset)
self.emit('entry', header, self._stream, onunlock)
self._parse(header.size, onstreamend)
oncontinue()
}
this._onheader = onheader
this._parse(512, onheader)
this._filenameEncoding = opts.filenameEncoding || 'utf-8'
this._allowUnknownFormat = !!opts.allowUnknownFormat
this._unlockBound = this._unlock.bind(this)
}
_parse (size, onparse) {
this._offset += size
this._missing = size
if (onparse === this._onheader) this._partial = false
this._onparse = onparse
// Entry ack callback (bound as _unlockBound): called when the consumer is
// done with an entry. Clears the lock and either propagates the error or
// resumes parsing buffered data.
_unlock (err) {
this._locked = false
if (err) {
this.destroy(err)
// also fail the pending _write callback so the writer sees the error
this._continueWrite(err)
return
}
this._update()
}
_continue () {
const cb = this._cb
this._cb = noop
if (this._overflow) this._write(this._overflow, cb)
else cb()
// Parse one 512-byte header block out of the buffer.
// Returns false when parsing must stop (still locked by an unacked entry,
// or a decode error) and true when the caller may keep consuming.
_consumeHeader () {
if (this._locked) return false
// archive offset of this header = total bytes shifted so far
this._offset = this._buffer.shifted
try {
this._header = headers.decode(this._buffer.shift(512), this._filenameEncoding, this._allowUnknownFormat)
} catch (err) {
this._continueWrite(err)
return false
}
// a null header is an all-zero block (end-of-archive padding) - skip it
if (!this._header) return true
switch (this._header.type) {
// meta entries: their body is decoded into parser state, not emitted
case 'gnu-long-path':
case 'gnu-long-link-path':
case 'pax-global-header':
case 'pax-header':
this._longHeader = true
this._missing = this._header.size
return true
}
// block further headers until the consumer acks this entry via _unlock
this._locked = true
this._applyLongHeaders()
if (this._header.size === 0 || this._header.type === 'directory') {
// no body: emit an already-ended stream
const stream = this._createStream()
stream.push(null)
this.emit('entry', this._header, stream, this._unlockBound)
return true
}
this._stream = this._createStream()
this._missing = this._header.size
this.emit('entry', this._header, this._stream, this._unlockBound)
return true
}
// Fold buffered gnu long-path/long-link and pax values into the header that
// follows them, then clear the one-shot state.
_applyLongHeaders () {
if (this._gnuLongPath) {
this._header.name = this._gnuLongPath
this._gnuLongPath = null
}
if (this._gnuLongLinkPath) {
this._header.linkname = this._gnuLongLinkPath
this._gnuLongLinkPath = null
}
if (this._pax) {
// pax values override the fixed-width ustar fields
if (this._pax.path) this._header.name = this._pax.path
if (this._pax.linkpath) this._header.linkname = this._pax.linkpath
if (this._pax.size) this._header.size = parseInt(this._pax.size, 10)
this._header.pax = this._pax
this._pax = null
}
}
// Decode a meta-entry body into the matching piece of parser state.
// May throw on malformed input (the caller catches).
_decodeLongHeader (buf) {
switch (this._header.type) {
case 'gnu-long-path':
this._gnuLongPath = headers.decodeLongPath(buf, this._filenameEncoding)
break
case 'gnu-long-link-path':
this._gnuLongLinkPath = headers.decodeLongPath(buf, this._filenameEncoding)
break
case 'pax-global-header':
this._paxGlobal = headers.decodePax(buf)
break
case 'pax-header':
// per-entry pax values win over global ones
this._pax = this._paxGlobal === null
? headers.decodePax(buf)
: Object.assign({}, this._paxGlobal, headers.decodePax(buf))
break
}
}
// Consume a fully buffered meta-entry body. Sets _missing to the padding
// needed to reach the next 512-byte boundary. Returns false on decode error.
_consumeLongHeader () {
this._longHeader = false
this._missing = overflow(this._header.size)
const buf = this._buffer.shift(this._header.size)
try {
this._decodeLongHeader(buf)
} catch (err) {
this._continueWrite(err)
return false
}
return true
}
// Forward buffered bytes to the current entry stream.
// Returns false when consumption must pause (nothing buffered, or the
// entry stream applied backpressure / the entry is still unacked).
_consumeStream () {
const buf = this._buffer.shiftFirst(this._missing)
if (buf === null) return false
this._missing -= buf.byteLength
const drained = this._stream.push(buf)
if (this._missing === 0) {
// entry body complete - end the stream and detach it if it kept up
this._stream.push(null)
if (drained) this._stream._detach()
return drained && this._locked === false
}
return drained
}
// Entry-stream factory; _offset is the archive position of the entry header.
_createStream () {
return new Source(this, this._header, this._offset)
}
// Main parser pump: consume as much buffered data as possible, switching
// between entry bodies, meta-entry bodies, padding and headers. Resolves the
// pending _write callback once the buffer has been drained.
_update () {
while (this._buffer.buffered > 0 && !this.destroying) {
if (this._missing > 0) {
// bytes owed to the current entry stream
if (this._stream !== null) {
if (this._consumeStream() === false) return
continue
}
// bytes owed to a meta entry; it needs its full body buffered at once
if (this._longHeader === true) {
if (this._missing > this._buffer.buffered) break
// NOTE(review): `return false` here vs bare `return` elsewhere - the
// return value is unused by the visible callers
if (this._consumeLongHeader() === false) return false
continue
}
// otherwise: inter-block padding to discard
const ignore = this._buffer.shiftFirst(this._missing)
if (ignore !== null) this._missing -= ignore.byteLength
continue
}
// a full 512-byte block is needed before the next header can be parsed
if (this._buffer.buffered < 512) break
if (this._stream !== null || this._consumeHeader() === false) return
}
this._continueWrite(null)
}
// Resolve the pending _write callback exactly once (noop afterwards).
_continueWrite (err) {
const cb = this._callback
this._callback = noop
cb(err)
}
_write (data, cb) {
const s = this._stream
const b = this._buffer
const missing = this._missing
if (data.byteLength) this._partial = true
// we do not reach end-of-chunk now. just forward it
if (data.byteLength < missing) {
this._missing -= data.byteLength
this._overflow = null
if (s) {
if (s.write(data, cb)) cb()
else s.once('drain', cb)
return
}
b.append(data)
return cb()
}
// end-of-chunk. the parser should call cb.
this._cb = cb
this._missing = 0
let overflow = null
if (data.byteLength > missing) {
overflow = data.subarray(missing)
data = data.subarray(0, missing)
}
if (s) s.end(data)
else b.append(data)
this._overflow = overflow
this._onparse()
this._callback = cb
this._buffer.push(data)
this._update()
}
_final (cb) {
cb(this._partial ? new Error('Unexpected end of data') : null)
this._finished = this._missing === 0 && this._buffer.buffered === 0
cb(this._finished ? null : new Error('Unexpected end of data'))
}
_predestroy () {
// unblock any pending writer before teardown
this._continueWrite(null)
}
_destroy (cb) {
// propagate the destroy reason to a live entry stream
if (this._stream) this._stream.destroy(getStreamError(this))
cb(null)
}
// Async-iterator facade: yields each entry stream in order. The iterator
// holds the per-entry ack callback and only acks an entry when the consumer
// asks for the next one, preserving the sequential-read contract.
[Symbol.asyncIterator] () {
let error = null
// pending next() promise, if the consumer is waiting for an entry
let promiseResolve = null
let promiseReject = null
// entry emitted before next() was called, held until requested
let entryStream = null
// ack callback of the entry currently handed out
let entryCallback = null
const extract = this
this.on('entry', onentry)
this.on('error', (err) => { error = err })
this.on('close', onclose)
return {
[Symbol.asyncIterator] () {
return this
},
next () {
return new Promise(onnext)
},
return () {
return destroy(null)
},
throw (err) {
return destroy(err)
}
}
// ack the previously yielded entry (at most once)
function consumeCallback (err) {
if (!entryCallback) return
const cb = entryCallback
entryCallback = null
cb(err)
}
function onnext (resolve, reject) {
if (error) {
return reject(error)
}
if (entryStream) {
// an entry is already waiting - hand it out immediately
resolve({ value: entryStream, done: false })
entryStream = null
return
}
promiseResolve = resolve
promiseReject = reject
// acking the previous entry lets the parser emit the next one
consumeCallback(null)
if (extract._finished && promiseResolve) {
promiseResolve({ value: undefined, done: true })
promiseResolve = promiseReject = null
}
}
function onentry (header, stream, callback) {
entryCallback = callback
stream.on('error', noop) // no way around this due to tick sillyness
if (promiseResolve) {
promiseResolve({ value: stream, done: false })
promiseResolve = promiseReject = null
} else {
entryStream = stream
}
}
function onclose () {
consumeCallback(error)
if (!promiseResolve) return
if (error) promiseReject(error)
else promiseResolve({ value: undefined, done: true })
promiseResolve = promiseReject = null
}
function destroy (err) {
extract.destroy(err)
consumeCallback(err)
return new Promise((resolve, reject) => {
if (extract.destroyed) return resolve({ value: undefined, done: true })
extract.once('close', function () {
if (err) reject(err)
else resolve({ value: undefined, done: true })
})
})
}
}
// Factory: create a tar extraction (writable) stream.
module.exports = function extract (opts) {
return new Extract(opts)
}
// Shared no-op used as the neutral/default callback.
function noop () {}
// Number of padding bytes needed to round `size` up to the next 512-byte
// tar block boundary (0 when already aligned).
function overflow (size) {
  const partial = size & 511
  return partial === 0 ? 0 : 512 - partial
}

View file

@ -3,15 +3,161 @@ const b4a = require('b4a')
const ZEROS = '0000000000000000000'
const SEVENS = '7777777777777777777'
const ZERO_OFFSET = '0'.charCodeAt(0)
const USTAR_MAGIC = b4a.from('ustar\x00', 'binary')
const USTAR_VER = b4a.from('00', 'binary')
const GNU_MAGIC = b4a.from('ustar\x20', 'binary')
const GNU_VER = b4a.from('\x20\x00', 'binary')
const USTAR_MAGIC = b4a.from([0x75, 0x73, 0x74, 0x61, 0x72, 0x00]) // ustar\x00
const USTAR_VER = b4a.from([ZERO_OFFSET, ZERO_OFFSET])
const GNU_MAGIC = b4a.from([0x75, 0x73, 0x74, 0x61, 0x72, 0x20]) // ustar\x20
const GNU_VER = b4a.from([0x20, 0x00])
const MASK = 0o7777
const MAGIC_OFFSET = 257
const VERSION_OFFSET = 263
const clamp = function (index, len, defaultValue) {
// A gnu long-path body is just the raw path bytes (NUL-terminated).
exports.decodeLongPath = function decodeLongPath (buf, encoding) {
return decodeStr(buf, 0, buf.length, encoding)
}
// Build a pax extended-header body for fields that overflow the fixed-width
// ustar header: a sequence of "<len> key=value\n" records.
exports.encodePax = function encodePax (opts) { // TODO: encode more stuff in pax
let result = ''
if (opts.name) result += addLength(' path=' + opts.name + '\n')
if (opts.linkname) result += addLength(' linkpath=' + opts.linkname + '\n')
const pax = opts.pax
if (pax) {
// pass through any extra user-supplied pax records
for (const key in pax) {
result += addLength(' ' + key + '=' + pax[key] + '\n')
}
}
return b4a.from(result)
}
// Decode a pax extended-header body: a sequence of "<len> key=value\n"
// records. Stops early (returning what was parsed) on a malformed record.
exports.decodePax = function decodePax (buf) {
const result = {}
while (buf.length) {
let i = 0
// scan for the space that terminates the decimal length prefix
while (i < buf.length && buf[i] !== 32) i++
const len = parseInt(buf.subarray(0, i).toString(), 10)
if (!len) return result
// slice out "key=value" (dropping the trailing newline)
const b = b4a.toString(buf.subarray(i + 1, len - 1))
const keyIndex = b.indexOf('=')
if (keyIndex === -1) return result
result[b.slice(0, keyIndex)] = b.slice(keyIndex + 1)
// advance to the next record
buf = buf.subarray(len)
}
return result
}
// Encode a header object as one 512-byte ustar block.
// Returns null when a value does not fit the fixed-width fields (callers
// fall back to a pax extended header in that case).
exports.encode = function encode (opts) {
const buf = b4a.alloc(512)
let name = opts.name
let prefix = ''
// directory entries (typeflag 5) must carry a trailing slash
if (opts.typeflag === 5 && name[name.length - 1] !== '/') name += '/'
if (b4a.byteLength(name) !== name.length) return null // utf-8
// split long names: move leading path components into the 155-byte ustar
// prefix field until the remainder fits the 100-byte name field
while (b4a.byteLength(name) > 100) {
const i = name.indexOf('/')
if (i === -1) return null
prefix += prefix ? '/' + name.slice(0, i) : name.slice(0, i)
name = name.slice(i + 1)
}
if (b4a.byteLength(name) > 100 || b4a.byteLength(prefix) > 155) return null
if (opts.linkname && b4a.byteLength(opts.linkname) > 100) return null
// fixed ustar field layout (second argument/number is the field offset)
b4a.write(buf, name)
b4a.write(buf, encodeOct(opts.mode & MASK, 6), 100)
b4a.write(buf, encodeOct(opts.uid, 6), 108)
b4a.write(buf, encodeOct(opts.gid, 6), 116)
encodeSize(opts.size, buf, 124)
b4a.write(buf, encodeOct((opts.mtime.getTime() / 1000) | 0, 11), 136)
buf[156] = ZERO_OFFSET + toTypeflag(opts.type)
if (opts.linkname) b4a.write(buf, opts.linkname, 157)
b4a.copy(USTAR_MAGIC, buf, MAGIC_OFFSET)
b4a.copy(USTAR_VER, buf, VERSION_OFFSET)
if (opts.uname) b4a.write(buf, opts.uname, 265)
if (opts.gname) b4a.write(buf, opts.gname, 297)
b4a.write(buf, encodeOct(opts.devmajor || 0, 6), 329)
b4a.write(buf, encodeOct(opts.devminor || 0, 6), 337)
if (prefix) b4a.write(buf, prefix, 345)
// checksum is written last, over the otherwise-complete block
b4a.write(buf, encodeOct(cksum(buf), 6), 148)
return buf
}
// Decode one 512-byte header block into a header object.
// Returns null for an all-zero block (end-of-archive padding); throws on a
// bad checksum or (unless allowUnknownFormat) an unrecognized magic.
exports.decode = function decode (buf, filenameEncoding, allowUnknownFormat) {
let typeflag = buf[156] === 0 ? 0 : buf[156] - ZERO_OFFSET
// fixed ustar field layout (offsets mirror exports.encode)
let name = decodeStr(buf, 0, 100, filenameEncoding)
const mode = decodeOct(buf, 100, 8)
const uid = decodeOct(buf, 108, 8)
const gid = decodeOct(buf, 116, 8)
const size = decodeOct(buf, 124, 12)
const mtime = decodeOct(buf, 136, 12)
const type = toType(typeflag)
const linkname = buf[157] === 0 ? null : decodeStr(buf, 157, 100, filenameEncoding)
const uname = decodeStr(buf, 265, 32)
const gname = decodeStr(buf, 297, 32)
const devmajor = decodeOct(buf, 329, 8)
const devminor = decodeOct(buf, 337, 8)
const c = cksum(buf)
// checksum is still initial value if header was null.
if (c === 8 * 32) return null
// valid checksum
if (c !== decodeOct(buf, 148, 8)) throw new Error('Invalid tar header. Maybe the tar is corrupted or it needs to be gunzipped?')
if (isUSTAR(buf)) {
// ustar (posix) format.
// prepend prefix, if present.
if (buf[345]) name = decodeStr(buf, 345, 155, filenameEncoding) + '/' + name
} else if (isGNU(buf)) {
// 'gnu'/'oldgnu' format. Similar to ustar, but has support for incremental and
// multi-volume tarballs.
} else {
if (!allowUnknownFormat) {
throw new Error('Invalid tar header: unknown format.')
}
}
// to support old tar versions that use trailing / to indicate dirs
if (typeflag === 0 && name && name[name.length - 1] === '/') typeflag = 5
return {
name,
mode,
uid,
gid,
size,
mtime: new Date(1000 * mtime),
type,
linkname,
uname,
gname,
devmajor,
devminor,
pax: null
}
}
// Posix ustar magic ("ustar\0") at the magic offset.
function isUSTAR (buf) {
return b4a.equals(USTAR_MAGIC, buf.subarray(MAGIC_OFFSET, MAGIC_OFFSET + 6))
}
// GNU magic ("ustar ") plus GNU version bytes (" \0").
function isGNU (buf) {
return b4a.equals(GNU_MAGIC, buf.subarray(MAGIC_OFFSET, MAGIC_OFFSET + 6)) &&
b4a.equals(GNU_VER, buf.subarray(VERSION_OFFSET, VERSION_OFFSET + 2))
}
function clamp (index, len, defaultValue) {
if (typeof index !== 'number') return defaultValue
index = ~~index // Coerce to integer.
if (index >= len) return len
@ -21,7 +167,7 @@ const clamp = function (index, len, defaultValue) {
return 0
}
const toType = function (flag) {
function toType (flag) {
switch (flag) {
case 0:
return 'file'
@ -53,7 +199,7 @@ const toType = function (flag) {
return null
}
const toTypeflag = function (flag) {
function toTypeflag (flag) {
switch (flag) {
case 'file':
return 0
@ -78,24 +224,40 @@ const toTypeflag = function (flag) {
return 0
}
const indexOf = function (block, num, offset, end) {
// Find the first index of byte `num` in `block` within [offset, end);
// returns `end` (not -1) when the byte is absent.
function indexOf (block, num, offset, end) {
  let i = offset
  while (i < end) {
    if (block[i] === num) return i
    i++
  }
  return end
}
const cksum = function (block) {
// Tar header checksum: sum of all 512 header bytes with the 8-byte checksum
// field (offsets 148-155) treated as ASCII spaces (8 * 32).
function cksum (block) {
  let sum = 8 * 32
  for (let i = 0; i < 512; i++) {
    if (i < 148 || i >= 156) sum += block[i]
  }
  return sum
}
const encodeOct = function (val, n) {
function encodeOct (val, n) {
val = val.toString(8)
if (val.length > n) return SEVENS.slice(0, n) + ' '
else return ZEROS.slice(0, n - val.length) + val + ' '
return ZEROS.slice(0, n - val.length) + val + ' '
}
// Write `num` as a base-256 tar size field: marker byte 0x80 followed by
// 11 big-endian bytes. Division (not bit shifts) keeps values above 2^32
// exact.
function encodeSizeBin (num, buf, off) {
  buf[off] = 0x80
  let rest = num
  for (let i = off + 11; i > off; i--) {
    buf[i] = rest % 0x100
    rest = Math.floor(rest / 0x100)
  }
}
// Write a size field at `off`: octal when it fits in 11 digits, otherwise
// base-256 binary encoding for very large (> 8 GB) entries.
function encodeSize (num, buf, off) {
if (num.toString(8).length > 11) {
encodeSizeBin(num, buf, off)
} else {
b4a.write(buf, encodeOct(num, 11), off)
}
}
/* Copied from the node-tar repo and modified to meet
@ -129,8 +291,8 @@ function parse256 (buf) {
return positive ? sum : -1 * sum
}
const decodeOct = function (val, offset, length) {
val = val.slice(offset, offset + length)
function decodeOct (val, offset, length) {
val = val.subarray(offset, offset + length)
offset = 0
// If prefixed with 0x80 then parse as a base-256 integer
@ -142,155 +304,18 @@ const decodeOct = function (val, offset, length) {
const end = clamp(indexOf(val, 32, offset, val.length), val.length, val.length)
while (offset < end && val[offset] === 0) offset++
if (end === offset) return 0
return parseInt(val.slice(offset, end).toString(), 8)
return parseInt(val.subarray(offset, end).toString(), 8)
}
}
const decodeStr = function (val, offset, length, encoding) {
return val.slice(offset, indexOf(val, 0, offset, offset + length)).toString(encoding)
function decodeStr (val, offset, length, encoding) {
return b4a.toString(val.subarray(offset, indexOf(val, 0, offset, offset + length)), encoding)
}
const addLength = function (str) {
// Prefix a pax record with its own total byte length ("<len> key=value\n",
// where <len> counts its own digits too), bumping the digit count when
// adding it crosses a power of ten.
function addLength (str) {
  const len = b4a.byteLength(str)
  let digits = 1 + Math.floor(Math.log(len) / Math.log(10))
  if (len + digits >= Math.pow(10, digits)) digits += 1
  return `${len + digits}${str}`
}
exports.decodeLongPath = function (buf, encoding) {
return decodeStr(buf, 0, buf.length, encoding)
}
exports.encodePax = function (opts) { // TODO: encode more stuff in pax
let result = ''
if (opts.name) result += addLength(' path=' + opts.name + '\n')
if (opts.linkname) result += addLength(' linkpath=' + opts.linkname + '\n')
const pax = opts.pax
if (pax) {
for (const key in pax) {
result += addLength(' ' + key + '=' + pax[key] + '\n')
}
}
return b4a.from(result)
}
exports.decodePax = function (buf) {
const result = {}
while (buf.length) {
let i = 0
while (i < buf.length && buf[i] !== 32) i++
const len = parseInt(buf.slice(0, i).toString(), 10)
if (!len) return result
const b = buf.slice(i + 1, len - 1).toString()
const keyIndex = b.indexOf('=')
if (keyIndex === -1) return result
result[b.slice(0, keyIndex)] = b.slice(keyIndex + 1)
buf = buf.slice(len)
}
return result
}
exports.encode = function (opts) {
const buf = b4a.alloc(512)
let name = opts.name
let prefix = ''
if (opts.typeflag === 5 && name[name.length - 1] !== '/') name += '/'
if (b4a.byteLength(name) !== name.length) return null // utf-8
while (b4a.byteLength(name) > 100) {
const i = name.indexOf('/')
if (i === -1) return null
prefix += prefix ? '/' + name.slice(0, i) : name.slice(0, i)
name = name.slice(i + 1)
}
if (b4a.byteLength(name) > 100 || b4a.byteLength(prefix) > 155) return null
if (opts.linkname && b4a.byteLength(opts.linkname) > 100) return null
b4a.write(buf, name)
b4a.write(buf, encodeOct(opts.mode & MASK, 6), 100)
b4a.write(buf, encodeOct(opts.uid, 6), 108)
b4a.write(buf, encodeOct(opts.gid, 6), 116)
b4a.write(buf, encodeOct(opts.size, 11), 124)
b4a.write(buf, encodeOct((opts.mtime.getTime() / 1000) | 0, 11), 136)
buf[156] = ZERO_OFFSET + toTypeflag(opts.type)
if (opts.linkname) b4a.write(buf, opts.linkname, 157)
b4a.copy(USTAR_MAGIC, buf, MAGIC_OFFSET)
b4a.copy(USTAR_VER, buf, VERSION_OFFSET)
if (opts.uname) b4a.write(buf, opts.uname, 265)
if (opts.gname) b4a.write(buf, opts.gname, 297)
b4a.write(buf, encodeOct(opts.devmajor || 0, 6), 329)
b4a.write(buf, encodeOct(opts.devminor || 0, 6), 337)
if (prefix) b4a.write(buf, prefix, 345)
b4a.write(buf, encodeOct(cksum(buf), 6), 148)
return buf
}
exports.decode = function (buf, filenameEncoding, allowUnknownFormat) {
let typeflag = buf[156] === 0 ? 0 : buf[156] - ZERO_OFFSET
let name = decodeStr(buf, 0, 100, filenameEncoding)
const mode = decodeOct(buf, 100, 8)
const uid = decodeOct(buf, 108, 8)
const gid = decodeOct(buf, 116, 8)
const size = decodeOct(buf, 124, 12)
const mtime = decodeOct(buf, 136, 12)
const type = toType(typeflag)
const linkname = buf[157] === 0 ? null : decodeStr(buf, 157, 100, filenameEncoding)
const uname = decodeStr(buf, 265, 32)
const gname = decodeStr(buf, 297, 32)
const devmajor = decodeOct(buf, 329, 8)
const devminor = decodeOct(buf, 337, 8)
const c = cksum(buf)
// checksum is still initial value if header was null.
if (c === 8 * 32) return null
// valid checksum
if (c !== decodeOct(buf, 148, 8)) throw new Error('Invalid tar header. Maybe the tar is corrupted or it needs to be gunzipped?')
if (USTAR_MAGIC.compare(buf, MAGIC_OFFSET, MAGIC_OFFSET + 6) === 0) {
// ustar (posix) format.
// prepend prefix, if present.
if (buf[345]) name = decodeStr(buf, 345, 155, filenameEncoding) + '/' + name
} else if (GNU_MAGIC.compare(buf, MAGIC_OFFSET, MAGIC_OFFSET + 6) === 0 &&
GNU_VER.compare(buf, VERSION_OFFSET, VERSION_OFFSET + 2) === 0) {
// 'gnu'/'oldgnu' format. Similar to ustar, but has support for incremental and
// multi-volume tarballs.
} else {
if (!allowUnknownFormat) {
throw new Error('Invalid tar header: unknown format.')
}
}
// to support old tar versions that use trailing / to indicate dirs
if (typeflag === 0 && name && name[name.length - 1] === '/') typeflag = 5
return {
name,
mode,
uid,
gid,
size,
mtime: new Date(1000 * mtime),
type,
linkname,
uname,
gname,
devmajor,
devminor
}
}

244
pack.js
View file

@ -1,8 +1,7 @@
const { constants } = require('fs')
const { Readable, Writable } = require('streamx')
const { StringDecoder } = require('string_decoder')
const { Readable, Writable, getStreamError } = require('streamx')
const b4a = require('b4a')
const constants = require('./constants')
const headers = require('./headers')
const DMODE = 0o755
@ -10,70 +9,112 @@ const FMODE = 0o644
const END_OF_TAR = b4a.alloc(1024)
const noop = function () {}
// Push zero-padding onto `self` so the entry body ends on a 512-byte tar
// block boundary; no-op when already aligned.
const overflow = function (self, size) {
  const rem = size & 511
  if (rem !== 0) self.push(END_OF_TAR.subarray(0, 512 - rem))
}
// Map a stat-style mode's file-type bits to a tar entry type string,
// defaulting to 'file' for anything unrecognized (including regular files).
function modeToType (mode) {
  const fmt = mode & constants.S_IFMT

  if (fmt === constants.S_IFBLK) return 'block-device'
  if (fmt === constants.S_IFCHR) return 'character-device'
  if (fmt === constants.S_IFDIR) return 'directory'
  if (fmt === constants.S_IFIFO) return 'fifo'
  if (fmt === constants.S_IFLNK) return 'symlink'

  return 'file'
}
class Sink extends Writable {
constructor (to) {
super()
constructor (pack, header, callback) {
super({ mapWritable })
this.written = 0
this._to = to
this.header = header
this._callback = callback
this._linkname = null
this._isLinkname = header.type === 'symlink' && !header.linkname
this._isVoid = header.type !== 'file' && header.type !== 'contiguous-file'
this._finished = false
this._pack = pack
this._openCallback = null
if (this._pack._stream === null) this._pack._stream = this
else this._pack._pending.push(this)
}
_open (cb) {
this._openCallback = cb
if (this._pack._stream === this) this._continueOpen()
}
_continueOpen () {
if (this._pack._stream === null) this._pack._stream = this
const cb = this._openCallback
this._openCallback = null
if (cb === null) return
if (this._pack.destroying) return cb(new Error('pack stream destroyed'))
if (this._pack._finalized) return cb(new Error('pack stream is already finalized'))
this._pack._stream = this
if (!this._isLinkname) {
this._pack._encode(this.header)
}
cb(null)
}
_write (data, cb) {
if (this._isLinkname) {
this._linkname = this._linkname ? b4a.concat([this._linkname, data]) : data
return cb(null)
}
if (this._isVoid) {
return cb(new Error('No body allowed for this entry'))
}
this.written += data.byteLength
if (this._to.push(data)) return cb()
this._to._drain = cb
}
}
class LinkSink extends Writable {
constructor () {
super()
this.linkname = ''
this._decoder = new StringDecoder('utf-8')
if (this._pack.push(data)) return cb()
this._pack._drain = cb
}
_write (data, cb) {
this.linkname += this._decoder.write(data)
_final (cb) {
if (this._isLinkname) {
this.header.linkname = this._linkname ? b4a.toString(this._linkname, 'utf-8') : ''
this._pack._encode(this.header)
}
overflow(this._pack, this.header.size)
if (this.written !== this.header.size) { // corrupting tar
return cb(new Error('Size mismatch'))
}
this._pack._done(this)
this._finished = true
cb(null)
}
_getError () {
return getStreamError(this) || new Error('tar entry destroyed')
}
_predestroy () {
this._pack.destroy(this._getError())
}
_destroy (cb) {
this._pack._done(this)
if (this._finished) this._callback(null)
else this._callback(this._getError())
cb()
}
}
// Writable sink for entries that may not carry a body; any write errors.
class Void extends Writable {
_write (data, cb) {
cb(new Error('No body allowed for this entry'))
}
}
class Pack extends Readable {
constructor (opts) {
super(opts)
this._drain = noop // pending push-backpressure callback
this._finalized = false
this._finalizing = false // finalize() requested while entries are pending
this._pending = [] // entry sinks queued behind the active one
this._stream = null // entry sink currently writing its body
}
entry (header, buffer, callback) {
if (this._stream) throw new Error('already piping an entry')
if (this._finalized || this.destroyed) return
if (this._finalized || this.destroying) throw new Error('already finalized or destroyed')
if (typeof buffer === 'function') {
callback = buffer
@ -82,8 +123,6 @@ class Pack extends Readable {
if (!callback) callback = noop
const self = this
if (!header.size || header.type === 'symlink') header.size = 0
if (!header.type) header.type = modeToType(header.mode)
if (!header.mode) header.mode = header.type === 'directory' ? DMODE : FMODE
@ -92,74 +131,46 @@ class Pack extends Readable {
if (!header.mtime) header.mtime = new Date()
if (typeof buffer === 'string') buffer = b4a.from(buffer)
const sink = new Sink(this, header, callback)
if (b4a.isBuffer(buffer)) {
header.size = buffer.byteLength
this._encode(header)
const ok = this.push(buffer)
overflow(self, header.size)
if (ok) process.nextTick(callback)
else this._drain = callback
return new Void()
sink.write(buffer)
sink.end()
return sink
}
if (header.type === 'symlink' && !header.linkname) {
const linkSink = new LinkSink()
linkSink
.on('error', function (err) {
self.destroy()
callback(err)
})
.on('close', function () {
header.linkname = linkSink.linkname
self._encode(header)
callback()
})
return linkSink
if (sink._isVoid) {
sink.end()
return sink
}
this._encode(header)
if (header.type !== 'file' && header.type !== 'contiguous-file') {
process.nextTick(callback)
return new Void()
}
const sink = new Sink(this)
sink
.on('error', function (err) {
self._stream = null
self.destroy()
callback(err)
})
.on('close', function () {
self._stream = null
if (sink.written !== header.size) { // corrupting tar
}
overflow(self, header.size)
if (self._finalizing) { self.finalize() }
callback()
})
this._stream = sink
return sink
}
finalize () {
if (this._stream) {
if (this._stream || this._pending.length > 0) {
this._finalizing = true
return
}
if (this._finalized) return
this._finalized = true
this.push(END_OF_TAR)
this.push(null)
}
_done (stream) {
if (stream !== this._stream) return
this._stream = null
if (this._finalizing) this.finalize()
if (this._pending.length) this._pending.shift()._continueOpen()
}
_encode (header) {
if (!header.pax) {
const buf = headers.encode(header)
@ -202,10 +213,28 @@ class Pack extends Readable {
this.push(headers.encode(newHeader))
}
_read (cb) {
_doDrain () {
const drain = this._drain
this._drain = noop
drain()
}
_predestroy () {
const err = getStreamError(this)
if (this._stream) this._stream.destroy(err)
while (this._pending.length) {
const stream = this._pending.shift()
stream.destroy(err)
stream._continueOpen()
}
this._doDrain()
}
_read (cb) {
this._doDrain()
cb()
}
}
@ -213,3 +242,26 @@ class Pack extends Readable {
// Factory: create a new Pack stream (a Readable that produces the tar
// archive as entries are added via pack.entry()).
module.exports = function pack (opts) {
  return new Pack(opts)
}
// Derive the tar entry type from the file-type bits of a stat-style mode.
// Any mode whose masked bits are not listed (including regular files)
// maps to 'file'.
function modeToType (mode) {
  const typeByMask = {
    [constants.S_IFBLK]: 'block-device',
    [constants.S_IFCHR]: 'character-device',
    [constants.S_IFDIR]: 'directory',
    [constants.S_IFIFO]: 'fifo',
    [constants.S_IFLNK]: 'symlink'
  }
  return typeByMask[mode & constants.S_IFMT] || 'file'
}
// Shared no-op, used as the default entry callback and the idle drain.
function noop () {}
// Pad the archive out to the next 512-byte block boundary after an entry
// body of the given size, pushing the padding bytes (sliced from the
// END_OF_TAR buffer) onto the pack stream.
function overflow (self, size) {
  const remainder = size & 511
  if (remainder !== 0) self.push(END_OF_TAR.subarray(0, 512 - remainder))
}
// Normalise an incoming chunk to a buffer: pass buffers through untouched,
// convert anything else via b4a.from.
function mapWritable (buf) {
  if (b4a.isBuffer(buf)) return buf
  return b4a.from(buf)
}

View file

@ -6,6 +6,9 @@
"files": [
"*.js"
],
"browser": {
"fs": false
},
"scripts": {
"test": "standard && brittle test/*.js"
},
@ -20,13 +23,12 @@
},
"homepage": "https://github.com/mafintosh/tar-stream",
"dependencies": {
"b4a": "^1.6.1",
"bl": "^6.0.0",
"streamx": "^2.12.5"
"b4a": "^1.6.4",
"streamx": "^2.15.0"
},
"devDependencies": {
"brittle": "^3.1.1",
"brittle": "^3.3.2",
"concat-stream": "^2.0.0",
"standard": "^17.0.0"
"standard": "^17.0.1"
}
}

48
test/dual.js Normal file
View file

@ -0,0 +1,48 @@
const test = require('brittle')
const { Readable } = require('streamx')
const tar = require('../')

// End-to-end pack -> extract round trip with a 10 GiB entry. Verifies the
// extracted body length matches the declared header size and that the
// pipeline finishes cleanly without buffering the whole payload.
test('write and read huge archive', function (t) {
  t.plan(2)

  const pack = tar.pack()
  const extract = tar.extract()

  extract.on('entry', function (header, stream, next) {
    let size = 0

    // Count bytes instead of collecting them — the payload is 10 GiB.
    stream.on('data', function (data) {
      size += data.byteLength
    })

    stream.on('end', function () {
      t.is(size, header.size)
      next()
    })
  })

  pack.pipe(extract, function (err) {
    t.ok(!err, 'pipeline finished')
  })

  const entry = pack.entry({
    name: 'huge.txt',
    // 10 GiB — presumably large enough to exercise the non-octal size
    // encoding path in the headers; verify against headers.encode.
    size: 10 * 1024 * 1024 * 1024
  })

  // Zero-filled 1 MiB chunk, reused for every push.
  const buf = Buffer.alloc(1024 * 1024)

  let pushed = 0

  // Synthetic readable that feeds the entry 1 MiB at a time until exactly
  // the declared size has been produced, then signals end-of-stream.
  const rs = new Readable({
    read (cb) {
      this.push(buf)
      pushed += buf.byteLength
      if (pushed === entry.header.size) this.push(null)
      cb(null)
    }
  })

  rs.pipe(entry)
  pack.finalize()
})

View file

@ -4,16 +4,6 @@ const fs = require('fs')
const tar = require('..')
const fixtures = require('./fixtures')
const clamp = function (index, len, defaultValue) {
if (typeof index !== 'number') return defaultValue
index = ~~index // Coerce to integer.
if (index >= len) return len
if (index >= 0) return index
index += len
if (index >= 0) return index
return 0
}
test('one-file', function (t) {
t.plan(3)
@ -33,7 +23,8 @@ test('one-file', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -69,7 +60,8 @@ test('chunked-one-file', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -97,52 +89,6 @@ test('multi-file', function (t) {
const extract = tar.extract()
let noEntries = false
const onfile1 = function (header, stream, cb) {
t.alike(header, {
name: 'file-1.txt',
mode: 0o644,
uid: 501,
gid: 20,
size: 12,
mtime: new Date(1387580181000),
type: 'file',
linkname: null,
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
})
extract.on('entry', onfile2)
stream.pipe(concat(function (data) {
t.is(data.toString(), 'i am file-1\n')
cb()
}))
}
const onfile2 = function (header, stream, cb) {
t.alike(header, {
name: 'file-2.txt',
mode: 0o644,
uid: 501,
gid: 20,
size: 12,
mtime: new Date(1387580181000),
type: 'file',
linkname: null,
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
})
stream.pipe(concat(function (data) {
noEntries = true
t.is(data.toString(), 'i am file-2\n')
cb()
}))
}
extract.once('entry', onfile1)
extract.on('finish', function () {
@ -150,15 +96,8 @@ test('multi-file', function (t) {
})
extract.end(fs.readFileSync(fixtures.MULTI_FILE_TAR))
})
test('chunked-multi-file', function (t) {
t.plan(5)
const extract = tar.extract()
let noEntries = false
const onfile1 = function (header, stream, cb) {
function onfile1 (header, stream, cb) {
t.alike(header, {
name: 'file-1.txt',
mode: 0o644,
@ -171,7 +110,8 @@ test('chunked-multi-file', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
extract.on('entry', onfile2)
@ -181,7 +121,7 @@ test('chunked-multi-file', function (t) {
}))
}
const onfile2 = function (header, stream, cb) {
function onfile2 (header, stream, cb) {
t.alike(header, {
name: 'file-2.txt',
mode: 0o644,
@ -194,7 +134,8 @@ test('chunked-multi-file', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -203,6 +144,13 @@ test('chunked-multi-file', function (t) {
cb()
}))
}
})
test('chunked-multi-file', function (t) {
t.plan(5)
const extract = tar.extract()
let noEntries = false
extract.once('entry', onfile1)
@ -215,6 +163,54 @@ test('chunked-multi-file', function (t) {
extract.write(b.subarray(i, clamp(i + 321, b.length, b.length)))
}
extract.end()
function onfile1 (header, stream, cb) {
t.alike(header, {
name: 'file-1.txt',
mode: 0o644,
uid: 501,
gid: 20,
size: 12,
mtime: new Date(1387580181000),
type: 'file',
linkname: null,
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0,
pax: null
})
extract.on('entry', onfile2)
stream.pipe(concat(function (data) {
t.is(data.toString(), 'i am file-1\n')
cb()
}))
}
function onfile2 (header, stream, cb) {
t.alike(header, {
name: 'file-2.txt',
mode: 0o644,
uid: 501,
gid: 20,
size: 12,
mtime: new Date(1387580181000),
type: 'file',
linkname: null,
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
noEntries = true
t.is(data.toString(), 'i am file-2\n')
cb()
}))
}
})
test('pax', function (t) {
@ -260,7 +256,15 @@ test('types', function (t) {
const extract = tar.extract()
let noEntries = false
const ondir = function (header, stream, cb) {
extract.once('entry', ondir)
extract.on('finish', function () {
t.ok(noEntries)
})
extract.end(fs.readFileSync(fixtures.TYPES_TAR))
function ondir (header, stream, cb) {
t.alike(header, {
name: 'directory',
mode: 0o755,
@ -273,7 +277,8 @@ test('types', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.on('data', function () {
t.ok(false)
@ -282,7 +287,7 @@ test('types', function (t) {
cb()
}
const onlink = function (header, stream, cb) {
function onlink (header, stream, cb) {
t.alike(header, {
name: 'directory-link',
mode: 0o755,
@ -295,7 +300,8 @@ test('types', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.on('data', function () {
t.ok(false)
@ -303,14 +309,6 @@ test('types', function (t) {
noEntries = true
cb()
}
extract.once('entry', ondir)
extract.on('finish', function () {
t.ok(noEntries)
})
extract.end(fs.readFileSync(fixtures.TYPES_TAR))
})
test('long-name', function (t) {
@ -332,7 +330,8 @@ test('long-name', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -363,13 +362,13 @@ test('unicode-bsd', function (t) { // can unpack a bsdtar unicoded tarball
gid: 20,
size: 4,
mtime: new Date(1387588646000),
pax: { 'SCHILY.dev': '16777217', 'SCHILY.ino': '3599143', 'SCHILY.nlink': '1', atime: '1387589077', ctime: '1387588646', path: 'høllø.txt' },
type: 'file',
linkname: null,
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: { 'SCHILY.dev': '16777217', 'SCHILY.ino': '3599143', 'SCHILY.nlink': '1', atime: '1387589077', ctime: '1387588646', path: 'høllø.txt' }
})
stream.pipe(concat(function (data) {
@ -400,13 +399,13 @@ test('unicode', function (t) { // can unpack a bsdtar unicoded tarball
gid: 20,
size: 8,
mtime: new Date(1387580181000),
pax: { path: 'høstål.txt' },
type: 'file',
linkname: null,
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: { path: 'høstål.txt' }
})
stream.pipe(concat(function (data) {
@ -522,7 +521,8 @@ test('base 256 size', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
cb()
})
@ -554,7 +554,8 @@ test('latin-1', function (t) { // can unpack filenames encoded in latin-1
uname: 'root',
gname: 'root',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -610,7 +611,8 @@ test('gnu', function (t) { // can correctly unpack gnu-tar format
uname: 'myuser',
gname: 'mygroup',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -650,7 +652,8 @@ test('gnu-incremental', function (t) {
uname: 'myuser',
gname: 'mygroup',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -699,7 +702,15 @@ test('unknown format attempts to extract if allowed', function (t) {
const extract = tar.extract({ allowUnknownFormat: true })
let noEntries = false
const onfile1 = function (header, stream, cb) {
extract.once('entry', onfile1)
extract.on('finish', function () {
t.ok(noEntries)
})
extract.end(fs.readFileSync(fixtures.UNKNOWN_FORMAT))
function onfile1 (header, stream, cb) {
t.alike(header, {
name: 'file-1.txt',
mode: 0o644,
@ -712,7 +723,8 @@ test('unknown format attempts to extract if allowed', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
extract.on('entry', onfile2)
@ -722,7 +734,7 @@ test('unknown format attempts to extract if allowed', function (t) {
}))
}
const onfile2 = function (header, stream, cb) {
function onfile2 (header, stream, cb) {
t.alike(header, {
name: 'file-2.txt',
mode: 0o644,
@ -735,7 +747,8 @@ test('unknown format attempts to extract if allowed', function (t) {
uname: 'maf',
gname: 'staff',
devmajor: 0,
devminor: 0
devminor: 0,
pax: null
})
stream.pipe(concat(function (data) {
@ -744,12 +757,30 @@ test('unknown format attempts to extract if allowed', function (t) {
cb()
}))
}
extract.once('entry', onfile1)
extract.on('finish', function () {
t.ok(noEntries)
})
extract.end(fs.readFileSync(fixtures.UNKNOWN_FORMAT))
})
// Extract streams double as async iterators: each iteration yields an entry
// whose header can be inspected before advancing to the next one.
test('extract streams are async iterators', async function (t) {
  const extract = tar.extract()
  const b = fs.readFileSync(fixtures.MULTI_FILE_TAR)
  extract.end(b)
  const expected = ['file-1.txt', 'file-2.txt']
  for await (const entry of extract) {
    t.is(entry.header.name, expected.shift())
    entry.resume() // drain the body; only the header is checked here
    t.comment('wait a bit...')
    // Artificial delay between iterations simulates a slow consumer.
    await new Promise(resolve => setTimeout(resolve, 100))
  }
})
// Clamp an index into [0, len], resolving negative values from the end
// (slice-style). Non-numeric input yields the caller-supplied default.
function clamp (index, len, defaultValue) {
  if (typeof index !== 'number') return defaultValue
  const i = ~~index // truncate toward zero
  if (i >= len) return len
  if (i >= 0) return i
  // Negative index counts back from the end, bottoming out at 0.
  const fromEnd = i + len
  return fromEnd >= 0 ? fromEnd : 0
}

View file

@ -6,7 +6,7 @@ const tar = require('../..')
const fixtures = require('../fixtures')
test('huge', function (t) {
t.plan(1)
t.plan(3)
const extract = tar.extract()
let noEntries = false
@ -48,6 +48,7 @@ test('huge', function (t) {
noEntries = true
stream.pipe(countStream)
callback()
})
extract.on('finish', function () {