Revert "transform"

This reverts commit 61727faa84.
This commit is contained in:
Cat /dev/Nulo 2023-12-23 00:19:23 -03:00
parent 61727faa84
commit 95eac9dcca

View file

@ -1,6 +1,7 @@
// import { run, bench, group, baseline } from "mitata"; // import { run, bench, group, baseline } from "mitata";
import { createReadStream } from "node:fs"; import { createReadStream } from "node:fs";
import { Readable } from "stream"; import { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { getCarrefourProduct } from "./carrefour.js"; import { getCarrefourProduct } from "./carrefour.js";
import { WARCParser } from "warcio"; import { WARCParser } from "warcio";
// import { ZSTDDecompress } from "simple-zstd"; // import { ZSTDDecompress } from "simple-zstd";
@ -43,36 +44,25 @@ await bench("warc", async () => {
// ).stdout; // ).stdout;
const warc = Bun.stdin.stream(); const warc = Bun.stdin.stream();
// const warc = Readable.toWeb(process.stdin);
let buffer: Uint8Array[] = []; // const warc = process.stdin;
const transform = new TransformStream<Uint8Array, Buffer>({
transform(chunk, controller) {
buffer.push(chunk);
if (buffer.reduce((prev, curr) => prev + curr.length, 0) > 1024 * 1024) {
controller.enqueue(Buffer.concat(buffer));
buffer = [];
}
},
flush(controller) {
controller.enqueue(Buffer.concat(buffer));
},
});
warc.pipeTo(transform.writable);
let arrays: Buffer[] = []; let arrays: Buffer[] = [];
let n = 0;
for await (const chunk of transform.readable) { const myWritable = new Writable({
console.debug(n); highWaterMark: 1024 * 1024 * 1024,
writev(chunks, callback) {},
});
for await (const chunk of warc) {
// console.debug(chunk.length); // console.debug(chunk.length);
const b = Buffer.from(chunk); const b = Buffer.from(chunk);
arrays.push(b); arrays.push(b);
// if ( if (
// arrays.reduce((prev, curr) => prev + curr.length, 0) < arrays.reduce((prev, curr) => prev + curr.length, 0) <
// 1024 * 1024 * 1024 1024 * 1024 * 1024
// ) )
// continue; continue;
let buf: Buffer; let buf: Buffer;
while ( while (
((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)), ((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)),
@ -104,8 +94,8 @@ await bench("warc", async () => {
} }
const length = parseInt(fields.get("Content-Length")); const length = parseInt(fields.get("Content-Length"));
const content = buf.subarray(0, length); const content = buf.subarray(0, length);
// console.debug(fields.get("WARC-Date"), content.length); console.debug(fields.get("WARC-Date"), content.length);
n++;
arrays = [ arrays = [
buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length), buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length),
]; ];