From 61727faa8404dd61a52abf1d453b152499e55ea0 Mon Sep 17 00:00:00 2001 From: Nulo Date: Sat, 23 Dec 2023 00:19:04 -0300 Subject: [PATCH] transform --- scraper/bench.ts | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/scraper/bench.ts b/scraper/bench.ts index e78e324..d0d532a 100644 --- a/scraper/bench.ts +++ b/scraper/bench.ts @@ -1,7 +1,6 @@ // import { run, bench, group, baseline } from "mitata"; import { createReadStream } from "node:fs"; -import { Writable } from "node:stream"; -import { pipeline } from "node:stream/promises"; +import { Readable } from "stream"; import { getCarrefourProduct } from "./carrefour.js"; import { WARCParser } from "warcio"; // import { ZSTDDecompress } from "simple-zstd"; @@ -44,25 +43,36 @@ await bench("warc", async () => { // ).stdout; const warc = Bun.stdin.stream(); + // const warc = Readable.toWeb(process.stdin); - // const warc = process.stdin; - - let arrays: Buffer[] = []; - - const myWritable = new Writable({ - highWaterMark: 1024 * 1024 * 1024, - writev(chunks, callback) {}, + let buffer: Uint8Array[] = []; + const transform = new TransformStream({ + transform(chunk, controller) { + buffer.push(chunk); + if (buffer.reduce((prev, curr) => prev + curr.length, 0) > 1024 * 1024) { + controller.enqueue(Buffer.concat(buffer)); + buffer = []; + } + }, + flush(controller) { + controller.enqueue(Buffer.concat(buffer)); + }, }); - for await (const chunk of warc) { + warc.pipeTo(transform.writable); + + let arrays: Buffer[] = []; + let n = 0; + for await (const chunk of transform.readable) { + console.debug(n); // console.debug(chunk.length); const b = Buffer.from(chunk); arrays.push(b); - if ( - arrays.reduce((prev, curr) => prev + curr.length, 0) < - 1024 * 1024 * 1024 - ) - continue; + // if ( + // arrays.reduce((prev, curr) => prev + curr.length, 0) < + // 1024 * 1024 * 1024 + // ) + // continue; let buf: Buffer; while ( ((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)), @@ -94,8 +104,8 @@ await bench("warc", async () => { } const length = parseInt(fields.get("Content-Length")); const content = buf.subarray(0, length); - console.debug(fields.get("WARC-Date"), content.length); - + // console.debug(fields.get("WARC-Date"), content.length); + n++; arrays = [ buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length), ];