Compare commits

..

No commits in common. "8fa70d13005d86aec916acbd40c1dab9ec186fe8" and "50873cca8cbbcf35ff4553564ef324d37dc32230" have entirely different histories.

4 changed files with 131 additions and 117 deletions

View file

@ -8,7 +8,7 @@ scrapeo "masivo" de precios y datos en supermercados argentinos
(no hace falta correrlos porque ya hay listas armadas en [data/](./data/)) (no hace falta correrlos porque ya hay listas armadas en [data/](./data/))
- [warcificator](./warcificator/) descarga las paginas de productos y genera un archivo [WARC](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.0/) con ellas - se usa wget (potencialmente reemplazado por algo custom en el futuro) que genera un archivo [WARC](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.0/) con todas las paginas de productos
- el [scraper](./scraper/) procesa estos WARCs, extrayendo varios datos y guardandolos en una base de datos SQLite (definida en [db-datos](./db-datos/schema.ts)) - el [scraper](./scraper/) procesa estos WARCs, extrayendo varios datos y guardandolos en una base de datos SQLite (definida en [db-datos](./db-datos/schema.ts))
- el [sitio](./sitio/) renderiza páginas a partir de la base de datos y hace gráficos lindos - el [sitio](./sitio/) renderiza páginas a partir de la base de datos y hace gráficos lindos

View file

@ -8,27 +8,12 @@ RUN bun install --frozen-lockfile \
&& bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js \ && bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js \
&& rm -rf node_modules/ && rm -rf node_modules/
# Build stage for the warcificator binary, statically linked against musl.
# Pattern based on:
# https://dev.to/deciduously/use-multi-stage-docker-builds-for-statically-linked-rust-binaries-3jgd
FROM docker.io/rust:1.74 AS warcificator-builder
WORKDIR /usr/src/
RUN rustup target add x86_64-unknown-linux-musl
RUN apt-get update && apt-get install -y musl-tools musl-dev
# Create a placeholder crate so dependencies can be compiled (and the layer
# cached) before the real sources are copied in.
RUN USER=root cargo new warcificator
WORKDIR /usr/src/warcificator
COPY ./warcificator/Cargo.toml ./warcificator/Cargo.lock ./
# Pre-build dependencies for the SAME target the final install uses; a host-target
# build lands in a different target directory and would not be reused below.
RUN cargo build --release --target x86_64-unknown-linux-musl
COPY ./warcificator/src ./src
# Refresh mtimes of the copied sources so cargo does not consider the placeholder
# crate's artifacts up to date, then build and install the real binary.
RUN find src -name '*.rs' -exec touch {} + \
&& cargo install --target x86_64-unknown-linux-musl --path .
FROM base FROM base
RUN apk add --no-cache wget zstd tini RUN apk add --no-cache wget zstd tini
RUN printf "#!/bin/sh\nexec bun /bin/scraper auto\n" > /etc/periodic/daily/scraper \ RUN printf "#!/bin/sh\nexec bun /bin/scraper auto\n" > /etc/periodic/daily/scraper \
&& chmod +x /etc/periodic/daily/scraper && chmod +x /etc/periodic/daily/scraper
COPY --from=builder /tmp/cli.build.js /bin/scraper COPY --from=builder /tmp/cli.build.js /bin/scraper
COPY --from=warcificator-builder /usr/local/cargo/bin/warcificator /bin/
COPY --from=builder /usr/src/app/db-datos/drizzle /bin/drizzle COPY --from=builder /usr/src/app/db-datos/drizzle /bin/drizzle
COPY --from=builder /usr/src/app/data /listas COPY --from=builder /usr/src/app/data /listas
WORKDIR /app WORKDIR /app

View file

