mirror of
https://github.com/catdevnull/preciazo.git
synced 2024-11-26 03:26:19 +00:00
Compare commits
No commits in common. "26b9f4b17f1494f3a390e3f0e406b0571fd4310f" and "6853b6389a9ccdd7ab9e8fecb7d3073b93793082" have entirely different histories.
26b9f4b17f
...
6853b6389a
10 changed files with 201 additions and 479 deletions
12
.github/workflows/container.yml
vendored
12
.github/workflows/container.yml
vendored
|
@ -62,26 +62,24 @@ jobs:
|
|||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
- name: Log in to the Container registry
|
||||
uses: docker/login-action@v3
|
||||
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta
|
||||
uses: docker/metadata-action@v5
|
||||
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/scraper
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
|
||||
with:
|
||||
context: .
|
||||
file: Dockerfile.scraper
|
||||
push: true
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
cache-from: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache
|
||||
cache-to: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache,mode=max
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
|
|
|
@ -1,7 +1,14 @@
|
|||
FROM cgr.dev/chainguard/wolfi-base AS base
|
||||
WORKDIR /usr/src/app
|
||||
RUN apk add --no-cache libgcc
|
||||
RUN apk add --no-cache bun libgcc
|
||||
|
||||
FROM base as build
|
||||
ENV NODE_ENV=production
|
||||
COPY . .
|
||||
RUN bun install --frozen-lockfile
|
||||
RUN bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js
|
||||
|
||||
# nightly porque usamos tl con `simd` activado
|
||||
FROM base as rs-build
|
||||
RUN apk add --no-cache rust build-base sqlite-dev
|
||||
|
||||
|
@ -12,8 +19,11 @@ FROM base
|
|||
RUN apk add --no-cache sqlite sqlite-libs
|
||||
|
||||
# Scraper
|
||||
COPY --from=build /tmp/cli.build.js /bin/scraper
|
||||
COPY --from=build /usr/src/app/db-datos/drizzle /bin/drizzle
|
||||
COPY --from=rs-build /root/.cargo/bin/scraper-rs /usr/local/bin/scraper-rs
|
||||
|
||||
ENV NODE_ENV=production
|
||||
ENV DB_PATH=/db/db.db
|
||||
|
||||
CMD ["scraper-rs", "cron"]
|
||||
CMD ["bun", "/bin/scraper", "cron"]
|
||||
|
|
157
scraper-rs/Cargo.lock
generated
157
scraper-rs/Cargo.lock
generated
|
@ -115,6 +115,19 @@ version = "1.0.79"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"event-listener-strategy",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.5"
|
||||
|
@ -129,17 +142,6 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.77"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.1.0"
|
||||
|
@ -273,6 +275,15 @@ version = "1.0.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation"
|
||||
version = "0.9.4"
|
||||
|
@ -299,44 +310,12 @@ dependencies = [
|
|||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool"
|
||||
version = "0.10.0"
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
|
||||
checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"deadpool-runtime",
|
||||
"num_cpus",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool-runtime"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
|
||||
dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool-sqlite"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8010e36e12f3be22543a5e478b4af20aeead9a700dd69581a5e050a070fc22c"
|
||||
dependencies = [
|
||||
"deadpool",
|
||||
"deadpool-sync",
|
||||
"rusqlite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool-sync"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8db70494c13cae4ce67b4b4dafdaf828cf0df7237ab5b9e2fcabee4965d0a0a"
|
||||
dependencies = [
|
||||
"deadpool-runtime",
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -360,6 +339,27 @@ version = "1.0.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "4.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "84f2cdcf274580f2d63697192d744727b3198894b1bf02923643bf59e2c26712"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"parking",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener-strategy"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fallible-iterator"
|
||||
version = "0.3.0"
|
||||
|
@ -671,15 +671,6 @@ version = "2.9.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0"
|
||||
dependencies = [
|
||||
"either",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.10"
|
||||
|
@ -815,6 +806,12 @@ version = "0.1.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.2"
|
||||
|
@ -911,6 +908,28 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "r2d2"
|
||||
version = "0.8.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
|
||||
dependencies = [
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"scheduled-thread-pool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "r2d2_sqlite"
|
||||
version = "0.23.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4dc290b669d30e20751e813517bbe13662d020419c5c8818ff10b6e8bb7777f6"
|
||||
dependencies = [
|
||||
"r2d2",
|
||||
"rusqlite",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
|
@ -1114,6 +1133,15 @@ version = "1.0.16"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
|
||||
|
||||
[[package]]
|
||||
name = "scheduled-thread-pool"
|
||||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
|
||||
dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
|
@ -1126,12 +1154,11 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"again",
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"clap",
|
||||
"deadpool",
|
||||
"deadpool-sqlite",
|
||||
"futures",
|
||||
"itertools",
|
||||
"nanoid",
|
||||
"r2d2",
|
||||
"r2d2_sqlite",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"rusqlite",
|
||||
|
@ -1520,6 +1547,16 @@ version = "0.2.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
|
||||
dependencies = [
|
||||
"getrandom 0.2.11",
|
||||
"rand 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.0"
|
||||
|
|
|
@ -8,13 +8,13 @@ edition = "2021"
|
|||
[dependencies]
|
||||
again = "0.1.2"
|
||||
anyhow = "1.0.79"
|
||||
async-channel = "2.1.1"
|
||||
clap = { version = "4.4.15", features = ["derive"] }
|
||||
deadpool = "0.10.0"
|
||||
deadpool-sqlite = "0.7.0"
|
||||
futures = "0.3.30"
|
||||
itertools = "0.12.0"
|
||||
nanoid = "0.4.0"
|
||||
r2d2 = "0.8.10"
|
||||
r2d2_sqlite = "0.23.0"
|
||||
rand = "0.8.5"
|
||||
# lol_html = "1.2.0"
|
||||
reqwest = { version = "0.11.23", default-features = false, features = [
|
||||
"rustls-tls",
|
||||
"gzip",
|
||||
|
@ -22,6 +22,7 @@ reqwest = { version = "0.11.23", default-features = false, features = [
|
|||
"socks",
|
||||
] }
|
||||
rusqlite = "0.30.0"
|
||||
# scraper = "0.18.1"
|
||||
serde = { version = "1.0.193", features = ["derive"] }
|
||||
serde_json = "1.0.109"
|
||||
simple-error = "0.3.0"
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use again::RetryPolicy;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use deadpool_sqlite::Pool;
|
||||
use futures::{future, stream, StreamExt};
|
||||
use async_channel::Receiver;
|
||||
use clap::Parser;
|
||||
use nanoid::nanoid;
|
||||
use r2d2::Pool;
|
||||
use r2d2_sqlite::SqliteConnectionManager;
|
||||
use reqwest::{StatusCode, Url};
|
||||
use simple_error::{bail, SimpleError};
|
||||
use std::{
|
||||
|
@ -12,23 +13,11 @@ use std::{
|
|||
time::Duration,
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tokio::time;
|
||||
|
||||
#[derive(ValueEnum, Clone, Debug)]
|
||||
enum Supermercado {
|
||||
Dia,
|
||||
Jumbo,
|
||||
Carrefour,
|
||||
Coto,
|
||||
}
|
||||
|
||||
#[derive(Parser)] // requires `derive` feature
|
||||
enum Args {
|
||||
FetchList(FetchListArgs),
|
||||
ParseFile(ParseFileArgs),
|
||||
GetUrlList(GetUrlListArgs),
|
||||
Auto(AutoArgs),
|
||||
Cron(AutoArgs),
|
||||
}
|
||||
#[derive(clap::Args)]
|
||||
struct FetchListArgs {
|
||||
|
@ -38,13 +27,6 @@ struct FetchListArgs {
|
|||
struct ParseFileArgs {
|
||||
file_path: String,
|
||||
}
|
||||
#[derive(clap::Args)]
|
||||
struct GetUrlListArgs {
|
||||
#[arg(value_enum)]
|
||||
supermercado: Supermercado,
|
||||
}
|
||||
#[derive(clap::Args)]
|
||||
struct AutoArgs {}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
|
@ -53,9 +35,6 @@ async fn main() -> anyhow::Result<()> {
|
|||
match Args::parse() {
|
||||
Args::FetchList(a) => fetch_list_cli(a.list_path).await,
|
||||
Args::ParseFile(a) => parse_file_cli(a.file_path).await,
|
||||
Args::GetUrlList(a) => get_url_list_cli(a.supermercado).await,
|
||||
Args::Auto(_) => auto_cli().await,
|
||||
Args::Cron(_) => cron_cli().await,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -68,47 +47,40 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
|
|||
.map(|s| s.to_owned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let pool = connect_db();
|
||||
let counters = fetch_list(&pool, links).await;
|
||||
let (sender, receiver) = async_channel::bounded::<String>(1);
|
||||
|
||||
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
|
||||
let manager = SqliteConnectionManager::file(db_path);
|
||||
let pool = Pool::new(manager).unwrap();
|
||||
|
||||
let n_coroutines = env::var("N_COROUTINES")
|
||||
.map_or(Ok(128), |s| s.parse::<usize>())
|
||||
.expect("N_COROUTINES no es un número");
|
||||
let handles = (1..n_coroutines)
|
||||
.map(|_| {
|
||||
let rx = receiver.clone();
|
||||
let pool = pool.clone();
|
||||
tokio::spawn(worker(rx, pool))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for link in links {
|
||||
sender.send_blocking(link).unwrap();
|
||||
}
|
||||
sender.close();
|
||||
|
||||
let mut counters = Counters::default();
|
||||
for handle in handles {
|
||||
let c = handle.await.unwrap();
|
||||
counters.success += c.success;
|
||||
counters.errored += c.errored;
|
||||
counters.skipped += c.skipped;
|
||||
}
|
||||
|
||||
println!("Finished: {:?}", counters);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters {
|
||||
let n_coroutines = env::var("N_COROUTINES")
|
||||
.map_or(Ok(24), |s| s.parse::<usize>())
|
||||
.expect("N_COROUTINES no es un número");
|
||||
|
||||
let client = build_client();
|
||||
|
||||
stream::iter(links)
|
||||
.map(|url| {
|
||||
let pool = pool.clone();
|
||||
let client = client.clone();
|
||||
tokio::spawn(fetch_and_save(client, url, pool))
|
||||
})
|
||||
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
|
||||
.boxed()
|
||||
.buffer_unordered(n_coroutines)
|
||||
.fold(Counters::default(), move |x, y| {
|
||||
let ret = y.unwrap();
|
||||
future::ready(Counters {
|
||||
success: x.success + ret.success,
|
||||
errored: x.errored + ret.errored,
|
||||
skipped: x.skipped + ret.skipped,
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
fn connect_db() -> Pool {
|
||||
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
|
||||
let cfg = deadpool_sqlite::Config::new(db_path);
|
||||
let pool = cfg.create_pool(deadpool_sqlite::Runtime::Tokio1).unwrap();
|
||||
pool
|
||||
}
|
||||
|
||||
fn build_client() -> reqwest::Client {
|
||||
reqwest::ClientBuilder::default().build().unwrap()
|
||||
}
|
||||
|
@ -120,15 +92,16 @@ struct Counters {
|
|||
skipped: u64,
|
||||
}
|
||||
|
||||
async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Counters {
|
||||
let res = fetch_and_parse(&client, url.clone()).await;
|
||||
async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
|
||||
let client = build_client();
|
||||
|
||||
let mut counters = Counters::default();
|
||||
while let Ok(url) = rx.recv().await {
|
||||
let res = fetch_and_parse(&client, url.clone()).await;
|
||||
match res {
|
||||
Ok(res) => {
|
||||
counters.success += 1;
|
||||
pool.get().await.unwrap().interact(move |conn| conn.execute(
|
||||
"INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
|
||||
rusqlite::params![
|
||||
pool.get().unwrap().execute("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",rusqlite::params![
|
||||
res.ean,
|
||||
res.fetched_at,
|
||||
res.precio_centavos,
|
||||
|
@ -138,21 +111,19 @@ async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Cou
|
|||
res.parser_version,
|
||||
res.name,
|
||||
res.image_url,
|
||||
]
|
||||
)).await.unwrap().unwrap();
|
||||
]).unwrap();
|
||||
}
|
||||
Err(err) => {
|
||||
match err.downcast_ref::<FetchError>() {
|
||||
Some(FetchError::Http(e)) => match e.status() {
|
||||
Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
|
||||
_ => counters.errored += 1,
|
||||
},
|
||||
Some(FetchError::HttpStatus(StatusCode::NOT_FOUND)) => counters.skipped += 1,
|
||||
_ => counters.errored += 1,
|
||||
}
|
||||
|
||||
tracing::error!(error=%err, url=url);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
counters
|
||||
}
|
||||
|
||||
|
@ -160,39 +131,34 @@ async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Cou
|
|||
enum FetchError {
|
||||
#[error("reqwest error")]
|
||||
Http(#[from] reqwest::Error),
|
||||
#[error("http status: {0}")]
|
||||
HttpStatus(reqwest::StatusCode),
|
||||
#[error("parse error")]
|
||||
Parse(#[from] SimpleError),
|
||||
#[error("tl error")]
|
||||
Tl(#[from] tl::ParseError),
|
||||
}
|
||||
|
||||
pub async fn do_request(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
|
||||
let request = client.get(url).build()?;
|
||||
let response = client.execute(request).await?.error_for_status()?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub fn get_retry_policy() -> again::RetryPolicy {
|
||||
RetryPolicy::exponential(Duration::from_millis(300))
|
||||
.with_max_retries(10)
|
||||
.with_jitter(true)
|
||||
}
|
||||
|
||||
pub fn retry_if_wasnt_not_found(err: &reqwest::Error) -> bool {
|
||||
!err.status().is_some_and(|s| s == StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(client))]
|
||||
async fn fetch_and_parse(
|
||||
client: &reqwest::Client,
|
||||
url: String,
|
||||
) -> Result<PrecioPoint, anyhow::Error> {
|
||||
let body = get_retry_policy()
|
||||
.retry_if(|| do_request(client, &url), retry_if_wasnt_not_found)
|
||||
.await?
|
||||
.text()
|
||||
let policy = RetryPolicy::exponential(Duration::from_millis(300))
|
||||
.with_max_retries(10)
|
||||
.with_jitter(true);
|
||||
|
||||
let response = policy
|
||||
.retry(|| {
|
||||
let request = client.get(url.as_str()).build().unwrap();
|
||||
client.execute(request)
|
||||
})
|
||||
.await
|
||||
.map_err(FetchError::Http)?;
|
||||
if !response.status().is_success() {
|
||||
bail!(FetchError::HttpStatus(response.status()));
|
||||
}
|
||||
let body = response.text().await.map_err(FetchError::Http)?;
|
||||
|
||||
let maybe_point = { scrap_url(client, url, &body).await };
|
||||
|
||||
|
@ -221,7 +187,8 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
|
|||
dom.query_selector("link[rel=\"canonical\"]")
|
||||
.unwrap()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.find_map(|n| n.as_tag())
|
||||
.filter_map(|n| n.as_tag())
|
||||
.next()
|
||||
.and_then(|t| t.attributes().get("href").flatten())
|
||||
.expect("No meta canonical")
|
||||
.as_utf8_str()
|
||||
|
@ -233,24 +200,6 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
|
||||
let urls = get_urls(&supermercado).await?;
|
||||
urls.iter().for_each(|s| {
|
||||
println!("{}", s);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_urls(supermercado: &Supermercado) -> Result<Vec<String>, anyhow::Error> {
|
||||
Ok(match supermercado {
|
||||
Supermercado::Dia => sites::dia::get_urls().await?,
|
||||
Supermercado::Jumbo => sites::jumbo::get_urls().await?,
|
||||
Supermercado::Carrefour => sites::carrefour::get_urls().await?,
|
||||
Supermercado::Coto => sites::coto::get_urls().await?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn scrap_url(
|
||||
client: &reqwest::Client,
|
||||
url: String,
|
||||
|
@ -272,119 +221,6 @@ async fn scrap_url(
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Auto {
|
||||
pool: Pool,
|
||||
telegram_token: String,
|
||||
telegram_chat_id: String,
|
||||
}
|
||||
impl Auto {
|
||||
async fn download_supermercado(self: Self, supermercado: Supermercado) -> anyhow::Result<()> {
|
||||
{
|
||||
let t0 = now_sec();
|
||||
self.get_and_save_urls(&supermercado).await?;
|
||||
self.inform(&format!(
|
||||
"Downloaded url list {:?} (took {})",
|
||||
&supermercado,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await;
|
||||
}
|
||||
let links: Vec<String> = {
|
||||
self.pool
|
||||
.get()
|
||||
.await?
|
||||
.interact(|conn| -> anyhow::Result<Vec<String>> {
|
||||
Ok(conn
|
||||
.prepare(r#"SELECT url FROM producto_urls;"#)?
|
||||
.query_map([], |r| r.get::<_, String>(0))?
|
||||
.map(|r| r.unwrap())
|
||||
.collect())
|
||||
})
|
||||
.await
|
||||
.unwrap()?
|
||||
};
|
||||
{
|
||||
let t0 = now_sec();
|
||||
let counters = fetch_list(&self.pool, links).await;
|
||||
self.inform(&format!(
|
||||
"Downloaded {:?}: {:?} (took {})",
|
||||
&supermercado,
|
||||
counters,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_and_save_urls(self: &Self, supermercado: &Supermercado) -> anyhow::Result<()> {
|
||||
let urls = get_urls(supermercado).await?;
|
||||
self.pool
|
||||
.get()
|
||||
.await?
|
||||
.interact(|conn| -> Result<(), anyhow::Error> {
|
||||
let tx = conn.transaction()?;
|
||||
{
|
||||
let mut stmt = tx.prepare(
|
||||
r#"INSERT INTO producto_urls(url, first_seen, last_seen)
|
||||
VALUES (?1, ?2, ?2)
|
||||
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
|
||||
)?;
|
||||
let now: u64 = now_ms().try_into()?;
|
||||
for url in urls {
|
||||
stmt.execute(rusqlite::params![url, now])?;
|
||||
}
|
||||
}
|
||||
tx.commit()?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn inform(self: &Self, msg: &str) {
|
||||
println!("{}", msg);
|
||||
let u = Url::parse_with_params(
|
||||
&format!(
|
||||
"https://api.telegram.org/bot{}/sendMessage",
|
||||
self.telegram_token
|
||||
),
|
||||
&[
|
||||
("chat_id", self.telegram_chat_id.clone()),
|
||||
("text", msg.to_string()),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
reqwest::get(u).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
async fn auto_cli() -> anyhow::Result<()> {
|
||||
let db = connect_db();
|
||||
let auto = Auto {
|
||||
pool: db,
|
||||
telegram_token: env::var("TELEGRAM_BOT_TOKEN")?,
|
||||
telegram_chat_id: env::var("TELEGRAM_BOT_CHAT_ID")?,
|
||||
};
|
||||
auto.inform("[auto] Empezando scrap").await;
|
||||
let handles: Vec<_> = Supermercado::value_variants()
|
||||
.iter()
|
||||
.map(|s| tokio::spawn(auto.clone().download_supermercado(s.to_owned())))
|
||||
.collect();
|
||||
future::try_join_all(handles).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn cron_cli() -> anyhow::Result<()> {
|
||||
let mut interval = time::interval(std::time::Duration::from_secs(60 * 60 * 24));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
tokio::spawn(auto_cli());
|
||||
}
|
||||
}
|
||||
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
mod sites;
|
||||
|
@ -403,14 +239,9 @@ struct PrecioPoint {
|
|||
}
|
||||
|
||||
fn now_sec() -> u64 {
|
||||
since_the_epoch().as_secs()
|
||||
}
|
||||
fn now_ms() -> u128 {
|
||||
since_the_epoch().as_millis()
|
||||
}
|
||||
|
||||
fn since_the_epoch() -> Duration {
|
||||
SystemTime::now()
|
||||
let start = SystemTime::now();
|
||||
let since_the_epoch = start
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.expect("Time went backwards");
|
||||
since_the_epoch.as_secs()
|
||||
}
|
||||
|
|
|
@ -66,19 +66,3 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
url,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
|
||||
let urls = vec![
|
||||
"https://www.carrefour.com.ar/sitemap/product-0.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-1.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-2.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-3.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-4.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-5.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-6.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-7.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-8.xml",
|
||||
"https://www.carrefour.com.ar/sitemap/product-9.xml",
|
||||
];
|
||||
vtex::get_urls_from_sitemap(urls).await
|
||||
}
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
use anyhow::{anyhow, Context};
|
||||
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||
use itertools::Itertools;
|
||||
use reqwest::Url;
|
||||
use anyhow::Context;
|
||||
|
||||
use crate::{build_client, do_request, get_retry_policy, retry_if_wasnt_not_found, PrecioPoint};
|
||||
use crate::PrecioPoint;
|
||||
|
||||
pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> {
|
||||
let ean = dom
|
||||
|
@ -27,7 +24,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
.query_selector(".atg_store_newPrice")
|
||||
.unwrap()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.find_map(|n| n.as_tag())
|
||||
.filter_map(|n| n.as_tag())
|
||||
.next()
|
||||
.map(|t| t.inner_text(dom.parser()))
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(|s| {
|
||||
|
@ -43,7 +41,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
dom.query_selector(".product_not_available")
|
||||
.unwrap()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.find_map(|n| n.as_tag())
|
||||
.filter_map(|n| n.as_tag())
|
||||
.next()
|
||||
.is_some(),
|
||||
);
|
||||
|
||||
|
@ -51,7 +50,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
.query_selector("h1.product_page")
|
||||
.unwrap()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.find_map(|n| n.as_tag())
|
||||
.filter_map(|n| n.as_tag())
|
||||
.next()
|
||||
.map(|t| t.inner_text(dom.parser()))
|
||||
.map(|s| s.trim().to_string());
|
||||
|
||||
|
@ -59,7 +59,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
.query_selector(".zoomImage1")
|
||||
.unwrap()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.find_map(|n| n.as_tag())
|
||||
.filter_map(|n| n.as_tag())
|
||||
.next()
|
||||
.and_then(|t| t.attributes().get("src").flatten())
|
||||
.map(|s| s.as_utf8_str().to_string());
|
||||
|
||||
|
@ -74,64 +75,3 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
url,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
|
||||
let client = build_client();
|
||||
let initial = Url::parse("https://www.cotodigital3.com.ar/sitios/cdigi/browse?Nf=product.endDate%7CGTEQ+1.7032032E12%7C%7Cproduct.startDate%7CLTEQ+1.7032032E12&Nr=AND%28product.sDisp_200%3A1004%2Cproduct.language%3Aespa%C3%B1ol%2COR%28product.siteId%3ACotoDigital%29%29")?;
|
||||
|
||||
let page_size = 100;
|
||||
let handles: Vec<Vec<String>> = stream::iter(0..29000 / page_size)
|
||||
.map(|i| {
|
||||
let mut u = initial.clone();
|
||||
u.query_pairs_mut()
|
||||
.append_pair("No", &(i * page_size).to_string())
|
||||
.append_pair("Nrpp", &(page_size).to_string())
|
||||
.finish();
|
||||
let client = &client;
|
||||
async move {
|
||||
let text = get_retry_policy()
|
||||
.retry_if(
|
||||
|| do_request(client, u.as_str()).and_then(|r| r.text()),
|
||||
retry_if_wasnt_not_found,
|
||||
)
|
||||
.await?;
|
||||
let dom = tl::parse(&text, tl::ParserOptions::default())?;
|
||||
|
||||
let list: Vec<String> = dom
|
||||
.query_selector(".product_info_container")
|
||||
.unwrap()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.filter_map(|n| n.as_tag())
|
||||
.filter_map(|t| -> Option<anyhow::Result<String>> {
|
||||
t.children()
|
||||
.top()
|
||||
.iter()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.filter_map(|n| n.as_tag())
|
||||
.find(|t| t.name() == "a")
|
||||
.map(|t| {
|
||||
t.attributes()
|
||||
.get("href")
|
||||
.flatten()
|
||||
.ok_or(anyhow!("No tiene href="))
|
||||
})
|
||||
.map(|s| {
|
||||
Ok(Url::options()
|
||||
.base_url(Some(&Url::parse("https://www.cotodigital3.com.ar")?))
|
||||
.parse(s?.as_utf8_str().as_ref())?
|
||||
.to_string())
|
||||
})
|
||||
})
|
||||
.try_collect()?;
|
||||
Ok::<Vec<String>, anyhow::Error>(list)
|
||||
}
|
||||
})
|
||||
.buffer_unordered(8)
|
||||
.try_collect()
|
||||
.await?;
|
||||
let mut total: Vec<String> = vec![];
|
||||
for mut urls in handles {
|
||||
total.append(&mut urls);
|
||||
}
|
||||
Ok(total.into_iter().unique().collect())
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ use simple_error::bail;
|
|||
use crate::sites::common;
|
||||
use crate::PrecioPoint;
|
||||
|
||||
use super::vtex;
|
||||
use super::vtex::find_product_ld;
|
||||
use super::vtex::AvailabilityLd;
|
||||
|
||||
|
@ -40,14 +39,3 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
url,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
|
||||
let urls = vec![
|
||||
"https://diaonline.supermercadosdia.com.ar/sitemap/product-1.xml",
|
||||
"https://diaonline.supermercadosdia.com.ar/sitemap/product-2.xml",
|
||||
"https://diaonline.supermercadosdia.com.ar/sitemap/product-3.xml",
|
||||
"https://diaonline.supermercadosdia.com.ar/sitemap/product-4.xml",
|
||||
"https://diaonline.supermercadosdia.com.ar/sitemap/product-5.xml",
|
||||
];
|
||||
vtex::get_urls_from_sitemap(urls).await
|
||||
}
|
||||
|
|
|
@ -90,25 +90,3 @@ pub async fn scrap(
|
|||
url,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
|
||||
// de https://www.jumbo.com.ar/sitemap.xml
|
||||
let urls = vec![
|
||||
"https://www.jumbo.com.ar/sitemap/product-1.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-10.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-11.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-12.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-13.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-14.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-15.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-2.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-3.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-4.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-5.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-6.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-7.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-8.xml",
|
||||
"https://www.jumbo.com.ar/sitemap/product-9.xml",
|
||||
];
|
||||
vtex::get_urls_from_sitemap(urls).await
|
||||
}
|
||||
|
|
|
@ -1,12 +1,8 @@
|
|||
use anyhow::{bail, Context};
|
||||
use futures::{stream, StreamExt, TryStreamExt};
|
||||
use itertools::Itertools;
|
||||
use serde::Deserialize;
|
||||
use simple_error::SimpleError;
|
||||
use tl::VDom;
|
||||
|
||||
use crate::{build_client, do_request, get_retry_policy, retry_if_wasnt_not_found};
|
||||
|
||||
use super::common;
|
||||
|
||||
pub fn parse_script_json(dom: &VDom, varname: &str) -> Result<serde_json::Value, anyhow::Error> {
|
||||
|
@ -104,44 +100,3 @@ pub fn in_stock_from_meta(dom: &VDom) -> anyhow::Result<bool> {
|
|||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn parse_urls_from_sitemap(sitemap: &str) -> anyhow::Result<Vec<String>> {
|
||||
let dom = tl::parse(sitemap, tl::ParserOptions::default())?;
|
||||
Ok(dom
|
||||
.query_selector("loc")
|
||||
.unwrap()
|
||||
.filter_map(|h| h.get(dom.parser()))
|
||||
.filter_map(|n| n.as_tag())
|
||||
.map(|t| t.inner_text(dom.parser()))
|
||||
.map(|s| s.to_string())
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn get_urls_from_sitemap(sitemaps: Vec<&str>) -> anyhow::Result<Vec<String>> {
|
||||
let mut total: Vec<String> = vec![];
|
||||
let client = build_client();
|
||||
let handles = stream::iter(sitemaps)
|
||||
.map(|url| {
|
||||
let client = client.clone();
|
||||
let url = url.to_string();
|
||||
async move {
|
||||
let client = client;
|
||||
let url = url;
|
||||
let text = get_retry_policy()
|
||||
.retry_if(|| do_request(&client, &url), retry_if_wasnt_not_found)
|
||||
.await?
|
||||
.text()
|
||||
.await?;
|
||||
parse_urls_from_sitemap(&text)
|
||||
}
|
||||
})
|
||||
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
|
||||
.boxed()
|
||||
.buffer_unordered(8)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
for mut urls in handles {
|
||||
total.append(&mut urls);
|
||||
}
|
||||
Ok(total.into_iter().unique().collect())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue