From 95eac9dcca0047393028faf1a44446bc6904b5be Mon Sep 17 00:00:00 2001 From: Nulo Date: Sat, 23 Dec 2023 00:19:23 -0300 Subject: [PATCH] Revert "transform" This reverts commit 61727faa8404dd61a52abf1d453b152499e55ea0. --- scraper/bench.ts | 44 +++++++++++++++++--------------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/scraper/bench.ts b/scraper/bench.ts index d0d532a..e78e324 100644 --- a/scraper/bench.ts +++ b/scraper/bench.ts @@ -1,6 +1,7 @@ // import { run, bench, group, baseline } from "mitata"; import { createReadStream } from "node:fs"; -import { Readable } from "stream"; +import { Writable } from "node:stream"; +import { pipeline } from "node:stream/promises"; import { getCarrefourProduct } from "./carrefour.js"; import { WARCParser } from "warcio"; // import { ZSTDDecompress } from "simple-zstd"; @@ -43,36 +44,25 @@ await bench("warc", async () => { // ).stdout; const warc = Bun.stdin.stream(); - // const warc = Readable.toWeb(process.stdin); - let buffer: Uint8Array[] = []; - const transform = new TransformStream({ - transform(chunk, controller) { - buffer.push(chunk); - if (buffer.reduce((prev, curr) => prev + curr.length, 0) > 1024 * 1024) { - controller.enqueue(Buffer.concat(buffer)); - buffer = []; - } - }, - flush(controller) { - controller.enqueue(Buffer.concat(buffer)); - }, - }); - - warc.pipeTo(transform.writable); + // const warc = process.stdin; let arrays: Buffer[] = []; - let n = 0; - for await (const chunk of transform.readable) { - console.debug(n); + + const myWritable = new Writable({ + highWaterMark: 1024 * 1024 * 1024, + writev(chunks, callback) {}, + }); + + for await (const chunk of warc) { // console.debug(chunk.length); const b = Buffer.from(chunk); arrays.push(b); - // if ( - // arrays.reduce((prev, curr) => prev + curr.length, 0) < - // 1024 * 1024 * 1024 - // ) - // continue; + if ( + arrays.reduce((prev, curr) => prev + curr.length, 0) < + 1024 * 1024 * 1024 + ) + continue; let buf: Buffer; while ( ((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)), @@ -104,8 +94,8 @@ await bench("warc", async () => { } const length = parseInt(fields.get("Content-Length")); const content = buf.subarray(0, length); - // console.debug(fields.get("WARC-Date"), content.length); - n++; + console.debug(fields.get("WARC-Date"), content.length); + arrays = [ buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length), ];