mirror of
https://github.com/catdevnull/preciazo.git
synced 2024-11-26 03:26:19 +00:00
Compare commits
No commits in common. "8fa70d13005d86aec916acbd40c1dab9ec186fe8" and "50873cca8cbbcf35ff4553564ef324d37dc32230" have entirely different histories.
8fa70d1300
...
50873cca8c
4 changed files with 131 additions and 117 deletions
|
@ -8,7 +8,7 @@ scrapeo "masivo" de precios y datos en supermercados argentinos
|
||||||
|
|
||||||
(no hace falta correrlos porque ya hay listas armadas en [data/](./data/))
|
(no hace falta correrlos porque ya hay listas armadas en [data/](./data/))
|
||||||
|
|
||||||
- [warcificator](./warcificator/) descarga las paginas de productos y genera un archivo [WARC](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.0/) con ellas
|
- se usa wget (potencialmente reemplazado por algo custom en el futuro) que genera un archivo [WARC](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.0/) con todas las paginas de productos
|
||||||
- el [scraper](./scraper/) procesa estos WARCs, extrayendo varios datos y guardandolos en una base de datos SQLite (definida en [db-datos](./db-datos/schema.ts))
|
- el [scraper](./scraper/) procesa estos WARCs, extrayendo varios datos y guardandolos en una base de datos SQLite (definida en [db-datos](./db-datos/schema.ts))
|
||||||
- el [sitio](./sitio/) renderiza páginas a partir de la base de datos y hace gráficos lindos
|
- el [sitio](./sitio/) renderiza páginas a partir de la base de datos y hace gráficos lindos
|
||||||
|
|
||||||
|
|
|
@ -8,27 +8,12 @@ RUN bun install --frozen-lockfile \
|
||||||
&& bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js \
|
&& bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js \
|
||||||
&& rm -rf node_modules/
|
&& rm -rf node_modules/
|
||||||
|
|
||||||
# https://dev.to/deciduously/use-multi-stage-docker-builds-for-statically-linked-rust-binaries-3jgd
|
|
||||||
FROM docker.io/rust:1.74 AS warcificator-builder
|
|
||||||
WORKDIR /usr/src/
|
|
||||||
RUN rustup target add x86_64-unknown-linux-musl
|
|
||||||
RUN apt-get update && apt-get install -y musl-tools musl-dev
|
|
||||||
|
|
||||||
RUN USER=root cargo new warcificator
|
|
||||||
WORKDIR /usr/src/warcificator
|
|
||||||
COPY ./warcificator/Cargo.toml ./warcificator/Cargo.lock ./
|
|
||||||
RUN cargo build --release
|
|
||||||
|
|
||||||
COPY ./warcificator/src ./src
|
|
||||||
RUN cargo install --target x86_64-unknown-linux-musl --path .
|
|
||||||
|
|
||||||
FROM base
|
FROM base
|
||||||
RUN apk add --no-cache wget zstd tini
|
RUN apk add --no-cache wget zstd tini
|
||||||
RUN printf "#!/bin/sh\nexec bun /bin/scraper auto\n" > /etc/periodic/daily/scraper \
|
RUN printf "#!/bin/sh\nexec bun /bin/scraper auto\n" > /etc/periodic/daily/scraper \
|
||||||
&& chmod +x /etc/periodic/daily/scraper
|
&& chmod +x /etc/periodic/daily/scraper
|
||||||
|
|
||||||
COPY --from=builder /tmp/cli.build.js /bin/scraper
|
COPY --from=builder /tmp/cli.build.js /bin/scraper
|
||||||
COPY --from=warcificator-builder /usr/local/cargo/bin/warcificator /bin/
|
|
||||||
COPY --from=builder /usr/src/app/db-datos/drizzle /bin/drizzle
|
COPY --from=builder /usr/src/app/db-datos/drizzle /bin/drizzle
|
||||||
COPY --from=builder /usr/src/app/data /listas
|
COPY --from=builder /usr/src/app/data /listas
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
|
@ -22,6 +22,9 @@ const supermercados: Supermercado[] = [
|
||||||
Supermercado.Dia,
|
Supermercado.Dia,
|
||||||
];
|
];
|
||||||
|
|
||||||
|
// hacemos una cola para la compresión para no sobrecargar la CPU
|
||||||
|
const compressionQueue = new PQueue({ concurrency: 1 });
|
||||||
|
|
||||||
// hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU
|
// hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU
|
||||||
const scrapQueue = new PQueue({ concurrency: 1 });
|
const scrapQueue = new PQueue({ concurrency: 1 });
|
||||||
|
|
||||||
|
@ -74,7 +77,7 @@ class Auto {
|
||||||
}
|
}
|
||||||
|
|
||||||
async downloadList(supermercado: Supermercado) {
|
async downloadList(supermercado: Supermercado) {
|
||||||
const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-download-"));
|
const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-wget-"));
|
||||||
|
|
||||||
let listPath: string;
|
let listPath: string;
|
||||||
{
|
{
|
||||||
|
@ -114,7 +117,15 @@ class Auto {
|
||||||
)}.warc.zst`;
|
)}.warc.zst`;
|
||||||
const zstdWarcPath = join(ctxPath, zstdWarcName);
|
const zstdWarcPath = join(ctxPath, zstdWarcName);
|
||||||
const subproc = Bun.spawn({
|
const subproc = Bun.spawn({
|
||||||
cmd: ["warcificator", listPath, zstdWarcPath],
|
cmd: [
|
||||||
|
"wget",
|
||||||
|
"--no-verbose",
|
||||||
|
"--tries=3",
|
||||||
|
"--delete-after",
|
||||||
|
"--input-file",
|
||||||
|
listPath,
|
||||||
|
`--warc-file=temp`,
|
||||||
|
],
|
||||||
stderr: "ignore",
|
stderr: "ignore",
|
||||||
stdout: "ignore",
|
stdout: "ignore",
|
||||||
cwd: ctxPath,
|
cwd: ctxPath,
|
||||||
|
@ -122,9 +133,18 @@ class Auto {
|
||||||
const t0 = performance.now();
|
const t0 = performance.now();
|
||||||
await subproc.exited;
|
await subproc.exited;
|
||||||
this.inform(
|
this.inform(
|
||||||
`[downloader] ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}`
|
`[wget] ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}`
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const gzippedWarcPath = join(ctxPath, "temp.warc.gz");
|
||||||
|
if (!(await fileExists(gzippedWarcPath))) {
|
||||||
|
const err = this.report(`no encontré el ${gzippedWarcPath}`);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
|
await compressionQueue.add(() =>
|
||||||
|
this.recompress(gzippedWarcPath, zstdWarcPath)
|
||||||
|
);
|
||||||
if (!(await fileExists(zstdWarcPath))) {
|
if (!(await fileExists(zstdWarcPath))) {
|
||||||
const err = this.report(`no encontré el ${zstdWarcPath}`);
|
const err = this.report(`no encontré el ${zstdWarcPath}`);
|
||||||
throw err;
|
throw err;
|
||||||
|
@ -170,6 +190,49 @@ class Auto {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* toma un archivo gzippeado y lo recomprime con zstd.
|
||||||
|
* borra el archivo original.
|
||||||
|
*/
|
||||||
|
recompress(inputPath: string, outputPath: string) {
|
||||||
|
// XXX: por alguna razón no funciona en Bun 1.0.20
|
||||||
|
// const decompressor = Bun.spawn({
|
||||||
|
// cmd: ["gzip", "-dc", inputPath],
|
||||||
|
// stderr: "inherit",
|
||||||
|
// });
|
||||||
|
// const compressor = Bun.spawn({
|
||||||
|
// cmd: ["zstd", "-T0", "-15", "--long", "-o", outputPath],
|
||||||
|
// stdin: decompressor.stdout,
|
||||||
|
// // stderr: "inherit",
|
||||||
|
// });
|
||||||
|
// const errorCode = await compressor.exited;
|
||||||
|
// if (errorCode !== 0) {
|
||||||
|
// const err = report(`zstd threw error code ${errorCode}`);
|
||||||
|
// throw err;
|
||||||
|
// }
|
||||||
|
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const decompressor = spawn("gzip", ["-dc", inputPath], {
|
||||||
|
stdio: [null, "pipe", null],
|
||||||
|
});
|
||||||
|
const compressor = spawn(
|
||||||
|
"zstd",
|
||||||
|
["-T0", "-15", "--long", "-o", outputPath],
|
||||||
|
{
|
||||||
|
stdio: ["pipe", null, null],
|
||||||
|
}
|
||||||
|
);
|
||||||
|
decompressor.stdout.pipe(compressor.stdin);
|
||||||
|
compressor.on("close", (code) => {
|
||||||
|
if (code !== 0) {
|
||||||
|
const err = this.report(`zstd threw error code ${code}`);
|
||||||
|
reject(err);
|
||||||
|
}
|
||||||
|
resolve(void 0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async uploadToBucket({
|
async uploadToBucket({
|
||||||
fileName,
|
fileName,
|
||||||
file,
|
file,
|
||||||
|
@ -215,6 +278,7 @@ class Auto {
|
||||||
await fetch(url);
|
await fetch(url);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// await recompress("sqlite.db.gz", "sqlite.db.zst");
|
||||||
|
|
||||||
// no se llama exists porque bun tiene un bug en el que usa fs.exists por mas que exista una funcion llamada exists
|
// no se llama exists porque bun tiene un bug en el que usa fs.exists por mas que exista una funcion llamada exists
|
||||||
async function fileExists(path: string) {
|
async function fileExists(path: string) {
|
||||||
|
|
|
@ -1,24 +1,16 @@
|
||||||
use async_channel::{Receiver, Sender};
|
use async_channel::{Receiver, Sender};
|
||||||
use std::{
|
use std::{env::args, fs, io::stdout, net::SocketAddr};
|
||||||
env::args,
|
|
||||||
fs,
|
|
||||||
net::SocketAddr,
|
|
||||||
process::{Command, Stdio},
|
|
||||||
};
|
|
||||||
use tokio::io::{stderr, AsyncWriteExt};
|
|
||||||
use warc::{RecordBuilder, WarcHeader, WarcWriter};
|
use warc::{RecordBuilder, WarcHeader, WarcWriter};
|
||||||
|
|
||||||
struct FullExchange {
|
struct FullExchange {
|
||||||
socket_addr: Option<SocketAddr>,
|
socket_addr: SocketAddr,
|
||||||
request: http::Request<&'static str>,
|
request: http::Request<&'static str>,
|
||||||
response: http::Response<Vec<u8>>,
|
response: http::Response<Vec<u8>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let mut args = args().skip(1);
|
let links_list_path = args().skip(1).next().unwrap();
|
||||||
let links_list_path = args.next().unwrap();
|
|
||||||
let output_zstd_path = args.next().unwrap();
|
|
||||||
let links_str = fs::read_to_string(links_list_path).unwrap();
|
let links_str = fs::read_to_string(links_list_path).unwrap();
|
||||||
let links = links_str
|
let links = links_str
|
||||||
.split("\n")
|
.split("\n")
|
||||||
|
@ -26,7 +18,6 @@ async fn main() {
|
||||||
.filter(|s| s.len() > 0)
|
.filter(|s| s.len() > 0)
|
||||||
.map(|s| s.to_owned())
|
.map(|s| s.to_owned())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let handle = {
|
let handle = {
|
||||||
let (sender, receiver) = async_channel::bounded::<String>(1);
|
let (sender, receiver) = async_channel::bounded::<String>(1);
|
||||||
let (res_sender, res_receiver) = async_channel::unbounded::<FullExchange>();
|
let (res_sender, res_receiver) = async_channel::unbounded::<FullExchange>();
|
||||||
|
@ -38,7 +29,7 @@ async fn main() {
|
||||||
handles.push(tokio::spawn(worker(rx, tx)));
|
handles.push(tokio::spawn(worker(rx, tx)));
|
||||||
}
|
}
|
||||||
|
|
||||||
let warc_writer_handle = tokio::spawn(warc_writer(res_receiver, output_zstd_path));
|
let warc_writer_handle = tokio::spawn(warc_writer(res_receiver));
|
||||||
|
|
||||||
for link in links {
|
for link in links {
|
||||||
sender.send_blocking(link).unwrap();
|
sender.send_blocking(link).unwrap();
|
||||||
|
@ -57,22 +48,6 @@ async fn main() {
|
||||||
async fn worker(rx: Receiver<String>, tx: Sender<FullExchange>) {
|
async fn worker(rx: Receiver<String>, tx: Sender<FullExchange>) {
|
||||||
let client = reqwest::ClientBuilder::default().build().unwrap();
|
let client = reqwest::ClientBuilder::default().build().unwrap();
|
||||||
while let Ok(url) = rx.recv().await {
|
while let Ok(url) = rx.recv().await {
|
||||||
let res = fetch(&client, url.clone()).await;
|
|
||||||
match res {
|
|
||||||
Ok(ex) => {
|
|
||||||
tx.send(ex).await.unwrap();
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
stderr()
|
|
||||||
.write_all(format!("Failed to fetch {}: {:#?}", url.as_str(), err).as_bytes())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fetch(client: &reqwest::Client, url: String) -> Result<FullExchange, reqwest::Error> {
|
|
||||||
let request = client.get(url).build().unwrap();
|
let request = client.get(url).build().unwrap();
|
||||||
let mut http_request_builder = http::Request::builder()
|
let mut http_request_builder = http::Request::builder()
|
||||||
.method(request.method())
|
.method(request.method())
|
||||||
|
@ -80,9 +55,9 @@ async fn fetch(client: &reqwest::Client, url: String) -> Result<FullExchange, re
|
||||||
for (key, val) in request.headers() {
|
for (key, val) in request.headers() {
|
||||||
http_request_builder = http_request_builder.header(key, val);
|
http_request_builder = http_request_builder.header(key, val);
|
||||||
}
|
}
|
||||||
let response = client.execute(request).await?;
|
let response = client.execute(request).await.unwrap();
|
||||||
|
|
||||||
let ip_address = response.remote_addr();
|
let ip_address = response.remote_addr().unwrap();
|
||||||
|
|
||||||
let http_request = {
|
let http_request = {
|
||||||
http_request_builder
|
http_request_builder
|
||||||
|
@ -98,75 +73,65 @@ async fn fetch(client: &reqwest::Client, url: String) -> Result<FullExchange, re
|
||||||
for (key, val) in response.headers() {
|
for (key, val) in response.headers() {
|
||||||
http_response_builder = http_response_builder.header(key, val);
|
http_response_builder = http_response_builder.header(key, val);
|
||||||
}
|
}
|
||||||
let body = response.bytes().await?;
|
let body = response.bytes().await.unwrap();
|
||||||
http_response_builder.body(body.to_vec()).unwrap()
|
http_response_builder.body(body.to_vec()).unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(FullExchange {
|
tx.send(FullExchange {
|
||||||
socket_addr: ip_address,
|
socket_addr: ip_address,
|
||||||
request: http_request,
|
request: http_request,
|
||||||
response: http_response,
|
response: http_response,
|
||||||
})
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn warc_writer(rx: Receiver<FullExchange>, output_zstd_path: String) {
|
async fn warc_writer(rx: Receiver<FullExchange>) {
|
||||||
let zstd_proc = Command::new("zstd")
|
let mut writer = WarcWriter::new(stdout());
|
||||||
.args(&["-T0", "-15", "--long", "-o", &output_zstd_path])
|
let warc_fields = format!("software: preciazo-warcificator/0.0.0\nformat: WARC file version 1.0\nconformsTo: http://www.archive.org/documents/WarcFileFormat-1.0.html");
|
||||||
.stdin(Stdio::piped())
|
|
||||||
.stderr(Stdio::null())
|
|
||||||
.stdout(Stdio::null())
|
|
||||||
.spawn()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut writer = WarcWriter::new(zstd_proc.stdin.unwrap());
|
|
||||||
writer
|
writer
|
||||||
.write(
|
.write(
|
||||||
&RecordBuilder::default()
|
&RecordBuilder::default()
|
||||||
.version("1.0".to_owned())
|
.version("1.0".to_owned())
|
||||||
.warc_type(warc::RecordType::WarcInfo)
|
.warc_type(warc::RecordType::WarcInfo)
|
||||||
.header(WarcHeader::ContentType, "application/warc-fields")
|
.header(WarcHeader::ContentType, "application/warc-fields")
|
||||||
.body(format!("software: preciazo-warcificator/0.0.0\nformat: WARC file version 1.0\nconformsTo: http://www.archive.org/documents/WarcFileFormat-1.0.html").into())
|
.body(warc_fields.into())
|
||||||
.build()
|
.build()
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
while let Ok(res) = rx.recv().await {
|
while let Ok(res) = rx.recv().await {
|
||||||
let uri = res.request.uri().to_string();
|
let uri = res.request.uri().to_string();
|
||||||
let req_record = {
|
writer
|
||||||
let mut builder = RecordBuilder::default()
|
.write(
|
||||||
|
&RecordBuilder::default()
|
||||||
.version("1.0".to_owned())
|
.version("1.0".to_owned())
|
||||||
.warc_type(warc::RecordType::Request)
|
.warc_type(warc::RecordType::Request)
|
||||||
.header(WarcHeader::TargetURI, uri.clone())
|
.header(WarcHeader::TargetURI, uri.clone())
|
||||||
|
.header(WarcHeader::IPAddress, res.socket_addr.ip().to_string())
|
||||||
.header(WarcHeader::ContentType, "application/http;msgtype=request")
|
.header(WarcHeader::ContentType, "application/http;msgtype=request")
|
||||||
.header(
|
.header(
|
||||||
WarcHeader::Unknown("X-Warcificator-Lying".to_string()),
|
WarcHeader::Unknown("X-Warcificator-Lying".to_string()),
|
||||||
"the request contains other headers not included here",
|
"the request contains other headers not included here",
|
||||||
);
|
)
|
||||||
if let Some(addr) = res.socket_addr {
|
|
||||||
builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
|
|
||||||
}
|
|
||||||
builder
|
|
||||||
.body(format_http11_request(res.request).into_bytes())
|
.body(format_http11_request(res.request).into_bytes())
|
||||||
.build()
|
.build()
|
||||||
.unwrap()
|
.unwrap(),
|
||||||
};
|
)
|
||||||
writer.write(&req_record).unwrap();
|
.unwrap();
|
||||||
writer
|
writer
|
||||||
.write(&{
|
.write(
|
||||||
let mut builder = RecordBuilder::default()
|
&RecordBuilder::default()
|
||||||
.version("1.0".to_owned())
|
.version("1.0".to_owned())
|
||||||
.warc_type(warc::RecordType::Response)
|
.warc_type(warc::RecordType::Response)
|
||||||
.header(WarcHeader::TargetURI, uri)
|
.header(WarcHeader::TargetURI, uri)
|
||||||
.header(WarcHeader::ConcurrentTo, req_record.warc_id())
|
.header(WarcHeader::IPAddress, res.socket_addr.ip().to_string())
|
||||||
.header(WarcHeader::ContentType, "application/http;msgtype=response");
|
.header(WarcHeader::ContentType, "application/http;msgtype=response")
|
||||||
if let Some(addr) = res.socket_addr {
|
|
||||||
builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
|
|
||||||
}
|
|
||||||
builder
|
|
||||||
.body(format_http11_response(res.response))
|
.body(format_http11_response(res.response))
|
||||||
.build()
|
.build()
|
||||||
.unwrap()
|
.unwrap(),
|
||||||
})
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue