Compare commits

..

15 commits

Author SHA1 Message Date
26b9f4b17f no reintentar si no se encontró 2024-01-12 17:24:00 -03:00
a697f5cb0c no usar async channel 2024-01-12 17:12:07 -03:00
f256b3bc73 el primer tick es instantaneo 2024-01-12 17:05:59 -03:00
028dc30606 borrar r2d2, usar deadpool
debería funcionar bien con Tokio
2024-01-12 17:00:01 -03:00
dbb149aef3 spawnear en vez de correr inline 2024-01-12 16:28:44 -03:00
6b315f9af2 probemos así 2024-01-12 16:21:20 -03:00
75b2297a07 wtf 2024-01-12 13:51:32 -03:00
0144a56158 rustificar todo 2024-01-12 13:01:27 -03:00
d233dbd259 limpiar código 2024-01-12 10:52:34 -03:00
c1111b112e ci: probar de nuevo 2024-01-12 10:35:54 -03:00
6b04dee2e4 paralelizar 2024-01-12 10:06:44 -03:00
ba484709f8 scraper-rs: WIP: fetchear urls 2024-01-12 09:47:56 -03:00
7806c0ba6f no unwrappear al crear request
crashea en urls invalidas
2024-01-12 09:14:28 -03:00
80462d02bb ci: reintentar 2024-01-12 09:02:59 -03:00
e09ed7bedf ci: probar registry cache 2024-01-12 00:27:06 -03:00
10 changed files with 480 additions and 202 deletions

View file

@ -62,24 +62,26 @@ jobs:
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to the Container registry - name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 uses: docker/login-action@v3
with: with:
registry: ${{ env.REGISTRY }} registry: ${{ env.REGISTRY }}
username: ${{ github.actor }} username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for Docker - name: Extract metadata (tags, labels) for Docker
id: meta id: meta
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 uses: docker/metadata-action@v5
with: with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/scraper images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/scraper
- name: Build and push Docker image - name: Build and push Docker image
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 uses: docker/build-push-action@v5
with: with:
context: . context: .
file: Dockerfile.scraper file: Dockerfile.scraper
push: true push: true
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha cache-from: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache
cache-to: type=gha,mode=max cache-to: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache,mode=max

View file

@ -1,14 +1,7 @@
FROM cgr.dev/chainguard/wolfi-base AS base FROM cgr.dev/chainguard/wolfi-base AS base
WORKDIR /usr/src/app WORKDIR /usr/src/app
RUN apk add --no-cache bun libgcc RUN apk add --no-cache libgcc
FROM base as build
ENV NODE_ENV=production
COPY . .
RUN bun install --frozen-lockfile
RUN bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js
# nightly porque usamos tl con `simd` activado
FROM base as rs-build FROM base as rs-build
RUN apk add --no-cache rust build-base sqlite-dev RUN apk add --no-cache rust build-base sqlite-dev
@ -19,11 +12,8 @@ FROM base
RUN apk add --no-cache sqlite sqlite-libs RUN apk add --no-cache sqlite sqlite-libs
# Scraper # Scraper
COPY --from=build /tmp/cli.build.js /bin/scraper
COPY --from=build /usr/src/app/db-datos/drizzle /bin/drizzle
COPY --from=rs-build /root/.cargo/bin/scraper-rs /usr/local/bin/scraper-rs COPY --from=rs-build /root/.cargo/bin/scraper-rs /usr/local/bin/scraper-rs
ENV NODE_ENV=production
ENV DB_PATH=/db/db.db ENV DB_PATH=/db/db.db
CMD ["bun", "/bin/scraper", "cron"] CMD ["scraper-rs", "cron"]

157
scraper-rs/Cargo.lock generated
View file

