From 068afc9bd8bfbed828a4788ec15e045d50b31c99 Mon Sep 17 00:00:00 2001
From: Nulo
Date: Sat, 23 Dec 2023 15:42:25 -0300
Subject: [PATCH] scraper: custom warc parser

---
 scraper/bench.ts     |  92 +------------------------
 scraper/package.json |   2 +
 scraper/scrap.ts     |  24 +++----
 scraper/warc.ts      | 157 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 171 insertions(+), 104 deletions(-)
 create mode 100644 scraper/warc.ts

diff --git a/scraper/bench.ts b/scraper/bench.ts
index e78e324..9f55f5b 100644
--- a/scraper/bench.ts
+++ b/scraper/bench.ts
@@ -28,97 +28,7 @@ import { WARCParser } from "warcio";
 // }
 // });
 
-const crlf = "\r\n";
-const crlfB = Buffer.from(crlf, "utf-8");
-const crlfcrlf = crlf + crlf;
-const crlfcrlfB = Buffer.from(crlfcrlf, "utf-8");
-const warc10B = Buffer.from("WARC/1.0", "utf-8");
-const emptyBuffer = Buffer.from("", "utf-8");
-
-await bench("warc", async () => {
-  // const warc = Bun.spawn(
-  //   ["zstd", "-do", "/dev/stdout", "../data/carrefour.warc.zst"],
-  //   {
-  //     stderr: "ignore",
-  //   }
-  // ).stdout;
-
-  const warc = Bun.stdin.stream();
-
-  // const warc = process.stdin;
-
-  let arrays: Buffer[] = [];
-
-  const myWritable = new Writable({
-    highWaterMark: 1024 * 1024 * 1024,
-    writev(chunks, callback) {},
-  });
-
-  for await (const chunk of warc) {
-    // console.debug(chunk.length);
-    const b = Buffer.from(chunk);
-    arrays.push(b);
-    if (
-      arrays.reduce((prev, curr) => prev + curr.length, 0) <
-      1024 * 1024 * 1024
-    )
-      continue;
-    let buf: Buffer;
-    while (
-      ((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)),
-      buf.subarray(warc10B.length).includes(warc10B))
-    ) {
-      const until = buf.indexOf(crlfcrlfB);
-      const header = buf.subarray(0, until);
-
-      const lines = splitBuffer(header, crlfB);
-      let i = 0;
-      const nextLine = () => {
-        const line = lines[i];
-        i++;
-        return line ? line : emptyBuffer;
-      };
-      let line: Buffer;
-      if (!(line = nextLine()).equals(warc10B)) {
-        throw new Error(`No WARC 1.0 header in '${line}'`);
-      }
-
-      let field;
-      let fields = new Map();
-      while (
-        ((line = nextLine()),
-        (field = parseField(line.toString("utf8"))),
-        line.length !== 0)
-      ) {
-        fields.set(field[0], field[1]);
-      }
-      const length = parseInt(fields.get("Content-Length"));
-      const content = buf.subarray(0, length);
-      console.debug(fields.get("WARC-Date"), content.length);
-
-      arrays = [
-        buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length),
-      ];
-    }
-  }
-});
-
-function splitBuffer(buffer: Buffer, val: Buffer): Buffer[] {
-  let bufs = [];
-  let rest = buffer;
-  let i;
-  while (((i = rest.indexOf(val)), i !== -1)) {
-    bufs.push(rest.subarray(0, i));
-    rest = rest.subarray(i + val.length);
-  }
-  bufs.push(rest);
-  return bufs;
-}
-
-function parseField(line: string): [string, string] {
-  const [key, val] = line.split(": ");
-  return [key, val];
-}
+// await bench("warc", );
 
 async function bench(name: string, func: () => Promise<void>) {
   const t0 = performance.now();
diff --git a/scraper/package.json b/scraper/package.json
index 3b6293a..c0f8ba9 100644
--- a/scraper/package.json
+++ b/scraper/package.json
@@ -13,8 +13,10 @@
   "dependencies": {
     "drizzle-orm": "^0.29.1",
     "linkedom": "^0.16.5",
+    "mitata": "^0.1.6",
     "nanoid": "^5.0.4",
     "p-map": "^7.0.0",
+    "simple-zstd": "^1.4.2",
     "undici": "^6.2.0",
     "warcio": "^2.2.1",
     "zod": "^3.22.4"
diff --git a/scraper/scrap.ts b/scraper/scrap.ts
index d84c1ae..eb387d9 100644
--- a/scraper/scrap.ts
+++ b/scraper/scrap.ts
@@ -13,8 +13,9 @@ import { getDiaProduct } from "./dia.js";
 import { getCotoProduct } from "./coto.js";
 import { join } from "path";
 import pMap from "p-map";
+import { parseWARC } from "./warc.js";
 
-const DEBUG = false;
+const DEBUG = true;
 
 const sqlite = new Database("sqlite.db");
 const db = drizzle(sqlite);
@@ -51,17 +52,14 @@ async function storePrecioPoint(point: Precio) {
 
 async function parseWarc(path: string) {
   // const warc = createReadStream(path);
-  const warc = Bun.spawn(["zstd", "-do", "/dev/stdout", path], {
-    stderr: "ignore",
-  }).stdout;
-
-  const parser = new WARCParser(warc);
+  const parser = parseWARC(path);
   for await (const record of parser) {
-    if (record.warcType === "response") {
-      if (!record.warcTargetURI) continue;
-      const html = await record.contentText();
+    if (record.fields.get("WARC-Type") === "response") {
+      const rawUri = record.fields.get("WARC-Target-URI");
+      if (!rawUri) continue;
+      const html = record.content.toString();
 
-      const url = new URL(record.warcTargetURI);
+      const url = new URL(rawUri.replace(/^</, "").replace(/>$/, ""));
       try {
         let ish: Precioish | undefined = undefined;
         if (url.hostname === "www.carrefour.com.ar")
@@ -74,8 +72,8 @@ async function parseWarc(path: string) {
 
         const p: Precio = {
           ...ish,
-          fetchedAt: new Date(record.warcDate!),
-          url: record.warcTargetURI,
+          fetchedAt: new Date(record.fields.get("WARC-Date")!),
+          url: url.toString(),
         };
 
         if (ish) await storePrecioPoint(p);
@@ -88,7 +86,7 @@ async function parseWarc(path: string) {
 
       if (DEBUG) {
        const urlHash = createHash("md5")
-          .update(record.warcTargetURI!)
+          .update(url.toString())
           .digest("hex");
         const output = join("debug", `${urlHash}.html`);
         await writeFile(output, html);
diff --git a/scraper/warc.ts b/scraper/warc.ts
new file mode 100644
index 0000000..4e719c6
--- /dev/null
+++ b/scraper/warc.ts
@@ -0,0 +1,157 @@
+const crlf = "\r\n";
+const crlfB = Buffer.from(crlf, "utf-8");
+const crlfcrlf = crlf + crlf;
+const crlfcrlfB = Buffer.from(crlfcrlf, "utf-8");
+const warc10B = Buffer.from("WARC/1.0", "utf-8");
+const emptyBuffer = Buffer.from("", "utf-8");
+
+export async function* parseWARC(path: string) {
+  const warc = Bun.spawn(["zstd", "-do", "/dev/stdout", path], {
+    stderr: "ignore",
+  }).stdout;
+
+  // const warc = Bun.stdin.stream(1024 * 1024 * 128);
+
+  // let buffer: Uint8Array[] = [];
+  // const transform = new TransformStream({
+  //   transform(chunk, controller) {
+  //     buffer.push(chunk);
+  //     if (
+  //       buffer.reduce((prev, curr) => prev + curr.length, 0) >
+  //       1024 * 1024 * 64
+  //     ) {
+  //       controller.enqueue(Buffer.concat(buffer));
+  //       buffer = [];
+  //     }
+  //   },
+  //   flush(controller) {
+  //     controller.enqueue(Buffer.concat(buffer));
+  //   },
+  // });
+
+  // warc.pipeTo(transform.writable);
+
+  const reader = warc.getReader();
+  // const reader = transform.readable.getReader();
+
+  // const warc = process.stdin;
+
+  let arrays: Buffer[] = [];
+  let done = false;
+  while (!done) {
+    const r = await reader.readMany();
+    if (r.done) {
+      done = true;
+    } else {
+      arrays = arrays.concat(r.value.map((x) => Buffer.from(x)));
+      if (
+        arrays.reduce((prev, curr) => prev + curr.length, 0) <
+        1024 * 1024 * 10
+      )
+        continue;
+    }
+    let buf: Buffer;
+    while (
+      ((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)),
+      buf.subarray(warc10B.length).includes(warc10B))
+    ) {
+      const until = buf.indexOf(crlfcrlfB);
+      const header = buf.subarray(0, until);
+
+      const lines = splitBuffer(header, crlfB);
+      let i = 0;
+      const nextLine = () => {
+        const line = lines[i];
+        i++;
+        return line ? line : emptyBuffer;
+      };
+      let line: Buffer;
+      if (!(line = nextLine()).equals(warc10B)) {
+        throw new Error(`No WARC 1.0 header in '${line}'`);
+      }
+
+      let field;
+      let fields = new Map();
+      while (
+        ((line = nextLine()),
+        (field = parseField(line.toString("utf8"))),
+        line.length !== 0)
+      ) {
+        fields.set(field[0], field[1]);
+      }
+      const length = parseInt(fields.get("Content-Length")!);
+
+      const rawHttp = buf.subarray(
+        until + crlfcrlfB.length,
+        until + crlfcrlfB.length + length
+      );
+      const rawHttpHeaders = rawHttp
+        .subarray(
+          rawHttp.indexOf(crlfB) + crlfB.length,
+          rawHttp.indexOf(crlfcrlfB) + crlfcrlfB.length
+        )
+        .toString();
+
+      let httpHeaders = new Map();
+      rawHttpHeaders.split(crlf).forEach((line) => {
+        if (!line.length) return;
+        const [key, val] = line.split(": ");
+        httpHeaders.set(key, val);
+      });
+
+      let content = rawHttp.subarray(
+        rawHttp.indexOf(crlfcrlfB) + crlfcrlfB.length
+      );
+
+      if (httpHeaders.get("Transfer-Encoding") === "chunked") {
+        content = dechunk(content);
+      }
+
+      // console.debug(fields.get("WARC-Date"), content.length);
+
+      yield {
+        fields,
+        content,
+      };
+
+      arrays = [
+        buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length),
+      ];
+      if (!arrays[0].length) break;
+    }
+  }
+}
+
+function splitBuffer(buffer: Buffer, val: Buffer): Buffer[] {
+  let bufs = [];
+  let rest = buffer;
+  let i;
+  while (((i = rest.indexOf(val)), i !== -1)) {
+    bufs.push(rest.subarray(0, i));
+    rest = rest.subarray(i + val.length);
+  }
+  bufs.push(rest);
+  return bufs;
+}
+
+function parseField(line: string): [string, string] {
+  const [key, val] = line.split(": ");
+  return [key, val];
+}
+
+function dechunk(content: Buffer): Buffer {
+  let actualContent = [];
+
+  while (true) {
+    let until = content.indexOf(crlf);
+    const hexLen = content.subarray(0, until).toString();
+    if (hexLen.length === 0) break;
+    const len = parseInt(hexLen, 16);
+    actualContent.push(
+      content.subarray(until + crlfB.length, until + crlfB.length + len)
+    );
+    content = content.subarray(until + crlfB.length + len + crlfB.length);
+  }
+
+  return Buffer.concat(actualContent);
+}
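
Usage sketch: the snippet below shows how the new parseWARC generator is consumed, mirroring the loop this patch adds to scrap.ts. It assumes a Bun runtime with zstd available on PATH; the .warc.zst path is only a placeholder taken from the old bench comment.

    import { parseWARC } from "./warc.js";

    // Walk every record in the archive and report each HTTP response.
    for await (const record of parseWARC("../data/carrefour.warc.zst")) {
      // record.fields is a Map of WARC header fields parsed from the record header.
      if (record.fields.get("WARC-Type") !== "response") continue;
      const uri = record.fields.get("WARC-Target-URI");
      // record.content is the HTTP body as a Buffer (de-chunked if Transfer-Encoding: chunked).
      console.log(uri, record.content.length);
    }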