This commit is contained in:
Cat /dev/Nulo 2024-01-02 00:21:21 -03:00
parent 405502877c
commit c4b49814fb
11 changed files with 53 additions and 1950 deletions

BIN
bun.lockb

Binary file not shown.

View file

@ -8,29 +8,19 @@ scrapeo "masivo" de precios y datos en supermercados argentinos
(no hace falta correrlos porque ya hay listas armadas en [data/](./data/))
- [warcificator](./warcificator/) descarga las paginas de productos y genera un archivo [WARC](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.0/) con ellas
- el [scraper](./scraper/) procesa estos WARCs, extrayendo varios datos y guardandolos en una base de datos SQLite (definida en [db-datos](./db-datos/schema.ts))
- el [scraper](./scraper/) descarga todos los links, extrayendo varios datos y guardandolos en una base de datos SQLite (definida en [db-datos](./db-datos/schema.ts))
- el [sitio](./sitio/) renderiza páginas a partir de la base de datos y hace gráficos lindos
## setup
hay que instalar [Bun](https://bun.sh/), que lo estoy usando porque hacía que el scraper corra más rápido. quizás en el futuro lo reemplace con good old Node.js.
aparte, se necesita zstd, que se usa para comprimir los WARCs eficientemente. seguro está disponible en las repos de tu distro favorita :)
empezá descargando un WARC con 50 páginas de sample, y recomprimilo con zstd:
```
wget --no-verbose --tries=3 --delete-after --input-file ./data/samples/Dia.txt --warc-file=dia-sample
gzip -dc dia-sample.warc.gz | zstd --long -15 --no-sparse -o dia-sample.warc.zst
```
después, scrapealo a una BD:
después, scrapeá un sample de productos de Carrefour a una BD:
```
cd scraper/
bun install
bun cli.ts scrap ../dia-sample.warc.zst
bun cli.ts scrap ./data/samples/Carrefour.50.txt
```
ahora miralo en el sitio:

View file

@ -8,27 +8,12 @@ RUN bun install --frozen-lockfile \
&& bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js \
&& rm -rf node_modules/
# https://dev.to/deciduously/use-multi-stage-docker-builds-for-statically-linked-rust-binaries-3jgd
FROM docker.io/rust:1.74 AS warcificator-builder
WORKDIR /usr/src/
RUN rustup target add x86_64-unknown-linux-musl
RUN apt-get update && apt-get install -y musl-tools musl-dev
RUN USER=root cargo new warcificator
WORKDIR /usr/src/warcificator
COPY ./warcificator/Cargo.toml ./warcificator/Cargo.lock ./
RUN cargo build --release
COPY ./warcificator/src ./src
RUN cargo install --target x86_64-unknown-linux-musl --path .
FROM base
RUN apk add --no-cache wget zstd tini
RUN printf "#!/bin/sh\nexec bun /bin/scraper auto\n" > /etc/periodic/daily/scraper \
&& chmod +x /etc/periodic/daily/scraper
COPY --from=builder /tmp/cli.build.js /bin/scraper
COPY --from=warcificator-builder /usr/local/cargo/bin/warcificator /bin/
COPY --from=builder /usr/src/app/db-datos/drizzle /bin/drizzle
COPY --from=builder /usr/src/app/data /listas
WORKDIR /app

View file