@ -115,19 +115,6 @@ version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-compression" name = "async-compression"
version = "0.4.5" version = "0.4.5"
@ -142,6 +129,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "async-trait"
version = "0.1.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -275,15 +273,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "concurrent-queue"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.4" version = "0.9.4"
@ -310,12 +299,44 @@ dependencies = [
] ]
[[package]] [[package]]
name = "crossbeam-utils" name = "deadpool"
version = "0.8.18" version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
dependencies = [ dependencies = [
"cfg-if", "async-trait",
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
dependencies = [
"tokio",
]
[[package]]
name = "deadpool-sqlite"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8010e36e12f3be22543a5e478b4af20aeead9a700dd69581a5e050a070fc22c"
dependencies = [
"deadpool",
"deadpool-sync",
"rusqlite",
]
[[package]]
name = "deadpool-sync"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8db70494c13cae4ce67b4b4dafdaf828cf0df7237ab5b9e2fcabee4965d0a0a"
dependencies = [
"deadpool-runtime",
] ]
[[package]] [[package]]
@ -339,27 +360,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "event-listener"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f2cdcf274580f2d63697192d744727b3198894b1bf02923643bf59e2c26712"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]] [[package]]
name = "fallible-iterator" name = "fallible-iterator"
version = "0.3.0" version = "0.3.0"
@ -671,6 +671,15 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "itertools"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.10" version = "1.0.10"
@ -806,12 +815,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.2" version = "0.11.2"
@ -908,28 +911,6 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_sqlite"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dc290b669d30e20751e813517bbe13662d020419c5c8818ff10b6e8bb7777f6"
dependencies = [
"r2d2",
"rusqlite",
"uuid",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.7.3" version = "0.7.3"
@ -1133,15 +1114,6 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
@ -1154,11 +1126,12 @@ version = "0.1.0"
dependencies = [ dependencies = [
"again", "again",
"anyhow", "anyhow",
"async-channel",
"clap", "clap",
"deadpool",
"deadpool-sqlite",
"futures",
"itertools",
"nanoid", "nanoid",
"r2d2",
"r2d2_sqlite",
"rand 0.8.5", "rand 0.8.5",
"reqwest", "reqwest",
"rusqlite", "rusqlite",
@ -1547,16 +1520,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"getrandom 0.2.11",
"rand 0.8.5",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.0" version = "0.1.0"

View file

@ -8,13 +8,13 @@ edition = "2021"
[dependencies] [dependencies]
again = "0.1.2" again = "0.1.2"
anyhow = "1.0.79" anyhow = "1.0.79"
async-channel = "2.1.1"
clap = { version = "4.4.15", features = ["derive"] } clap = { version = "4.4.15", features = ["derive"] }
deadpool = "0.10.0"
deadpool-sqlite = "0.7.0"
futures = "0.3.30"
itertools = "0.12.0"
nanoid = "0.4.0" nanoid = "0.4.0"
r2d2 = "0.8.10"
r2d2_sqlite = "0.23.0"
rand = "0.8.5" rand = "0.8.5"
# lol_html = "1.2.0"
reqwest = { version = "0.11.23", default-features = false, features = [ reqwest = { version = "0.11.23", default-features = false, features = [
"rustls-tls", "rustls-tls",
"gzip", "gzip",
@ -22,7 +22,6 @@ reqwest = { version = "0.11.23", default-features = false, features = [
"socks", "socks",
] } ] }
rusqlite = "0.30.0" rusqlite = "0.30.0"
# scraper = "0.18.1"
serde = { version = "1.0.193", features = ["derive"] } serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.109" serde_json = "1.0.109"
simple-error = "0.3.0" simple-error = "0.3.0"

View file

@ -1,9 +1,8 @@
use again::RetryPolicy; use again::RetryPolicy;
use async_channel::Receiver; use clap::{Parser, ValueEnum};
use clap::Parser; use deadpool_sqlite::Pool;
use futures::{future, stream, StreamExt};
use nanoid::nanoid; use nanoid::nanoid;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use reqwest::{StatusCode, Url}; use reqwest::{StatusCode, Url};
use simple_error::{bail, SimpleError}; use simple_error::{bail, SimpleError};
use std::{ use std::{
@ -13,11 +12,23 @@ use std::{
time::Duration, time::Duration,
}; };
use thiserror::Error; use thiserror::Error;
use tokio::time;
#[derive(ValueEnum, Clone, Debug)]
enum Supermercado {
Dia,
Jumbo,
Carrefour,
Coto,
}
#[derive(Parser)] // requires `derive` feature #[derive(Parser)] // requires `derive` feature
enum Args { enum Args {
FetchList(FetchListArgs), FetchList(FetchListArgs),
ParseFile(ParseFileArgs), ParseFile(ParseFileArgs),
GetUrlList(GetUrlListArgs),
Auto(AutoArgs),
Cron(AutoArgs),
} }
#[derive(clap::Args)] #[derive(clap::Args)]
struct FetchListArgs { struct FetchListArgs {
@ -27,6 +38,13 @@ struct FetchListArgs {
struct ParseFileArgs { struct ParseFileArgs {
file_path: String, file_path: String,
} }
#[derive(clap::Args)]
struct GetUrlListArgs {
#[arg(value_enum)]
supermercado: Supermercado,
}
#[derive(clap::Args)]
struct AutoArgs {}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -35,6 +53,9 @@ async fn main() -> anyhow::Result<()> {
match Args::parse() { match Args::parse() {
Args::FetchList(a) => fetch_list_cli(a.list_path).await, Args::FetchList(a) => fetch_list_cli(a.list_path).await,
Args::ParseFile(a) => parse_file_cli(a.file_path).await, Args::ParseFile(a) => parse_file_cli(a.file_path).await,
Args::GetUrlList(a) => get_url_list_cli(a.supermercado).await,
Args::Auto(_) => auto_cli().await,
Args::Cron(_) => cron_cli().await,
} }
} }
@ -47,40 +68,47 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let (sender, receiver) = async_channel::bounded::<String>(1); let pool = connect_db();
let counters = fetch_list(&pool, links).await;
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
let manager = SqliteConnectionManager::file(db_path);
let pool = Pool::new(manager).unwrap();
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(128), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número");
let handles = (1..n_coroutines)
.map(|_| {
let rx = receiver.clone();
let pool = pool.clone();
tokio::spawn(worker(rx, pool))
})
.collect::<Vec<_>>();
for link in links {
sender.send_blocking(link).unwrap();
}
sender.close();
let mut counters = Counters::default();
for handle in handles {
let c = handle.await.unwrap();
counters.success += c.success;
counters.errored += c.errored;
counters.skipped += c.skipped;
}
println!("Finished: {:?}", counters); println!("Finished: {:?}", counters);
Ok(()) Ok(())
} }
async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters {
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(24), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número");
let client = build_client();
stream::iter(links)
.map(|url| {
let pool = pool.clone();
let client = client.clone();
tokio::spawn(fetch_and_save(client, url, pool))
})
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
.boxed()
.buffer_unordered(n_coroutines)
.fold(Counters::default(), move |x, y| {
let ret = y.unwrap();
future::ready(Counters {
success: x.success + ret.success,
errored: x.errored + ret.errored,
skipped: x.skipped + ret.skipped,
})
})
.await
}
fn connect_db() -> Pool {
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
let cfg = deadpool_sqlite::Config::new(db_path);
let pool = cfg.create_pool(deadpool_sqlite::Runtime::Tokio1).unwrap();
pool
}
fn build_client() -> reqwest::Client { fn build_client() -> reqwest::Client {
reqwest::ClientBuilder::default().build().unwrap() reqwest::ClientBuilder::default().build().unwrap()
} }
@ -92,16 +120,15 @@ struct Counters {
skipped: u64, skipped: u64,
} }
async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters { async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Counters {
let client = build_client();
let mut counters = Counters::default();
while let Ok(url) = rx.recv().await {
let res = fetch_and_parse(&client, url.clone()).await; let res = fetch_and_parse(&client, url.clone()).await;
let mut counters = Counters::default();
match res { match res {
Ok(res) => { Ok(res) => {
counters.success += 1; counters.success += 1;
pool.get().unwrap().execute("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",rusqlite::params![ pool.get().await.unwrap().interact(move |conn| conn.execute(
"INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
rusqlite::params![
res.ean, res.ean,
res.fetched_at, res.fetched_at,
res.precio_centavos, res.precio_centavos,
@ -111,19 +138,21 @@ async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Co
res.parser_version, res.parser_version,
res.name, res.name,
res.image_url, res.image_url,
]).unwrap(); ]
)).await.unwrap().unwrap();
} }
Err(err) => { Err(err) => {
match err.downcast_ref::<FetchError>() { match err.downcast_ref::<FetchError>() {
Some(FetchError::HttpStatus(StatusCode::NOT_FOUND)) => counters.skipped += 1, Some(FetchError::Http(e)) => match e.status() {
Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
_ => counters.errored += 1,
},
_ => counters.errored += 1, _ => counters.errored += 1,
} }
tracing::error!(error=%err, url=url); tracing::error!(error=%err, url=url);
} }
} }
}
counters counters
} }
@ -131,34 +160,39 @@ async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Co
enum FetchError { enum FetchError {
#[error("reqwest error")] #[error("reqwest error")]
Http(#[from] reqwest::Error), Http(#[from] reqwest::Error),
#[error("http status: {0}")]
HttpStatus(reqwest::StatusCode),
#[error("parse error")] #[error("parse error")]
Parse(#[from] SimpleError), Parse(#[from] SimpleError),
#[error("tl error")] #[error("tl error")]
Tl(#[from] tl::ParseError), Tl(#[from] tl::ParseError),
} }
pub async fn do_request(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
let request = client.get(url).build()?;
let response = client.execute(request).await?.error_for_status()?;
Ok(response)
}
pub fn get_retry_policy() -> again::RetryPolicy {
RetryPolicy::exponential(Duration::from_millis(300))
.with_max_retries(10)
.with_jitter(true)
}
pub fn retry_if_wasnt_not_found(err: &reqwest::Error) -> bool {
!err.status().is_some_and(|s| s == StatusCode::NOT_FOUND)
}
#[tracing::instrument(skip(client))] #[tracing::instrument(skip(client))]
async fn fetch_and_parse( async fn fetch_and_parse(
client: &reqwest::Client, client: &reqwest::Client,
url: String, url: String,
) -> Result<PrecioPoint, anyhow::Error> { ) -> Result<PrecioPoint, anyhow::Error> {
let policy = RetryPolicy::exponential(Duration::from_millis(300)) let body = get_retry_policy()
.with_max_retries(10) .retry_if(|| do_request(client, &url), retry_if_wasnt_not_found)
.with_jitter(true); .await?
.text()
let response = policy
.retry(|| {
let request = client.get(url.as_str()).build().unwrap();
client.execute(request)
})
.await .await
.map_err(FetchError::Http)?; .map_err(FetchError::Http)?;
if !response.status().is_success() {
bail!(FetchError::HttpStatus(response.status()));
}
let body = response.text().await.map_err(FetchError::Http)?;
let maybe_point = { scrap_url(client, url, &body).await }; let maybe_point = { scrap_url(client, url, &body).await };
@ -187,8 +221,7 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
dom.query_selector("link[rel=\"canonical\"]") dom.query_selector("link[rel=\"canonical\"]")
.unwrap() .unwrap()
.filter_map(|h| h.get(dom.parser())) .filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag()) .find_map(|n| n.as_tag())
.next()
.and_then(|t| t.attributes().get("href").flatten()) .and_then(|t| t.attributes().get("href").flatten())
.expect("No meta canonical") .expect("No meta canonical")
.as_utf8_str() .as_utf8_str()
@ -200,6 +233,24 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
let urls = get_urls(&supermercado).await?;
urls.iter().for_each(|s| {
println!("{}", s);
});
Ok(())
}
async fn get_urls(supermercado: &Supermercado) -> Result<Vec<String>, anyhow::Error> {
Ok(match supermercado {
Supermercado::Dia => sites::dia::get_urls().await?,
Supermercado::Jumbo => sites::jumbo::get_urls().await?,
Supermercado::Carrefour => sites::carrefour::get_urls().await?,
Supermercado::Coto => sites::coto::get_urls().await?,
})
}
async fn scrap_url( async fn scrap_url(
client: &reqwest::Client, client: &reqwest::Client,
url: String, url: String,
@ -221,6 +272,119 @@ async fn scrap_url(
} }
} }
#[derive(Clone)]
struct Auto {
pool: Pool,
telegram_token: String,
telegram_chat_id: String,
}
impl Auto {
async fn download_supermercado(self: Self, supermercado: Supermercado) -> anyhow::Result<()> {
{
let t0 = now_sec();
self.get_and_save_urls(&supermercado).await?;
self.inform(&format!(
"Downloaded url list {:?} (took {})",
&supermercado,
now_sec() - t0
))
.await;
}
let links: Vec<String> = {
self.pool
.get()
.await?
.interact(|conn| -> anyhow::Result<Vec<String>> {
Ok(conn
.prepare(r#"SELECT url FROM producto_urls;"#)?
.query_map([], |r| r.get::<_, String>(0))?
.map(|r| r.unwrap())
.collect())
})
.await
.unwrap()?
};
{
let t0 = now_sec();
let counters = fetch_list(&self.pool, links).await;
self.inform(&format!(
"Downloaded {:?}: {:?} (took {})",
&supermercado,
counters,
now_sec() - t0
))
.await;
}
Ok(())
}
async fn get_and_save_urls(self: &Self, supermercado: &Supermercado) -> anyhow::Result<()> {
let urls = get_urls(supermercado).await?;
self.pool
.get()
.await?
.interact(|conn| -> Result<(), anyhow::Error> {
let tx = conn.transaction()?;
{
let mut stmt = tx.prepare(
r#"INSERT INTO producto_urls(url, first_seen, last_seen)
VALUES (?1, ?2, ?2)
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
)?;
let now: u64 = now_ms().try_into()?;
for url in urls {
stmt.execute(rusqlite::params![url, now])?;
}
}
tx.commit()?;
Ok(())
})
.await
.unwrap()?;
Ok(())
}
async fn inform(self: &Self, msg: &str) {
println!("{}", msg);
let u = Url::parse_with_params(
&format!(
"https://api.telegram.org/bot{}/sendMessage",
self.telegram_token
),
&[
("chat_id", self.telegram_chat_id.clone()),
("text", msg.to_string()),
],
)
.unwrap();
reqwest::get(u).await.unwrap();
}
}
async fn auto_cli() -> anyhow::Result<()> {
let db = connect_db();
let auto = Auto {
pool: db,
telegram_token: env::var("TELEGRAM_BOT_TOKEN")?,
telegram_chat_id: env::var("TELEGRAM_BOT_CHAT_ID")?,
};
auto.inform("[auto] Empezando scrap").await;
let handles: Vec<_> = Supermercado::value_variants()
.iter()
.map(|s| tokio::spawn(auto.clone().download_supermercado(s.to_owned())))
.collect();
future::try_join_all(handles).await?;
Ok(())
}
async fn cron_cli() -> anyhow::Result<()> {
let mut interval = time::interval(std::time::Duration::from_secs(60 * 60 * 24));
loop {
interval.tick().await;
tokio::spawn(auto_cli());
}
}
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
mod sites; mod sites;
@ -239,9 +403,14 @@ struct PrecioPoint {
} }
fn now_sec() -> u64 { fn now_sec() -> u64 {
let start = SystemTime::now(); since_the_epoch().as_secs()
let since_the_epoch = start }
.duration_since(UNIX_EPOCH) fn now_ms() -> u128 {
.expect("Time went backwards"); since_the_epoch().as_millis()
since_the_epoch.as_secs() }
fn since_the_epoch() -> Duration {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
} }

View file

@ -66,3 +66,19 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
url, url,
}) })
} }
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
let urls = vec![
"https://www.carrefour.com.ar/sitemap/product-0.xml",
"https://www.carrefour.com.ar/sitemap/product-1.xml",
"https://www.carrefour.com.ar/sitemap/product-2.xml",
"https://www.carrefour.com.ar/sitemap/product-3.xml",
"https://www.carrefour.com.ar/sitemap/product-4.xml",
"https://www.carrefour.com.ar/sitemap/product-5.xml",
"https://www.carrefour.com.ar/sitemap/product-6.xml",
"https://www.carrefour.com.ar/sitemap/product-7.xml",
"https://www.carrefour.com.ar/sitemap/product-8.xml",
"https://www.carrefour.com.ar/sitemap/product-9.xml",
];
vtex::get_urls_from_sitemap(urls).await
}

View file

@ -1,6 +1,9 @@
use anyhow::Context; use anyhow::{anyhow, Context};
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use reqwest::Url;
use crate::PrecioPoint; use crate::{build_client, do_request, get_retry_policy, retry_if_wasnt_not_found, PrecioPoint};
pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> { pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> {
let ean = dom let ean = dom
@ -24,8 +27,7 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
.query_selector(".atg_store_newPrice") .query_selector(".atg_store_newPrice")
.unwrap() .unwrap()
.filter_map(|h| h.get(dom.parser())) .filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag()) .find_map(|n| n.as_tag())
.next()
.map(|t| t.inner_text(dom.parser())) .map(|t| t.inner_text(dom.parser()))
.filter(|s| !s.is_empty()) .filter(|s| !s.is_empty())
.map(|s| { .map(|s| {
@ -41,8 +43,7 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
dom.query_selector(".product_not_available") dom.query_selector(".product_not_available")
.unwrap() .unwrap()
.filter_map(|h| h.get(dom.parser())) .filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag()) .find_map(|n| n.as_tag())
.next()
.is_some(), .is_some(),
); );
@ -50,8 +51,7 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
.query_selector("h1.product_page") .query_selector("h1.product_page")
.unwrap() .unwrap()
.filter_map(|h| h.get(dom.parser())) .filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag()) .find_map(|n| n.as_tag())
.next()
.map(|t| t.inner_text(dom.parser())) .map(|t| t.inner_text(dom.parser()))
.map(|s| s.trim().to_string()); .map(|s| s.trim().to_string());
@ -59,8 +59,7 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
.query_selector(".zoomImage1") .query_selector(".zoomImage1")
.unwrap() .unwrap()
.filter_map(|h| h.get(dom.parser())) .filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag()) .find_map(|n| n.as_tag())
.next()
.and_then(|t| t.attributes().get("src").flatten()) .and_then(|t| t.attributes().get("src").flatten())
.map(|s| s.as_utf8_str().to_string()); .map(|s| s.as_utf8_str().to_string());
@ -75,3 +74,64 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
url, url,
}) })
} }
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
let client = build_client();
let initial = Url::parse("https://www.cotodigital3.com.ar/sitios/cdigi/browse?Nf=product.endDate%7CGTEQ+1.7032032E12%7C%7Cproduct.startDate%7CLTEQ+1.7032032E12&Nr=AND%28product.sDisp_200%3A1004%2Cproduct.language%3Aespa%C3%B1ol%2COR%28product.siteId%3ACotoDigital%29%29")?;
let page_size = 100;
let handles: Vec<Vec<String>> = stream::iter(0..29000 / page_size)
.map(|i| {
let mut u = initial.clone();
u.query_pairs_mut()
.append_pair("No", &(i * page_size).to_string())
.append_pair("Nrpp", &(page_size).to_string())
.finish();
let client = &client;
async move {
let text = get_retry_policy()
.retry_if(
|| do_request(client, u.as_str()).and_then(|r| r.text()),
retry_if_wasnt_not_found,
)
.await?;
let dom = tl::parse(&text, tl::ParserOptions::default())?;
let list: Vec<String> = dom
.query_selector(".product_info_container")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.filter_map(|t| -> Option<anyhow::Result<String>> {
t.children()
.top()
.iter()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.find(|t| t.name() == "a")
.map(|t| {
t.attributes()
.get("href")
.flatten()
.ok_or(anyhow!("No tiene href="))
})
.map(|s| {
Ok(Url::options()
.base_url(Some(&Url::parse("https://www.cotodigital3.com.ar")?))
.parse(s?.as_utf8_str().as_ref())?
.to_string())
})
})
.try_collect()?;
Ok::<Vec<String>, anyhow::Error>(list)
}
})
.buffer_unordered(8)
.try_collect()
.await?;
let mut total: Vec<String> = vec![];
for mut urls in handles {
total.append(&mut urls);
}
Ok(total.into_iter().unique().collect())
}