@ -22,6 +22,9 @@ const supermercados: Supermercado[] = [
Supermercado.Dia, Supermercado.Dia,
]; ];
// hacemos una cola para la compresión para no sobrecargar la CPU
const compressionQueue = new PQueue({ concurrency: 1 });
// hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU // hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU
const scrapQueue = new PQueue({ concurrency: 1 }); const scrapQueue = new PQueue({ concurrency: 1 });
@ -74,7 +77,7 @@ class Auto {
} }
async downloadList(supermercado: Supermercado) { async downloadList(supermercado: Supermercado) {
const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-download-")); const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-wget-"));
let listPath: string; let listPath: string;
{ {
@ -114,7 +117,15 @@ class Auto {
)}.warc.zst`; )}.warc.zst`;
const zstdWarcPath = join(ctxPath, zstdWarcName); const zstdWarcPath = join(ctxPath, zstdWarcName);
const subproc = Bun.spawn({ const subproc = Bun.spawn({
cmd: ["warcificator", listPath, zstdWarcPath], cmd: [
"wget",
"--no-verbose",
"--tries=3",
"--delete-after",
"--input-file",
listPath,
`--warc-file=temp`,
],
stderr: "ignore", stderr: "ignore",
stdout: "ignore", stdout: "ignore",
cwd: ctxPath, cwd: ctxPath,
@ -122,9 +133,18 @@ class Auto {
const t0 = performance.now(); const t0 = performance.now();
await subproc.exited; await subproc.exited;
this.inform( this.inform(
`[downloader] ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}` `[wget] ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}`
); );
const gzippedWarcPath = join(ctxPath, "temp.warc.gz");
if (!(await fileExists(gzippedWarcPath))) {
const err = this.report(`no encontré el ${gzippedWarcPath}`);
throw err;
}
await compressionQueue.add(() =>
this.recompress(gzippedWarcPath, zstdWarcPath)
);
if (!(await fileExists(zstdWarcPath))) { if (!(await fileExists(zstdWarcPath))) {
const err = this.report(`no encontré el ${zstdWarcPath}`); const err = this.report(`no encontré el ${zstdWarcPath}`);
throw err; throw err;
@ -170,6 +190,49 @@ class Auto {
} }
} }
/**
 * Takes a gzip-compressed file and recompresses it with zstd by streaming the
 * output of `gzip -dc <inputPath>` into `zstd -T0 -15 --long -o <outputPath>`.
 *
 * NOTE: the input file is left in place; nothing in this method removes it.
 *
 * @param inputPath path of the gzip-compressed file to read
 * @param outputPath path of the zstd-compressed file to write
 * @returns a promise that resolves with `undefined` once both processes have
 *   exited with code 0; it rejects if either process fails to start or exits
 *   with a non-zero code
 */
recompress(inputPath: string, outputPath: string) {
  // XXX: for some reason, piping one Bun.spawn process into another
  // (stdin: decompressor.stdout) did not work in Bun 1.0.20, so this uses the
  // `spawn` already imported by this file instead.
  return new Promise((resolve, reject) => {
    const decompressor = spawn("gzip", ["-dc", inputPath], {
      stdio: [null, "pipe", null],
    });
    const compressor = spawn(
      "zstd",
      ["-T0", "-15", "--long", "-o", outputPath],
      {
        stdio: ["pipe", null, null],
      }
    );

    // A process that cannot be started (e.g. missing binary) emits "error"
    // instead of exiting; without a listener that event is unhandled and the
    // promise would never settle.
    decompressor.on("error", reject);
    compressor.on("error", reject);

    // Wait for BOTH processes: zstd exits with 0 even when gzip failed halfway
    // through, which would otherwise leave a truncated archive reported as OK.
    const exitCodes: { gzip?: number | null; zstd?: number | null } = {};
    const settle = () => {
      if (!("gzip" in exitCodes) || !("zstd" in exitCodes)) return;
      if (exitCodes.zstd !== 0) {
        reject(this.report(`zstd threw error code ${exitCodes.zstd}`));
        return;
      }
      if (exitCodes.gzip !== 0) {
        reject(this.report(`gzip threw error code ${exitCodes.gzip}`));
        return;
      }
      resolve(void 0);
    };
    decompressor.on("close", (code) => {
      exitCodes.gzip = code;
      settle();
    });
    compressor.on("close", (code) => {
      exitCodes.zstd = code;
      settle();
    });

    decompressor.stdout.pipe(compressor.stdin);
  });
}
async uploadToBucket({ async uploadToBucket({
fileName, fileName,
file, file,
@ -215,6 +278,7 @@ class Auto {
await fetch(url); await fetch(url);
} }
} }
// await recompress("sqlite.db.gz", "sqlite.db.zst");
// no se llama exists porque bun tiene un bug en el que usa fs.exists por mas que exista una funcion llamada exists // no se llama exists porque bun tiene un bug en el que usa fs.exists por mas que exista una funcion llamada exists
async function fileExists(path: string) { async function fileExists(path: string) {

View file

@ -1,24 +1,16 @@
use async_channel::{Receiver, Sender}; use async_channel::{Receiver, Sender};
use std::{ use std::{env::args, fs, io::stdout, net::SocketAddr};
env::args,
fs,
net::SocketAddr,
process::{Command, Stdio},
};
use tokio::io::{stderr, AsyncWriteExt};
use warc::{RecordBuilder, WarcHeader, WarcWriter}; use warc::{RecordBuilder, WarcHeader, WarcWriter};
struct FullExchange { struct FullExchange {
socket_addr: Option<SocketAddr>, socket_addr: SocketAddr,
request: http::Request<&'static str>, request: http::Request<&'static str>,
response: http::Response<Vec<u8>>, response: http::Response<Vec<u8>>,
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let mut args = args().skip(1); let links_list_path = args().skip(1).next().unwrap();
let links_list_path = args.next().unwrap();
let output_zstd_path = args.next().unwrap();
let links_str = fs::read_to_string(links_list_path).unwrap(); let links_str = fs::read_to_string(links_list_path).unwrap();
let links = links_str let links = links_str
.split("\n") .split("\n")
@ -26,7 +18,6 @@ async fn main() {
.filter(|s| s.len() > 0) .filter(|s| s.len() > 0)
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let handle = { let handle = {
let (sender, receiver) = async_channel::bounded::<String>(1); let (sender, receiver) = async_channel::bounded::<String>(1);
let (res_sender, res_receiver) = async_channel::unbounded::<FullExchange>(); let (res_sender, res_receiver) = async_channel::unbounded::<FullExchange>();
@ -38,7 +29,7 @@ async fn main() {
handles.push(tokio::spawn(worker(rx, tx))); handles.push(tokio::spawn(worker(rx, tx)));
} }
let warc_writer_handle = tokio::spawn(warc_writer(res_receiver, output_zstd_path)); let warc_writer_handle = tokio::spawn(warc_writer(res_receiver));
for link in links { for link in links {
sender.send_blocking(link).unwrap(); sender.send_blocking(link).unwrap();
@ -57,116 +48,90 @@ async fn main() {
async fn worker(rx: Receiver<String>, tx: Sender<FullExchange>) { async fn worker(rx: Receiver<String>, tx: Sender<FullExchange>) {
let client = reqwest::ClientBuilder::default().build().unwrap(); let client = reqwest::ClientBuilder::default().build().unwrap();
while let Ok(url) = rx.recv().await { while let Ok(url) = rx.recv().await {
let res = fetch(&client, url.clone()).await; let request = client.get(url).build().unwrap();
match res { let mut http_request_builder = http::Request::builder()
Ok(ex) => { .method(request.method())
tx.send(ex).await.unwrap(); .uri(request.url().as_str());
} for (key, val) in request.headers() {
Err(err) => { http_request_builder = http_request_builder.header(key, val);
stderr()
.write_all(format!("Failed to fetch {}: {:#?}", url.as_str(), err).as_bytes())
.await
.unwrap();
}
} }
} let response = client.execute(request).await.unwrap();
}
async fn fetch(client: &reqwest::Client, url: String) -> Result<FullExchange, reqwest::Error> { let ip_address = response.remote_addr().unwrap();
let request = client.get(url).build().unwrap();
let mut http_request_builder = http::Request::builder()
.method(request.method())
.uri(request.url().as_str());
for (key, val) in request.headers() {
http_request_builder = http_request_builder.header(key, val);
}
let response = client.execute(request).await?;
let ip_address = response.remote_addr(); let http_request = {
http_request_builder
.version(response.version())
.body("")
.unwrap()
};
let http_request = { let http_response = {
http_request_builder let mut http_response_builder = http::Response::<()>::builder()
.version(response.version()) .status(response.status())
.body("") .version(response.version());
.unwrap() for (key, val) in response.headers() {
}; http_response_builder = http_response_builder.header(key, val);
}
let body = response.bytes().await.unwrap();
http_response_builder.body(body.to_vec()).unwrap()
};
let http_response = { tx.send(FullExchange {
let mut http_response_builder = http::Response::<()>::builder() socket_addr: ip_address,
.status(response.status()) request: http_request,
.version(response.version()); response: http_response,
for (key, val) in response.headers() { })
http_response_builder = http_response_builder.header(key, val); .await
}
let body = response.bytes().await?;
http_response_builder.body(body.to_vec()).unwrap()
};
Ok(FullExchange {
socket_addr: ip_address,
request: http_request,
response: http_response,
})
}
async fn warc_writer(rx: Receiver<FullExchange>, output_zstd_path: String) {
let zstd_proc = Command::new("zstd")
.args(&["-T0", "-15", "--long", "-o", &output_zstd_path])
.stdin(Stdio::piped())
.stderr(Stdio::null())
.stdout(Stdio::null())
.spawn()
.unwrap(); .unwrap();
}
}
let mut writer = WarcWriter::new(zstd_proc.stdin.unwrap()); async fn warc_writer(rx: Receiver<FullExchange>) {
let mut writer = WarcWriter::new(stdout());
let warc_fields = format!("software: preciazo-warcificator/0.0.0\nformat: WARC file version 1.0\nconformsTo: http://www.archive.org/documents/WarcFileFormat-1.0.html");
writer writer
.write( .write(
&RecordBuilder::default() &RecordBuilder::default()
.version("1.0".to_owned()) .version("1.0".to_owned())
.warc_type(warc::RecordType::WarcInfo) .warc_type(warc::RecordType::WarcInfo)
.header(WarcHeader::ContentType, "application/warc-fields") .header(WarcHeader::ContentType, "application/warc-fields")
.body(format!("software: preciazo-warcificator/0.0.0\nformat: WARC file version 1.0\nconformsTo: http://www.archive.org/documents/WarcFileFormat-1.0.html").into()) .body(warc_fields.into())
.build() .build()
.unwrap(), .unwrap(),
) )
.unwrap(); .unwrap();
while let Ok(res) = rx.recv().await { while let Ok(res) = rx.recv().await {
let uri = res.request.uri().to_string(); let uri = res.request.uri().to_string();
let req_record = {
let mut builder = RecordBuilder::default()
.version("1.0".to_owned())
.warc_type(warc::RecordType::Request)
.header(WarcHeader::TargetURI, uri.clone())
.header(WarcHeader::ContentType, "application/http;msgtype=request")
.header(
WarcHeader::Unknown("X-Warcificator-Lying".to_string()),
"the request contains other headers not included here",
);
if let Some(addr) = res.socket_addr {
builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
}
builder
.body(format_http11_request(res.request).into_bytes())
.build()
.unwrap()
};
writer.write(&req_record).unwrap();
writer writer
.write(&{ .write(
let mut builder = RecordBuilder::default() &RecordBuilder::default()
.version("1.0".to_owned())
.warc_type(warc::RecordType::Request)
.header(WarcHeader::TargetURI, uri.clone())
.header(WarcHeader::IPAddress, res.socket_addr.ip().to_string())
.header(WarcHeader::ContentType, "application/http;msgtype=request")
.header(
WarcHeader::Unknown("X-Warcificator-Lying".to_string()),
"the request contains other headers not included here",
)
.body(format_http11_request(res.request).into_bytes())
.build()
.unwrap(),
)
.unwrap();
writer
.write(
&RecordBuilder::default()
.version("1.0".to_owned()) .version("1.0".to_owned())
.warc_type(warc::RecordType::Response) .warc_type(warc::RecordType::Response)
.header(WarcHeader::TargetURI, uri) .header(WarcHeader::TargetURI, uri)
.header(WarcHeader::ConcurrentTo, req_record.warc_id()) .header(WarcHeader::IPAddress, res.socket_addr.ip().to_string())
.header(WarcHeader::ContentType, "application/http;msgtype=response"); .header(WarcHeader::ContentType, "application/http;msgtype=response")
if let Some(addr) = res.socket_addr {
builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
}
builder
.body(format_http11_response(res.response)) .body(format_http11_response(res.response))
.build() .build()
.unwrap() .unwrap(),
}) )
.unwrap(); .unwrap();
} }
} }