@ -1,14 +1,10 @@
import { mkdtemp, access, writeFile } from "node:fs/promises";
import { mkdtemp, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join, resolve } from "node:path";
import { spawn } from "node:child_process";
import { join } from "node:path";
import { Supermercado, hosts } from "db-datos/supermercado.js";
import PQueue from "p-queue";
import { format, formatDuration, intervalToDuration } from "date-fns";
import { parseWarc } from "./scrap.js";
import { S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { BunFile } from "bun";
import { formatDuration, intervalToDuration } from "date-fns";
import { downloadList } from "./scrap.js";
import { db } from "db-datos/db.js";
import { like } from "drizzle-orm";
import { productoUrls } from "db-datos/schema.js";
@ -23,7 +19,7 @@ const supermercados: Supermercado[] = [
];
// hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU
const scrapQueue = new PQueue({ concurrency: 1 });
const scrapQueue = new PQueue({ concurrency: 4 });
export async function auto() {
const a = new Auto();
@ -31,35 +27,9 @@ export async function auto() {
}
class Auto {
s3Config?: { s3: S3Client; bucketName: string };
telegramConfig?: { token: string; chatId: string };
constructor() {
if (
!process.env.S3_ACCESS_KEY_ID ||
!process.env.S3_SECRET_ACCESS_KEY ||
!process.env.S3_BUCKET_NAME
) {
if (process.env.NODE_ENV === "development") {
console.warn("faltan creds de s3, no voy a subir a s3");
} else {
throw new Error("faltan creds de s3");
}
} else {
this.s3Config = {
// https://www.backblaze.com/docs/cloud-storage-use-the-aws-sdk-for-javascript-v3-with-backblaze-b2
s3: new S3Client({
endpoint: "https://s3.us-west-004.backblazeb2.com",
region: "us-west-004",
credentials: {
accessKeyId: process.env.S3_ACCESS_KEY_ID,
secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
},
}),
bucketName: process.env.S3_BUCKET_NAME,
};
}
if (!process.env.TELEGRAM_BOT_TOKEN)
console.warn("no hay TELEGRAM_BOT_TOKEN, no voy a loggear por allá");
else if (!process.env.TELEGRAM_BOT_CHAT_ID)
@ -107,93 +77,29 @@ class Auto {
const urls = results.map((r) => r.url);
await writeFile(listPath, urls.join("\n") + "\n");
const date = new Date();
const zstdWarcName = `${supermercado}-${format(
date,
"yyyy-MM-dd-HH:mm"
)}.warc.zst`;
const zstdWarcPath = join(ctxPath, zstdWarcName);
const subproc = Bun.spawn({
cmd: ["warcificator", listPath, zstdWarcPath],
stderr: "ignore",
stdout: "ignore",
cwd: ctxPath,
});
const t0 = performance.now();
await subproc.exited;
this.inform(
`[downloader] ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}`
);
if (!(await fileExists(zstdWarcPath))) {
const err = this.report(`no encontré el ${zstdWarcPath}`);
throw err;
}
this.scrapAndInform({ zstdWarcPath, zstdWarcName });
try {
await this.uploadToBucket({
fileName: zstdWarcName,
file: Bun.file(zstdWarcPath),
});
} catch (error) {
this.inform(`Falló subir ${zstdWarcName} a S3; ${error}`);
console.error(error);
}
this.scrapAndInform({ listPath });
// TODO: borrar archivos temporales
}
async scrapAndInform({
zstdWarcPath,
zstdWarcName,
}: {
zstdWarcPath: string;
zstdWarcName: string;
}) {
async scrapAndInform({ listPath }: { listPath: string }) {
const res = await scrapQueue.add(async () => {
const t0 = performance.now();
const progress = await parseWarc(zstdWarcPath);
const progress = await downloadList(listPath);
return { took: performance.now() - t0, progress };
});
if (res) {
const { took, progress } = res;
this.inform(
`Procesado ${zstdWarcName} (${progress.done} ok, ${
progress.errors.length
} errores) (tardó ${formatMs(took)})`
`Procesado ${listPath} (${progress.done} ok, ${
progress.skipped
} skipped, ${progress.errors.length} errores) (tardó ${formatMs(took)})`
);
} else {
this.inform(`Algo falló en ${zstdWarcName}`);
this.inform(`Algo falló en ${listPath}`);
}
}
async uploadToBucket({
fileName,
file,
}: {
fileName: string;
file: BunFile;
}) {
if (!this.s3Config) {
this.inform(
`[s3] Se intentó subir ${fileName} pero no tenemos creds de S3`
);
return;
}
const parallelUploads3 = new Upload({
client: this.s3Config.s3,
params: {
Bucket: this.s3Config.bucketName,
Key: fileName,
Body: file,
},
});
await parallelUploads3.done();
}
inform(msg: string) {
this.sendTelegramMsg(msg);
console.info(msg);
@ -216,16 +122,6 @@ class Auto {
}
}
// Not called "exists" because Bun has a bug where it uses fs.exists even
// when a function named `exists` is defined in scope.
// Returns true iff `path` is accessible on disk.
async function fileExists(path: string) {
  try {
    // Must await: without it, access()'s rejected promise escapes the
    // try/catch and this function reports true for nonexistent paths.
    await access(path);
    return true;
  } catch {
    return false;
  }
}
// Human-readable duration for log messages, rounded to whole milliseconds.
function formatMs(ms: number) {
  const interval = intervalToDuration({ start: 0, end: Math.round(ms) });
  return formatDuration(interval);
}

View file

@ -2,7 +2,7 @@ import { scrapCarrefourProducts } from "../carrefour-link-scraper/index.js";
import { scrapCotoProducts } from "../coto-link-scraper/index.js";
import { scrapDiaProducts } from "../dia-link-scraper/index.js";
import { auto } from "./auto.js";
import { parseWarc } from "./scrap.js";
import { downloadList } from "./scrap.js";
if (process.argv[2] === "auto") {
await auto();
@ -13,16 +13,16 @@ if (process.argv[2] === "auto") {
} else if (process.argv[2] === "scrap-coto-links") {
await scrapCotoProducts();
} else if (process.argv[2] === "scrap") {
const warcPaths = process.argv.slice(3);
if (warcPaths.length > 0) {
for (const path of warcPaths) {
const res = await parseWarc(path);
const urlLists = process.argv.slice(3);
if (urlLists.length > 0) {
for (const path of urlLists) {
const res = await downloadList(path);
console.info("=======================================");
console.info(path, res);
console.info("=======================================");
}
} else {
console.error("Especificá WARCs para scrapear.");
console.error("Especificá listas de urls para scrapear.");
process.exit(1);
}
} else {

View file

@ -19,8 +19,8 @@
"drizzle-orm": "=0.29.1",
"linkedom": "^0.16.5",
"nanoid": "^5.0.4",
"p-map": "^7.0.1",
"p-queue": "^8.0.1",
"warcio": "^2.2.1",
"zod": "^3.22.4"
},
"devDependencies": {

View file

@ -1,68 +1,52 @@
import * as schema from "db-datos/schema.js";
import { WARCParser } from "warcio";
import { writeFile } from "fs/promises";
import { createHash } from "crypto";
import { getCarrefourProduct } from "./parsers/carrefour.js";
import { getDiaProduct } from "./parsers/dia.js";
import { getCotoProduct } from "./parsers/coto.js";
import { join } from "path";
import { and, eq, sql } from "drizzle-orm";
import { db } from "db-datos/db.js";
import pMap from "p-map";
const DEBUG = false;
const PARSER_VERSION = 4;
const getPrevPrecio = db
.select({ id: schema.precios.id })
.from(schema.precios)
.where(
and(
eq(schema.precios.warcRecordId, sql.placeholder("warcRecordId")),
eq(schema.precios.parserVersion, PARSER_VERSION)
)
)
.limit(1)
.prepare();
export type Precio = typeof schema.precios.$inferInsert;
export type Precioish = Omit<
Precio,
"fetchedAt" | "url" | "id" | "warcRecordId" | "parserVersion"
>;
export async function parseWarc(path: string) {
// const warc = createReadStream(path);
export async function downloadList(path: string) {
let progress: {
done: number;
errors: { error: any; warcRecordId: string; path: string }[];
} = { done: 0, errors: [] };
skipped: number;
errors: { error: any; url: string; path: string }[];
} = { done: 0, skipped: 0, errors: [] };
const proc = Bun.spawn(["zstdcat", "-d", path], {});
const warc = proc.stdout;
// TODO: tirar error si falla zstd
let list = (await Bun.file(path).text())
.split("\n")
.filter((s) => s.length > 0);
const parser = new WARCParser(warc);
for await (const record of parser) {
if (record.warcType === "response") {
if (!record.warcTargetURI) continue;
const warcRecordId = record.warcHeader("WARC-Record-ID");
if (!warcRecordId) throw new Error("No tiene WARC-Record-ID");
if (getPrevPrecio.get({ warcRecordId })) {
console.debug(`skipped ${warcRecordId}`);
continue;
await pMap(
list,
async (urlS) => {
let url;
try {
url = new URL(urlS);
} catch (err) {
console.error("error parseando", urlS);
return;
}
if (record.httpHeaders?.statusCode !== 200) {
console.debug(
`skipped ${warcRecordId} because status=${record.httpHeaders?.statusCode} (!=200)`
);
continue;
const res = await fetch(url);
if (!res.ok) {
console.debug(`skipped ${urlS} because status=${res.status} (!=200)`);
progress.skipped++;
return;
}
// TODO: sobreescribir si existe el mismo record-id pero con version mas bajo?
const html = await record.contentText();
const html = await res.text();
const url = new URL(record.warcTargetURI);
try {
let ish: Precioish | undefined = undefined;
if (url.hostname === "www.carrefour.com.ar")
@ -75,9 +59,8 @@ export async function parseWarc(path: string) {
const p: Precio = {
...ish,
fetchedAt: new Date(record.warcDate!),
url: record.warcTargetURI,
warcRecordId,
fetchedAt: new Date(),
url: urlS,
parserVersion: PARSER_VERSION,
};
@ -85,28 +68,23 @@ export async function parseWarc(path: string) {
progress.done++;
} catch (error) {
console.error({ path, warcRecordId, error });
console.error({ path, urlS, error });
progress.errors.push({
path,
warcRecordId,
url: urlS,
error,
});
if (DEBUG) {
const urlHash = createHash("md5")
.update(record.warcTargetURI!)
.digest("hex");
const urlHash = createHash("md5").update(urlS).digest("hex");
const output = join("debug", `${urlHash}.html`);
await writeFile(output, html);
console.error(`wrote html to ${output}`);
}
}
}
}
if ((await proc.exited) !== 0) {
throw new Error("zstd tiró un error");
}
},
{ concurrency: 32 }
);
return progress;
}

View file

@ -1,157 +0,0 @@
// Reusable byte-level delimiters for scanning the WARC/HTTP byte stream.
// The *B variants are Buffers so they can be matched against binary data.
const crlf = "\r\n";
const crlfB = Buffer.from(crlf, "utf-8");
const crlfcrlf = crlf + crlf;
const crlfcrlfB = Buffer.from(crlfcrlf, "utf-8");
const warc10B = Buffer.from("WARC/1.0", "utf-8");
const emptyBuffer = Buffer.from("", "utf-8");
// Incremental WARC/1.0 parser: spawns `zstd` to decompress `path` and yields
// `{ fields, content }` per record as data streams in, buffering only enough
// bytes to hold the next complete record.
export async function* parseWARC(path: string) {
  const warc = Bun.spawn(["zstd", "-do", "/dev/stdout", path], {
    stderr: "ignore",
  }).stdout;
  // const warc = Bun.stdin.stream(1024 * 1024 * 128);

  // let buffer: Uint8Array[] = [];
  // const transform = new TransformStream<Uint8Array, Buffer>({
  //   transform(chunk, controller) {
  //     buffer.push(chunk);
  //     if (
  //       buffer.reduce((prev, curr) => prev + curr.length, 0) >
  //       1024 * 1024 * 64
  //     ) {
  //       controller.enqueue(Buffer.concat(buffer));
  //       buffer = [];
  //     }
  //   },
  //   flush(controller) {
  //     controller.enqueue(Buffer.concat(buffer));
  //   },
  // });
  // warc.pipeTo(transform.writable);

  const reader = warc.getReader();
  // const reader = transform.readable.getReader();
  // const warc = process.stdin;
  let arrays: Buffer[] = [];
  let done = false;
  while (!done) {
    const r = await reader.readMany();
    if (r.done) {
      done = true;
    } else {
      arrays = arrays.concat(r.value.map((x) => Buffer.from(x)));
      // Accumulate at least ~10MB before attempting a parse pass, to avoid
      // re-concatenating the pending buffers on every small read.
      if (
        arrays.reduce((prev, curr) => prev + curr.length, 0) <
        1024 * 1024 * 10
      )
        continue;
    }
    let buf: Buffer;
    // Keep extracting records while the buffer contains the start of a
    // *following* "WARC/1.0" header — i.e. at least one complete record.
    while (
      ((buf = arrays.length === 1 ? arrays[0] : Buffer.concat(arrays)),
      buf.subarray(warc10B.length).includes(warc10B))
    ) {
      // The WARC header block ends at the first blank line (CRLFCRLF).
      const until = buf.indexOf(crlfcrlfB);
      const header = buf.subarray(0, until);
      const lines = splitBuffer(header, crlfB);
      let i = 0;
      // Sequential line cursor over the header; empty buffer past the end.
      const nextLine = () => {
        const line = lines[i];
        i++;
        return line ? line : emptyBuffer;
      };
      let line: Buffer;
      if (!(line = nextLine()).equals(warc10B)) {
        throw new Error(`No WARC 1.0 header in '${line}'`);
      }
      // Parse the "Key: value" WARC header fields until the blank line.
      let field;
      let fields = new Map<string, string>();
      while (
        ((line = nextLine()),
        (field = parseField(line.toString("utf8"))),
        line.length !== 0)
      ) {
        fields.set(field[0], field[1]);
      }
      // The record payload (a raw HTTP message, Content-Length bytes long)
      // immediately follows the header block.
      const length = parseInt(fields.get("Content-Length")!);
      const rawHttp = buf.subarray(
        until + crlfcrlfB.length,
        until + crlfcrlfB.length + length
      );
      // HTTP headers sit between the status line and the first blank line.
      const rawHttpHeaders = rawHttp
        .subarray(
          rawHttp.indexOf(crlfB) + crlfB.length,
          rawHttp.indexOf(crlfcrlfB) + crlfcrlfB.length
        )
        .toString();
      let httpHeaders = new Map<string, string>();
      rawHttpHeaders.split(crlf).forEach((line) => {
        if (!line.length) return;
        const [key, val] = line.split(": ");
        httpHeaders.set(key, val);
      });
      let content = rawHttp.subarray(
        rawHttp.indexOf(crlfcrlfB) + crlfcrlfB.length
      );
      if (httpHeaders.get("Transfer-Encoding") === "chunked") {
        content = dechunk(content);
      }
      // console.debug(fields.get("WARC-Date"), content.length);
      yield {
        fields,
        content,
      };
      // Drop the consumed record (payload plus the CRLFCRLF record
      // separator) and continue parsing the remainder.
      arrays = [
        buf.subarray(until + crlfcrlfB.length + length + crlfcrlfB.length),
      ];
      if (!arrays[0].length) break;
    }
  }
}
// Splits `buffer` on every occurrence of `val`, like String.split but for
// Buffers. The delimiter is not included; a trailing delimiter yields a
// final empty part.
function splitBuffer(buffer: Buffer, val: Buffer): Buffer[] {
  const parts: Buffer[] = [];
  let remaining = buffer;
  for (
    let idx = remaining.indexOf(val);
    idx !== -1;
    idx = remaining.indexOf(val)
  ) {
    parts.push(remaining.subarray(0, idx));
    remaining = remaining.subarray(idx + val.length);
  }
  parts.push(remaining);
  return parts;
}
// Parses a "Key: value" header line into [key, value].
// Splits only at the FIRST ": " — the old `line.split(": ")` truncated any
// value that itself contained ": " (e.g. a WARC-Target-URI with one).
function parseField(line: string): [string, string] {
  const sep = line.indexOf(": ");
  if (sep === -1) {
    // Malformed field line with no separator: treat the whole line as key.
    return [line, ""];
  }
  return [line.slice(0, sep), line.slice(sep + 2)];
}
// Decodes an HTTP/1.1 "Transfer-Encoding: chunked" body: each chunk is a
// hex size line followed by that many bytes and a trailing CRLF; an empty
// size line (or exhausted input) terminates the loop.
function dechunk(content: Buffer): Buffer {
  let actualContent = [];
  while (true) {
    let until = content.indexOf(crlf);
    // NOTE(review): if no CRLF remains, `until` is -1 and subarray(0, -1)
    // yields "" which hits the break below — relies on that edge behavior.
    const hexLen = content.subarray(0, until).toString();
    if (hexLen.length === 0) break;
    // parseInt stops at the first non-hex char, so chunk extensions
    // ("1a;foo") still parse the size correctly.
    const len = parseInt(hexLen, 16);
    actualContent.push(
      content.subarray(until + crlfB.length, until + crlfB.length + len)
    );
    // Advance past the size line, the chunk data, and its trailing CRLF.
    content = content.subarray(until + crlfB.length + len + crlfB.length);
  }
  return Buffer.concat(actualContent);
}

1373
warcificator/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,17 +0,0 @@
[package]
name = "warcificator"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# Async MPMC channels distributing URLs to workers and exchanges to the writer.
async-channel = "2.1.1"
http = "0.2.11"
# rustls (not native TLS) keeps the binary statically linkable against musl.
reqwest = { version = "0.11.23", default-features = false, features = [
"rustls-tls",
"gzip",
"brotli",
] }
tokio = { version = "1.35.1", features = ["full"] }
# WARC record building and serialization.
warc = "0.3.1"

View file

@ -1,199 +0,0 @@
use async_channel::{Receiver, Sender};
use std::{
env::args,
fs,
net::SocketAddr,
process::{Command, Stdio},
};
use tokio::io::{stderr, AsyncWriteExt};
use warc::{RecordBuilder, WarcHeader, WarcWriter};
/// One complete HTTP exchange, captured so it can be written out later as a
/// WARC request/response record pair.
struct FullExchange {
    /// Remote address the response came from, when the client exposes it.
    socket_addr: Option<SocketAddr>,
    request: http::Request<&'static str>,
    response: http::Response<Vec<u8>>,
}
#[tokio::main]
async fn main() {
    // CLI: warcificator <links-list-path> <output-zstd-path>
    let mut args = args().skip(1);
    let links_list_path = args.next().unwrap();
    let output_zstd_path = args.next().unwrap();
    // Read the URL list: one link per line, blanks ignored.
    let links_str = fs::read_to_string(links_list_path).unwrap();
    let links = links_str
        .split("\n")
        .map(|s| s.trim())
        .filter(|s| s.len() > 0)
        .map(|s| s.to_owned())
        .collect::<Vec<_>>();
    let handle = {
        // bounded(1) work channel gives backpressure to the feeder loop
        // below; results flow unbounded into the single WARC-writer task.
        let (sender, receiver) = async_channel::bounded::<String>(1);
        let (res_sender, res_receiver) = async_channel::unbounded::<FullExchange>();
        let mut handles = Vec::new();
        // NOTE(review): 1..16 spawns 15 workers, not 16 — confirm intended.
        for _ in 1..16 {
            let rx = receiver.clone();
            let tx = res_sender.clone();
            handles.push(tokio::spawn(worker(rx, tx)));
        }
        let warc_writer_handle = tokio::spawn(warc_writer(res_receiver, output_zstd_path));
        for link in links {
            sender.send_blocking(link).unwrap();
        }
        // Closing the channel lets workers drain and exit their recv loops.
        sender.close();
        for handle in handles {
            handle.await.unwrap();
        }
        // res_sender drops at the end of this block, which ends the writer.
        warc_writer_handle
    };
    handle.await.unwrap();
}
/// Worker task: pulls URLs off `rx`, fetches each one, and forwards the
/// captured exchange to the WARC writer via `tx`. Fetch failures are logged
/// to stderr and skipped; the loop ends when the URL channel is closed.
async fn worker(rx: Receiver<String>, tx: Sender<FullExchange>) {
    let client = reqwest::ClientBuilder::default().build().unwrap();
    while let Ok(url) = rx.recv().await {
        let res = fetch(&client, url.clone()).await;
        match res {
            Ok(ex) => {
                tx.send(ex).await.unwrap();
            }
            Err(err) => {
                stderr()
                    .write_all(format!("Failed to fetch {}: {:#?}", url.as_str(), err).as_bytes())
                    .await
                    .unwrap();
            }
        }
    }
}
/// Fetches `url` with `client` and reconstructs both sides of the exchange
/// as `http` crate types so they can be serialized into WARC records.
/// Returns the transport error if the request or body read fails.
async fn fetch(client: &reqwest::Client, url: String) -> Result<FullExchange, reqwest::Error> {
    let request = client.get(url).build().unwrap();
    // Mirror the outgoing reqwest request into an `http::Request`
    // (empty body — these are plain GETs).
    let mut http_request_builder = http::Request::builder()
        .method(request.method())
        .uri(request.url().as_str());
    for (key, val) in request.headers() {
        http_request_builder = http_request_builder.header(key, val);
    }
    let response = client.execute(request).await?;
    let ip_address = response.remote_addr();
    // The HTTP version is only known after the response arrives.
    let http_request = {
        http_request_builder
            .version(response.version())
            .body("")
            .unwrap()
    };
    // Copy status/version/headers and buffer the whole body in memory.
    let http_response = {
        let mut http_response_builder = http::Response::<()>::builder()
            .status(response.status())
            .version(response.version());
        for (key, val) in response.headers() {
            http_response_builder = http_response_builder.header(key, val);
        }
        let body = response.bytes().await?;
        http_response_builder.body(body.to_vec()).unwrap()
    };
    Ok(FullExchange {
        socket_addr: ip_address,
        request: http_request,
        response: http_response,
    })
}
/// Single writer task: receives finished exchanges and serializes them into
/// a zstd-compressed WARC file. Being the sole consumer keeps the output
/// file free of interleaved records.
async fn warc_writer(rx: Receiver<FullExchange>, output_zstd_path: String) {
    // Pipe WARC bytes through an external `zstd` process for compression.
    let zstd_proc = Command::new("zstd")
        .args(&["-T0", "-15", "--long", "-o", &output_zstd_path])
        .stdin(Stdio::piped())
        .stderr(Stdio::null())
        .stdout(Stdio::null())
        .spawn()
        .unwrap();
    let mut writer = WarcWriter::new(zstd_proc.stdin.unwrap());
    // Leading "warcinfo" record describing the file.
    writer
        .write(
            &RecordBuilder::default()
                .version("1.0".to_owned())
                .warc_type(warc::RecordType::WarcInfo)
                .header(WarcHeader::ContentType, "application/warc-fields")
                .body(format!("software: preciazo-warcificator/0.0.0\nformat: WARC file version 1.0\nconformsTo: http://www.archive.org/documents/WarcFileFormat-1.0.html").into())
                .build()
                .unwrap(),
        )
        .unwrap();
    while let Ok(res) = rx.recv().await {
        let uri = res.request.uri().to_string();
        // Request record first; the custom header flags that not every
        // header actually sent is reproduced in the record body.
        let req_record = {
            let mut builder = RecordBuilder::default()
                .version("1.0".to_owned())
                .warc_type(warc::RecordType::Request)
                .header(WarcHeader::TargetURI, uri.clone())
                .header(WarcHeader::ContentType, "application/http;msgtype=request")
                .header(
                    WarcHeader::Unknown("X-Warcificator-Lying".to_string()),
                    "the request contains other headers not included here",
                );
            if let Some(addr) = res.socket_addr {
                builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
            }
            builder
                .body(format_http11_request(res.request).into_bytes())
                .build()
                .unwrap()
        };
        writer.write(&req_record).unwrap();
        // ...then the response record, linked back via WARC-Concurrent-To.
        writer
            .write(&{
                let mut builder = RecordBuilder::default()
                    .version("1.0".to_owned())
                    .warc_type(warc::RecordType::Response)
                    .header(WarcHeader::TargetURI, uri)
                    .header(WarcHeader::ConcurrentTo, req_record.warc_id())
                    .header(WarcHeader::ContentType, "application/http;msgtype=response");
                if let Some(addr) = res.socket_addr {
                    builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
                }
                builder
                    .body(format_http11_response(res.response))
                    .build()
                    .unwrap()
            })
            .unwrap();
    }
}
/// Serializes an `http::Request` into raw HTTP/1.1 text: request line,
/// header lines, blank line, then the (empty) body.
fn format_http11_request(req: http::Request<&'static str>) -> String {
    // Use path-and-query: `uri().path()` alone silently drops the query
    // string, corrupting the replayed request line for URLs with `?...`.
    let target = req
        .uri()
        .path_and_query()
        .map(|pq| pq.as_str())
        .unwrap_or("/");
    let start_line = format!("{} {} HTTP/1.1", req.method().as_str(), target);
    let headers_str = req
        .headers()
        .iter()
        .map(|(key, val)| format!("{}: {}\r\n", key, val.to_str().unwrap()))
        .collect::<String>();
    // join supplies the CRLF after the start line and the blank line
    // separating headers from the body (each header already ends in CRLF).
    [start_line.as_str(), headers_str.as_str(), req.body()].join("\r\n")
}
/// Serializes an `http::Response` into raw HTTP/1.1 wire bytes:
/// status line, header lines, blank line, then the body.
fn format_http11_response(res: http::Response<Vec<u8>>) -> Vec<u8> {
    let reason = res.status().canonical_reason().unwrap_or("");
    let status_line = format!("HTTP/1.1 {} {}", res.status().as_str(), reason);
    // Each header line carries its own trailing CRLF; join below then adds
    // the separator after the status line and the blank line before body.
    let mut header_block = String::new();
    for (name, value) in res.headers().iter() {
        header_block.push_str(&format!("{}: {}\r\n", name, value.to_str().unwrap()));
    }
    let crlf: &[u8] = &[13, 10];
    [status_line.as_bytes(), header_block.as_bytes(), res.body()].join(crlf)
}