transform

This commit is contained in:
Cat /dev/Nulo 2023-12-23 00:19:04 -03:00
parent a54db788df
commit 61727faa84

View file

@ -1,7 +1,6 @@
// import { run, bench, group, baseline } from "mitata";
import { createReadStream } from "node:fs";
import { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { Readable } from "stream";
import { getCarrefourProduct } from "./carrefour.js";
import { WARCParser } from "warcio";
// import { ZSTDDecompress } from "simple-zstd";
@ -44,25 +43,36 @@ await bench("warc", async () => {
// ).stdout;
const warc = Bun.stdin.stream();
// const warc = Readable.toWeb(process.stdin);
// const warc = process.stdin;
let arrays: Buffer[] = [];
const myWritable = new Writable({
highWaterMark: 1024 * 1024 * 1024,
writev(chunks, callback) {},
let buffer: Uint8Array[] = [];
const transform = new TransformStream<Uint8Array, Buffer>({
transform(chunk, controller) {
buffer.push(chunk);
if (buffer.reduce((prev, curr) => prev + curr.length, 0) > 1024 * 1024) {
controller.enqueue(Buffer.concat(buffer));
buffer = [];
}
},
flush(controller) {
controller.enqueue(Buffer.concat(buffer));
},
});
for await (const chunk of warc) {
warc.pipeTo(transform.writable);
let arrays: Buffer[] = [];
let n = 0;
for await (const chunk of transform.readable) {
console.debug(n);
// console.debug(chunk.length);
const b = Buffer.from(chunk);
arrays.push(b);
if (
arrays.reduce((prev, curr) => prev + curr.length, 0) <
1024 * 1024 * 1024
)
continue;
// if (
// arrays.reduce((prev, curr) => prev + curr.length, 0) <
// 1024 * 1024 * 1024
// )
// continue;
let buf: Buffer;
while (
((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)),
@ -94,8 +104,8 @@ await bench("warc", async () => {
}
const length = parseInt(fields.get("Content-Length"));
const content = buf.subarray(0, length);
console.debug(fields.get("WARC-Date"), content.length);
// console.debug(fields.get("WARC-Date"), content.length);
n++;
arrays = [
buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length),
];