View file

@ -4,6 +4,7 @@ use simple_error::bail;
use crate::sites::common; use crate::sites::common;
use crate::PrecioPoint; use crate::PrecioPoint;
use super::vtex;
use super::vtex::find_product_ld; use super::vtex::find_product_ld;
use super::vtex::AvailabilityLd; use super::vtex::AvailabilityLd;
@ -39,3 +40,14 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
url, url,
}) })
} }
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
let urls = vec![
"https://diaonline.supermercadosdia.com.ar/sitemap/product-1.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-2.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-3.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-4.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-5.xml",
];
vtex::get_urls_from_sitemap(urls).await
}

View file

@ -90,3 +90,25 @@ pub async fn scrap(
url, url,
}) })
} }
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
// de https://www.jumbo.com.ar/sitemap.xml
let urls = vec![
"https://www.jumbo.com.ar/sitemap/product-1.xml",
"https://www.jumbo.com.ar/sitemap/product-10.xml",
"https://www.jumbo.com.ar/sitemap/product-11.xml",
"https://www.jumbo.com.ar/sitemap/product-12.xml",
"https://www.jumbo.com.ar/sitemap/product-13.xml",
"https://www.jumbo.com.ar/sitemap/product-14.xml",
"https://www.jumbo.com.ar/sitemap/product-15.xml",
"https://www.jumbo.com.ar/sitemap/product-2.xml",
"https://www.jumbo.com.ar/sitemap/product-3.xml",
"https://www.jumbo.com.ar/sitemap/product-4.xml",
"https://www.jumbo.com.ar/sitemap/product-5.xml",
"https://www.jumbo.com.ar/sitemap/product-6.xml",
"https://www.jumbo.com.ar/sitemap/product-7.xml",
"https://www.jumbo.com.ar/sitemap/product-8.xml",
"https://www.jumbo.com.ar/sitemap/product-9.xml",
];
vtex::get_urls_from_sitemap(urls).await
}

