mirror of
https://github.com/catdevnull/preciazo.git
synced 2024-11-26 19:46:19 +00:00
scraper: custom warc parser
This commit is contained in:
parent
8f8e133b5f
commit
068afc9bd8
4 changed files with 171 additions and 104 deletions
|
@ -28,97 +28,7 @@ import { WARCParser } from "warcio";
|
||||||
// }
|
// }
|
||||||
// });
|
// });
|
||||||
|
|
||||||
const crlf = "\r\n";
|
// await bench("warc", );
|
||||||
const crlfB = Buffer.from(crlf, "utf-8");
|
|
||||||
const crlfcrlf = crlf + crlf;
|
|
||||||
const crlfcrlfB = Buffer.from(crlfcrlf, "utf-8");
|
|
||||||
const warc10B = Buffer.from("WARC/1.0", "utf-8");
|
|
||||||
const emptyBuffer = Buffer.from("", "utf-8");
|
|
||||||
|
|
||||||
await bench("warc", async () => {
|
|
||||||
// const warc = Bun.spawn(
|
|
||||||
// ["zstd", "-do", "/dev/stdout", "../data/carrefour.warc.zst"],
|
|
||||||
// {
|
|
||||||
// stderr: "ignore",
|
|
||||||
// }
|
|
||||||
// ).stdout;
|
|
||||||
|
|
||||||
const warc = Bun.stdin.stream();
|
|
||||||
|
|
||||||
// const warc = process.stdin;
|
|
||||||
|
|
||||||
let arrays: Buffer[] = [];
|
|
||||||
|
|
||||||
const myWritable = new Writable({
|
|
||||||
highWaterMark: 1024 * 1024 * 1024,
|
|
||||||
writev(chunks, callback) {},
|
|
||||||
});
|
|
||||||
|
|
||||||
for await (const chunk of warc) {
|
|
||||||
// console.debug(chunk.length);
|
|
||||||
const b = Buffer.from(chunk);
|
|
||||||
arrays.push(b);
|
|
||||||
if (
|
|
||||||
arrays.reduce((prev, curr) => prev + curr.length, 0) <
|
|
||||||
1024 * 1024 * 1024
|
|
||||||
)
|
|
||||||
continue;
|
|
||||||
let buf: Buffer;
|
|
||||||
while (
|
|
||||||
((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)),
|
|
||||||
buf.subarray(warc10B.length).includes(warc10B))
|
|
||||||
) {
|
|
||||||
const until = buf.indexOf(crlfcrlfB);
|
|
||||||
const header = buf.subarray(0, until);
|
|
||||||
|
|
||||||
const lines = splitBuffer(header, crlfB);
|
|
||||||
let i = 0;
|
|
||||||
const nextLine = () => {
|
|
||||||
const line = lines[i];
|
|
||||||
i++;
|
|
||||||
return line ? line : emptyBuffer;
|
|
||||||
};
|
|
||||||
let line: Buffer;
|
|
||||||
if (!(line = nextLine()).equals(warc10B)) {
|
|
||||||
throw new Error(`No WARC 1.0 header in '${line}'`);
|
|
||||||
}
|
|
||||||
|
|
||||||
let field;
|
|
||||||
let fields = new Map();
|
|
||||||
while (
|
|
||||||
((line = nextLine()),
|
|
||||||
(field = parseField(line.toString("utf8"))),
|
|
||||||
line.length !== 0)
|
|
||||||
) {
|
|
||||||
fields.set(field[0], field[1]);
|
|
||||||
}
|
|
||||||
const length = parseInt(fields.get("Content-Length"));
|
|
||||||
const content = buf.subarray(0, length);
|
|
||||||
console.debug(fields.get("WARC-Date"), content.length);
|
|
||||||
|
|
||||||
arrays = [
|
|
||||||
buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length),
|
|
||||||
];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
function splitBuffer(buffer: Buffer, val: Buffer): Buffer[] {
|
|
||||||
let bufs = [];
|
|
||||||
let rest = buffer;
|
|
||||||
let i;
|
|
||||||
while (((i = rest.indexOf(val)), i !== -1)) {
|
|
||||||
bufs.push(rest.subarray(0, i));
|
|
||||||
rest = rest.subarray(i + val.length);
|
|
||||||
}
|
|
||||||
bufs.push(rest);
|
|
||||||
return bufs;
|
|
||||||
}
|
|
||||||
|
|
||||||
function parseField(line: string): [string, string] {
|
|
||||||
const [key, val] = line.split(": ");
|
|
||||||
return [key, val];
|
|
||||||
}
|
|
||||||
|
|
||||||
async function bench(name: string, func: () => Promise<void>) {
|
async function bench(name: string, func: () => Promise<void>) {
|
||||||
const t0 = performance.now();
|
const t0 = performance.now();
|
||||||
|
|
|
@ -13,8 +13,10 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"drizzle-orm": "^0.29.1",
|
"drizzle-orm": "^0.29.1",
|
||||||
"linkedom": "^0.16.5",
|
"linkedom": "^0.16.5",
|
||||||
|
"mitata": "^0.1.6",
|
||||||
"nanoid": "^5.0.4",
|
"nanoid": "^5.0.4",
|
||||||
"p-map": "^7.0.0",
|
"p-map": "^7.0.0",
|
||||||
|
"simple-zstd": "^1.4.2",
|
||||||
"undici": "^6.2.0",
|
"undici": "^6.2.0",
|
||||||
"warcio": "^2.2.1",
|
"warcio": "^2.2.1",
|
||||||
"zod": "^3.22.4"
|
"zod": "^3.22.4"
|
||||||
|
|
|
@ -13,8 +13,9 @@ import { getDiaProduct } from "./dia.js";
|
||||||
import { getCotoProduct } from "./coto.js";
|
import { getCotoProduct } from "./coto.js";
|
||||||
import { join } from "path";
|
import { join } from "path";
|
||||||
import pMap from "p-map";
|
import pMap from "p-map";
|
||||||
|
import { parseWARC } from "./warc.js";
|
||||||
|
|
||||||
const DEBUG = false;
|
const DEBUG = true;
|
||||||
|
|
||||||
const sqlite = new Database("sqlite.db");
|
const sqlite = new Database("sqlite.db");
|
||||||
const db = drizzle(sqlite);
|
const db = drizzle(sqlite);
|
||||||
|
@ -51,17 +52,14 @@ async function storePrecioPoint(point: Precio) {
|
||||||
async function parseWarc(path: string) {
|
async function parseWarc(path: string) {
|
||||||
// const warc = createReadStream(path);
|
// const warc = createReadStream(path);
|
||||||
|
|
||||||
const warc = Bun.spawn(["zstd", "-do", "/dev/stdout", path], {
|
const parser = parseWARC(path);
|
||||||
stderr: "ignore",
|
|
||||||
}).stdout;
|
|
||||||
|
|
||||||
const parser = new WARCParser(warc);
|
|
||||||
for await (const record of parser) {
|
for await (const record of parser) {
|
||||||
if (record.warcType === "response") {
|
if (record.fields.get("WARC-Type") === "response") {
|
||||||
if (!record.warcTargetURI) continue;
|
const rawUri = record.fields.get("WARC-Target-URI");
|
||||||
const html = await record.contentText();
|
if (!rawUri) continue;
|
||||||
|
const html = record.content.toString();
|
||||||
|
|
||||||
const url = new URL(record.warcTargetURI);
|
const url = new URL(rawUri.replace(/^</, "").replace(/>$/, ""));
|
||||||
try {
|
try {
|
||||||
let ish: Precioish | undefined = undefined;
|
let ish: Precioish | undefined = undefined;
|
||||||
if (url.hostname === "www.carrefour.com.ar")
|
if (url.hostname === "www.carrefour.com.ar")
|
||||||
|
@ -74,8 +72,8 @@ async function parseWarc(path: string) {
|
||||||
|
|
||||||
const p: Precio = {
|
const p: Precio = {
|
||||||
...ish,
|
...ish,
|
||||||
fetchedAt: new Date(record.warcDate!),
|
fetchedAt: new Date(record.fields.get("WARC-Date")!),
|
||||||
url: record.warcTargetURI,
|
url: url.toString(),
|
||||||
};
|
};
|
||||||
|
|
||||||
if (ish) await storePrecioPoint(p);
|
if (ish) await storePrecioPoint(p);
|
||||||
|
@ -88,7 +86,7 @@ async function parseWarc(path: string) {
|
||||||
|
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
const urlHash = createHash("md5")
|
const urlHash = createHash("md5")
|
||||||
.update(record.warcTargetURI!)
|
.update(url.toString())
|
||||||
.digest("hex");
|
.digest("hex");
|
||||||
const output = join("debug", `${urlHash}.html`);
|
const output = join("debug", `${urlHash}.html`);
|
||||||
await writeFile(output, html);
|
await writeFile(output, html);
|
||||||
|
|
157
scraper/warc.ts
Normal file
157
scraper/warc.ts
Normal file
|
@ -0,0 +1,157 @@
|
||||||
|
/** Line terminator used by both the WARC record headers and the embedded HTTP headers. */
const crlf = "\r\n";
/** `crlf` as bytes, for searching inside Buffers. */
const crlfB = Buffer.from(crlf, "utf-8");
/** Blank line (two consecutive terminators) that closes a header section. */
const crlfcrlf = `${crlf}${crlf}`;
/** `crlfcrlf` as bytes, for searching inside Buffers. */
const crlfcrlfB = Buffer.from(crlfcrlf, "utf-8");
/** Version line that every record parsed by this module must start with. */
const warc10B = Buffer.from("WARC/1.0", "utf-8");
/** Shared zero-length Buffer used as a "no more lines" sentinel. */
const emptyBuffer = Buffer.from("", "utf-8");
|
||||||
|
|
||||||
|
/**
 * Streams a zstd-compressed WARC file and yields its records one by one.
 *
 * The file is decompressed by spawning `zstd -do /dev/stdout <path>` and the
 * decompressed bytes are parsed incrementally.
 *
 * Each yielded value has:
 *  - `fields`: the WARC named fields of the record (e.g. `WARC-Type`,
 *    `WARC-Target-URI`, `WARC-Date`), keyed exactly as written in the file.
 *  - `content`: the record block with the embedded HTTP status line and
 *    headers removed and, when the HTTP headers declare
 *    `Transfer-Encoding: chunked`, with the chunked framing removed.
 *    Blocks that contain no HTTP header separator are returned as-is.
 *
 * Record boundaries are derived from each record's `Content-Length` field
 * rather than from the position of the next `WARC/1.0` marker, so the final
 * record of the archive is yielded too and payloads that happen to contain
 * the text `WARC/1.0` cannot confuse the parser.
 *
 * @param path Path of the `.warc.zst` file to read.
 * @throws Error if a record does not start with the `WARC/1.0` version line
 *   or lacks a valid `Content-Length` field.
 */
export async function* parseWARC(path: string) {
  const warc = Bun.spawn(["zstd", "-do", "/dev/stdout", path], {
    stderr: "ignore",
  }).stdout;
  const reader = warc.getReader();

  // Parse in batches of at least this many bytes so that the (copying)
  // Buffer.concat below does not run for every small read.
  const minBatchBytes = 1024 * 1024 * 10;

  let pending: Buffer[] = [];
  // Running total of the bytes held in `pending`; avoids re-summing every
  // chunk on every read.
  let pendingBytes = 0;
  let done = false;

  while (!done) {
    const r = await reader.readMany();
    if (r.done) {
      // Fall through once more to drain whatever is still buffered.
      done = true;
    } else {
      for (const chunk of r.value) {
        const b = Buffer.from(chunk);
        pending.push(b);
        pendingBytes += b.length;
      }
      if (pendingBytes < minBatchBytes) continue;
    }

    let buf: Buffer =
      pending.length === 1 ? pending[0] : Buffer.concat(pending);

    while (buf.length > 0) {
      // The WARC header section ends at the first blank line.
      const until = buf.indexOf(crlfcrlfB);
      if (until === -1) break; // header not fully buffered yet

      const lines = splitBuffer(buf.subarray(0, until), crlfB);
      const versionLine = lines[0] ?? emptyBuffer;
      if (!versionLine.equals(warc10B)) {
        throw new Error(`No WARC 1.0 header in '${versionLine}'`);
      }

      const fields = new Map<string, string>();
      for (const line of lines.slice(1)) {
        if (line.length === 0) break;
        const [key, val] = parseField(line.toString("utf8"));
        fields.set(key, val);
      }

      const length = parseInt(fields.get("Content-Length") ?? "", 10);
      if (!(length >= 0)) {
        // Covers both a missing field (NaN) and a negative value.
        throw new Error(
          `Missing or invalid Content-Length in WARC record: '${fields.get(
            "Content-Length"
          )}'`
        );
      }

      const blockStart = until + crlfcrlfB.length;
      const blockEnd = blockStart + length;
      // Every record is followed by two CRLFs before the next one starts.
      const nextRecord = blockEnd + crlfcrlfB.length;

      // Wait for more data unless the whole record (including its trailing
      // separator) is buffered. At end of stream the trailing separator is
      // optional; a truncated block is dropped.
      if (buf.length < (done ? blockEnd : nextRecord)) break;

      const rawHttp = buf.subarray(blockStart, blockEnd);
      const httpHeaderEnd = rawHttp.indexOf(crlfcrlfB);

      let content: Buffer = rawHttp;
      if (httpHeaderEnd !== -1) {
        // Skip the status/request line, then read "Name: value" lines up to
        // the blank line. Names are lower-cased because HTTP header names
        // are case-insensitive.
        const httpHeaders = new Map<string, string>();
        const rawHttpHeaders = rawHttp
          .subarray(
            rawHttp.indexOf(crlfB) + crlfB.length,
            httpHeaderEnd + crlfcrlfB.length
          )
          .toString();
        for (const headerLine of rawHttpHeaders.split(crlf)) {
          if (!headerLine.length) continue;
          const colon = headerLine.indexOf(":");
          if (colon === -1) continue;
          httpHeaders.set(
            headerLine.slice(0, colon).trim().toLowerCase(),
            headerLine.slice(colon + 1).trim()
          );
        }

        content = rawHttp.subarray(httpHeaderEnd + crlfcrlfB.length);

        const transferEncodings = (httpHeaders.get("transfer-encoding") ?? "")
          .toLowerCase()
          .split(",")
          .map((encoding) => encoding.trim());
        if (transferEncodings.includes("chunked")) {
          content = dechunk(content);
        }
      }

      yield {
        fields,
        content,
      };

      buf = buf.subarray(nextRecord);
    }

    // Keep the unparsed tail (a partial record, or nothing) for the next batch.
    pending = buf.length > 0 ? [buf] : [];
    pendingBytes = buf.length;
  }
}
|
||||||
|
|
||||||
|
/**
 * Splits `buffer` on every occurrence of the byte sequence `val`, like
 * `String.prototype.split` does for strings.
 *
 * The returned pieces are views (`subarray`) into `buffer`, not copies, and
 * do not include the delimiter. A delimiter at the very start or end, or two
 * adjacent delimiters, produce empty pieces.
 *
 * @param buffer Bytes to split.
 * @param val Delimiter byte sequence.
 * @returns The pieces in order; always at least one element.
 */
function splitBuffer(buffer: Buffer, val: Buffer): Buffer[] {
  // An empty delimiter matches at every offset without consuming anything,
  // which would never terminate; treat it as "no delimiter".
  if (val.length === 0) return [buffer];

  const pieces: Buffer[] = [];
  let start = 0;
  let hit = buffer.indexOf(val, start);
  while (hit !== -1) {
    pieces.push(buffer.subarray(start, hit));
    start = hit + val.length;
    hit = buffer.indexOf(val, start);
  }
  pieces.push(buffer.subarray(start));
  return pieces;
}
|
||||||
|
|
||||||
|
/**
 * Parses one `Name: value` header line into a `[name, value]` pair.
 *
 * The line is split at the first colon only, so values that themselves
 * contain `": "` are kept intact. Whitespace around the value is removed.
 * A line without any colon yields the whole line as the name and an empty
 * string as the value.
 *
 * @param line A single header line without its line terminator.
 * @returns The field name and its value.
 */
function parseField(line: string): [string, string] {
  const colon = line.indexOf(":");
  if (colon === -1) return [line, ""];
  return [line.slice(0, colon), line.slice(colon + 1).trim()];
}
|
||||||
|
|
||||||
|
/**
 * Removes HTTP/1.1 chunked transfer framing from a message body.
 *
 * Each chunk is `<hex size>[;extensions]CRLF<data>CRLF`; the body ends with a
 * zero-size chunk, optionally followed by trailer fields. Decoding stops at
 * the zero-size chunk (trailers are ignored), at an empty or non-hexadecimal
 * size line, or when no further CRLF is found, so malformed or truncated
 * input returns what could be decoded instead of looping forever.
 *
 * @param content The chunk-encoded body.
 * @returns The concatenated chunk data.
 */
function dechunk(content: Buffer): Buffer {
  // Local literal so the helper does not depend on module-level state.
  const lineBreak = "\r\n";
  const pieces: Buffer[] = [];
  let offset = 0;

  while (offset < content.length) {
    const sizeLineEnd = content.indexOf(lineBreak, offset);
    if (sizeLineEnd === -1) break;

    // parseInt stops at the first non-hex character, which also skips any
    // ";extension" suffix on the size line.
    const size = parseInt(
      content.subarray(offset, sizeLineEnd).toString("latin1"),
      16
    );
    // NaN (empty/garbage size line), the terminating 0 chunk, or a negative
    // value all end the body.
    if (!(size > 0)) break;

    const dataStart = sizeLineEnd + lineBreak.length;
    pieces.push(content.subarray(dataStart, dataStart + size));
    offset = dataStart + size + lineBreak.length;
  }

  return Buffer.concat(pieces);
}
|
Loading…
Reference in a new issue