View file

@ -1,8 +1,12 @@
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::Deserialize; use serde::Deserialize;
use simple_error::SimpleError; use simple_error::SimpleError;
use tl::VDom; use tl::VDom;
use crate::{build_client, do_request, get_retry_policy, retry_if_wasnt_not_found};
use super::common; use super::common;
pub fn parse_script_json(dom: &VDom, varname: &str) -> Result<serde_json::Value, anyhow::Error> { pub fn parse_script_json(dom: &VDom, varname: &str) -> Result<serde_json::Value, anyhow::Error> {
@ -100,3 +104,44 @@ pub fn in_stock_from_meta(dom: &VDom) -> anyhow::Result<bool> {
}, },
) )
} }
pub fn parse_urls_from_sitemap(sitemap: &str) -> anyhow::Result<Vec<String>> {
let dom = tl::parse(sitemap, tl::ParserOptions::default())?;
Ok(dom
.query_selector("loc")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.map(|t| t.inner_text(dom.parser()))
.map(|s| s.to_string())
.collect())
}
pub async fn get_urls_from_sitemap(sitemaps: Vec<&str>) -> anyhow::Result<Vec<String>> {
let mut total: Vec<String> = vec![];
let client = build_client();
let handles = stream::iter(sitemaps)
.map(|url| {
let client = client.clone();
let url = url.to_string();
async move {
let client = client;
let url = url;
let text = get_retry_policy()
.retry_if(|| do_request(&client, &url), retry_if_wasnt_not_found)
.await?
.text()
.await?;
parse_urls_from_sitemap(&text)
}
})
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
.boxed()
.buffer_unordered(8)
.try_collect::<Vec<_>>()
.await?;
for mut urls in handles {
total.append(&mut urls);
}
Ok(total.into_iter().unique().collect())
}