Compare commits

..

No commits in common. "26b9f4b17f1494f3a390e3f0e406b0571fd4310f" and "6853b6389a9ccdd7ab9e8fecb7d3073b93793082" have entirely different histories.

10 changed files with 201 additions and 479 deletions

View file

@ -62,26 +62,24 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to the Container registry
uses: docker/login-action@v3
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/scraper
- name: Build and push Docker image
uses: docker/build-push-action@v5
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with:
context: .
file: Dockerfile.scraper
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache
cache-to: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache,mode=max
cache-from: type=gha
cache-to: type=gha,mode=max

View file

@ -1,7 +1,14 @@
FROM cgr.dev/chainguard/wolfi-base AS base
WORKDIR /usr/src/app
RUN apk add --no-cache libgcc
RUN apk add --no-cache bun libgcc
FROM base as build
ENV NODE_ENV=production
COPY . .
RUN bun install --frozen-lockfile
RUN bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js
# nightly porque usamos tl con `simd` activado
FROM base as rs-build
RUN apk add --no-cache rust build-base sqlite-dev
@ -12,8 +19,11 @@ FROM base
RUN apk add --no-cache sqlite sqlite-libs
# Scraper
COPY --from=build /tmp/cli.build.js /bin/scraper
COPY --from=build /usr/src/app/db-datos/drizzle /bin/drizzle
COPY --from=rs-build /root/.cargo/bin/scraper-rs /usr/local/bin/scraper-rs
ENV NODE_ENV=production
ENV DB_PATH=/db/db.db
CMD ["scraper-rs", "cron"]
CMD ["bun", "/bin/scraper", "cron"]

157
scraper-rs/Cargo.lock generated
View file

@ -115,6 +115,19 @@ version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-compression"
version = "0.4.5"
@ -129,17 +142,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-trait"
version = "0.1.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -273,6 +275,15 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "concurrent-queue"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -299,44 +310,12 @@ dependencies = [
]
[[package]]
name = "deadpool"
version = "0.10.0"
name = "crossbeam-utils"
version = "0.8.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
dependencies = [
"tokio",
]
[[package]]
name = "deadpool-sqlite"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8010e36e12f3be22543a5e478b4af20aeead9a700dd69581a5e050a070fc22c"
dependencies = [
"deadpool",
"deadpool-sync",
"rusqlite",
]
[[package]]
name = "deadpool-sync"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8db70494c13cae4ce67b4b4dafdaf828cf0df7237ab5b9e2fcabee4965d0a0a"
dependencies = [
"deadpool-runtime",
"cfg-if",
]
[[package]]
@ -360,6 +339,27 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "event-listener"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f2cdcf274580f2d63697192d744727b3198894b1bf02923643bf59e2c26712"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
@ -671,15 +671,6 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "itertools"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.10"
@ -815,6 +806,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -911,6 +908,28 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_sqlite"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dc290b669d30e20751e813517bbe13662d020419c5c8818ff10b6e8bb7777f6"
dependencies = [
"r2d2",
"rusqlite",
"uuid",
]
[[package]]
name = "rand"
version = "0.7.3"
@ -1114,6 +1133,15 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -1126,12 +1154,11 @@ version = "0.1.0"
dependencies = [
"again",
"anyhow",
"async-channel",
"clap",
"deadpool",
"deadpool-sqlite",
"futures",
"itertools",
"nanoid",
"r2d2",
"r2d2_sqlite",
"rand 0.8.5",
"reqwest",
"rusqlite",
@ -1520,6 +1547,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"getrandom 0.2.11",
"rand 0.8.5",
]
[[package]]
name = "valuable"
version = "0.1.0"

View file

@ -8,13 +8,13 @@ edition = "2021"
[dependencies]
again = "0.1.2"
anyhow = "1.0.79"
async-channel = "2.1.1"
clap = { version = "4.4.15", features = ["derive"] }
deadpool = "0.10.0"
deadpool-sqlite = "0.7.0"
futures = "0.3.30"
itertools = "0.12.0"
nanoid = "0.4.0"
r2d2 = "0.8.10"
r2d2_sqlite = "0.23.0"
rand = "0.8.5"
# lol_html = "1.2.0"
reqwest = { version = "0.11.23", default-features = false, features = [
"rustls-tls",
"gzip",
@ -22,6 +22,7 @@ reqwest = { version = "0.11.23", default-features = false, features = [
"socks",
] }
rusqlite = "0.30.0"
# scraper = "0.18.1"
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.109"
simple-error = "0.3.0"

View file

@ -1,8 +1,9 @@
use again::RetryPolicy;
use clap::{Parser, ValueEnum};
use deadpool_sqlite::Pool;
use futures::{future, stream, StreamExt};
use async_channel::Receiver;
use clap::Parser;
use nanoid::nanoid;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use reqwest::{StatusCode, Url};
use simple_error::{bail, SimpleError};
use std::{
@ -12,23 +13,11 @@ use std::{
time::Duration,
};
use thiserror::Error;
use tokio::time;
#[derive(ValueEnum, Clone, Debug)]
enum Supermercado {
Dia,
Jumbo,
Carrefour,
Coto,
}
#[derive(Parser)] // requires `derive` feature
enum Args {
FetchList(FetchListArgs),
ParseFile(ParseFileArgs),
GetUrlList(GetUrlListArgs),
Auto(AutoArgs),
Cron(AutoArgs),
}
#[derive(clap::Args)]
struct FetchListArgs {
@ -38,13 +27,6 @@ struct FetchListArgs {
struct ParseFileArgs {
file_path: String,
}
#[derive(clap::Args)]
struct GetUrlListArgs {
#[arg(value_enum)]
supermercado: Supermercado,
}
#[derive(clap::Args)]
struct AutoArgs {}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@ -53,9 +35,6 @@ async fn main() -> anyhow::Result<()> {
match Args::parse() {
Args::FetchList(a) => fetch_list_cli(a.list_path).await,
Args::ParseFile(a) => parse_file_cli(a.file_path).await,
Args::GetUrlList(a) => get_url_list_cli(a.supermercado).await,
Args::Auto(_) => auto_cli().await,
Args::Cron(_) => cron_cli().await,
}
}
@ -68,47 +47,40 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
.map(|s| s.to_owned())
.collect::<Vec<_>>();
let pool = connect_db();
let counters = fetch_list(&pool, links).await;
let (sender, receiver) = async_channel::bounded::<String>(1);
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
let manager = SqliteConnectionManager::file(db_path);
let pool = Pool::new(manager).unwrap();
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(128), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número");
let handles = (1..n_coroutines)
.map(|_| {
let rx = receiver.clone();
let pool = pool.clone();
tokio::spawn(worker(rx, pool))
})
.collect::<Vec<_>>();
for link in links {
sender.send_blocking(link).unwrap();
}
sender.close();
let mut counters = Counters::default();
for handle in handles {
let c = handle.await.unwrap();
counters.success += c.success;
counters.errored += c.errored;
counters.skipped += c.skipped;
}
println!("Finished: {:?}", counters);
Ok(())
}
async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters {
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(24), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número");
let client = build_client();
stream::iter(links)
.map(|url| {
let pool = pool.clone();
let client = client.clone();
tokio::spawn(fetch_and_save(client, url, pool))
})
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
.boxed()
.buffer_unordered(n_coroutines)
.fold(Counters::default(), move |x, y| {
let ret = y.unwrap();
future::ready(Counters {
success: x.success + ret.success,
errored: x.errored + ret.errored,
skipped: x.skipped + ret.skipped,
})
})
.await
}
fn connect_db() -> Pool {
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
let cfg = deadpool_sqlite::Config::new(db_path);
let pool = cfg.create_pool(deadpool_sqlite::Runtime::Tokio1).unwrap();
pool
}
fn build_client() -> reqwest::Client {
reqwest::ClientBuilder::default().build().unwrap()
}
@ -120,15 +92,16 @@ struct Counters {
skipped: u64,
}
async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Counters {
let res = fetch_and_parse(&client, url.clone()).await;
async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
let client = build_client();
let mut counters = Counters::default();
match res {
Ok(res) => {
counters.success += 1;
pool.get().await.unwrap().interact(move |conn| conn.execute(
"INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
rusqlite::params![
while let Ok(url) = rx.recv().await {
let res = fetch_and_parse(&client, url.clone()).await;
match res {
Ok(res) => {
counters.success += 1;
pool.get().unwrap().execute("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",rusqlite::params![
res.ean,
res.fetched_at,
res.precio_centavos,
@ -138,21 +111,19 @@ async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Cou
res.parser_version,
res.name,
res.image_url,
]
)).await.unwrap().unwrap();
}
Err(err) => {
match err.downcast_ref::<FetchError>() {
Some(FetchError::Http(e)) => match e.status() {
Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
_ => counters.errored += 1,
},
_ => counters.errored += 1,
]).unwrap();
}
Err(err) => {
match err.downcast_ref::<FetchError>() {
Some(FetchError::HttpStatus(StatusCode::NOT_FOUND)) => counters.skipped += 1,
_ => counters.errored += 1,
}
tracing::error!(error=%err, url=url);
tracing::error!(error=%err, url=url);
}
}
}
counters
}
@ -160,39 +131,34 @@ async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Cou
enum FetchError {
#[error("reqwest error")]
Http(#[from] reqwest::Error),
#[error("http status: {0}")]
HttpStatus(reqwest::StatusCode),
#[error("parse error")]
Parse(#[from] SimpleError),
#[error("tl error")]
Tl(#[from] tl::ParseError),
}
pub async fn do_request(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
let request = client.get(url).build()?;
let response = client.execute(request).await?.error_for_status()?;
Ok(response)
}
pub fn get_retry_policy() -> again::RetryPolicy {
RetryPolicy::exponential(Duration::from_millis(300))
.with_max_retries(10)
.with_jitter(true)
}
pub fn retry_if_wasnt_not_found(err: &reqwest::Error) -> bool {
!err.status().is_some_and(|s| s == StatusCode::NOT_FOUND)
}
#[tracing::instrument(skip(client))]
async fn fetch_and_parse(
client: &reqwest::Client,
url: String,
) -> Result<PrecioPoint, anyhow::Error> {
let body = get_retry_policy()
.retry_if(|| do_request(client, &url), retry_if_wasnt_not_found)
.await?
.text()
let policy = RetryPolicy::exponential(Duration::from_millis(300))
.with_max_retries(10)
.with_jitter(true);
let response = policy
.retry(|| {
let request = client.get(url.as_str()).build().unwrap();
client.execute(request)
})
.await
.map_err(FetchError::Http)?;
if !response.status().is_success() {
bail!(FetchError::HttpStatus(response.status()));
}
let body = response.text().await.map_err(FetchError::Http)?;
let maybe_point = { scrap_url(client, url, &body).await };
@ -221,7 +187,8 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
dom.query_selector("link[rel=\"canonical\"]")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.find_map(|n| n.as_tag())
.filter_map(|n| n.as_tag())
.next()
.and_then(|t| t.attributes().get("href").flatten())
.expect("No meta canonical")
.as_utf8_str()
@ -233,24 +200,6 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
Ok(())
}
async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
let urls = get_urls(&supermercado).await?;
urls.iter().for_each(|s| {
println!("{}", s);
});
Ok(())
}
async fn get_urls(supermercado: &Supermercado) -> Result<Vec<String>, anyhow::Error> {
Ok(match supermercado {
Supermercado::Dia => sites::dia::get_urls().await?,
Supermercado::Jumbo => sites::jumbo::get_urls().await?,
Supermercado::Carrefour => sites::carrefour::get_urls().await?,
Supermercado::Coto => sites::coto::get_urls().await?,
})
}
async fn scrap_url(
client: &reqwest::Client,
url: String,
@ -272,119 +221,6 @@ async fn scrap_url(
}
}
#[derive(Clone)]
struct Auto {
pool: Pool,
telegram_token: String,
telegram_chat_id: String,
}
impl Auto {
async fn download_supermercado(self: Self, supermercado: Supermercado) -> anyhow::Result<()> {
{
let t0 = now_sec();
self.get_and_save_urls(&supermercado).await?;
self.inform(&format!(
"Downloaded url list {:?} (took {})",
&supermercado,
now_sec() - t0
))
.await;
}
let links: Vec<String> = {
self.pool
.get()
.await?
.interact(|conn| -> anyhow::Result<Vec<String>> {
Ok(conn
.prepare(r#"SELECT url FROM producto_urls;"#)?
.query_map([], |r| r.get::<_, String>(0))?
.map(|r| r.unwrap())
.collect())
})
.await
.unwrap()?
};
{
let t0 = now_sec();
let counters = fetch_list(&self.pool, links).await;
self.inform(&format!(
"Downloaded {:?}: {:?} (took {})",
&supermercado,
counters,
now_sec() - t0
))
.await;
}
Ok(())
}
async fn get_and_save_urls(self: &Self, supermercado: &Supermercado) -> anyhow::Result<()> {
let urls = get_urls(supermercado).await?;
self.pool
.get()
.await?
.interact(|conn| -> Result<(), anyhow::Error> {
let tx = conn.transaction()?;
{
let mut stmt = tx.prepare(
r#"INSERT INTO producto_urls(url, first_seen, last_seen)
VALUES (?1, ?2, ?2)
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
)?;
let now: u64 = now_ms().try_into()?;
for url in urls {
stmt.execute(rusqlite::params![url, now])?;
}
}
tx.commit()?;
Ok(())
})
.await
.unwrap()?;
Ok(())
}
async fn inform(self: &Self, msg: &str) {
println!("{}", msg);
let u = Url::parse_with_params(
&format!(
"https://api.telegram.org/bot{}/sendMessage",
self.telegram_token
),
&[
("chat_id", self.telegram_chat_id.clone()),
("text", msg.to_string()),
],
)
.unwrap();
reqwest::get(u).await.unwrap();
}
}
async fn auto_cli() -> anyhow::Result<()> {
let db = connect_db();
let auto = Auto {
pool: db,
telegram_token: env::var("TELEGRAM_BOT_TOKEN")?,
telegram_chat_id: env::var("TELEGRAM_BOT_CHAT_ID")?,
};
auto.inform("[auto] Empezando scrap").await;
let handles: Vec<_> = Supermercado::value_variants()
.iter()
.map(|s| tokio::spawn(auto.clone().download_supermercado(s.to_owned())))
.collect();
future::try_join_all(handles).await?;
Ok(())
}
async fn cron_cli() -> anyhow::Result<()> {
let mut interval = time::interval(std::time::Duration::from_secs(60 * 60 * 24));
loop {
interval.tick().await;
tokio::spawn(auto_cli());
}
}
use std::time::{SystemTime, UNIX_EPOCH};
mod sites;
@ -403,14 +239,9 @@ struct PrecioPoint {
}
fn now_sec() -> u64 {
since_the_epoch().as_secs()
}
fn now_ms() -> u128 {
since_the_epoch().as_millis()
}
fn since_the_epoch() -> Duration {
SystemTime::now()
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.expect("Time went backwards");
since_the_epoch.as_secs()
}

View file

@ -66,19 +66,3 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
url,
})
}
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
let urls = vec![
"https://www.carrefour.com.ar/sitemap/product-0.xml",
"https://www.carrefour.com.ar/sitemap/product-1.xml",
"https://www.carrefour.com.ar/sitemap/product-2.xml",
"https://www.carrefour.com.ar/sitemap/product-3.xml",
"https://www.carrefour.com.ar/sitemap/product-4.xml",
"https://www.carrefour.com.ar/sitemap/product-5.xml",
"https://www.carrefour.com.ar/sitemap/product-6.xml",
"https://www.carrefour.com.ar/sitemap/product-7.xml",
"https://www.carrefour.com.ar/sitemap/product-8.xml",
"https://www.carrefour.com.ar/sitemap/product-9.xml",
];
vtex::get_urls_from_sitemap(urls).await
}

View file

@ -1,9 +1,6 @@
use anyhow::{anyhow, Context};
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use reqwest::Url;
use anyhow::Context;
use crate::{build_client, do_request, get_retry_policy, retry_if_wasnt_not_found, PrecioPoint};
use crate::PrecioPoint;
pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> {
let ean = dom
@ -27,7 +24,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
.query_selector(".atg_store_newPrice")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.find_map(|n| n.as_tag())
.filter_map(|n| n.as_tag())
.next()
.map(|t| t.inner_text(dom.parser()))
.filter(|s| !s.is_empty())
.map(|s| {
@ -43,7 +41,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
dom.query_selector(".product_not_available")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.find_map(|n| n.as_tag())
.filter_map(|n| n.as_tag())
.next()
.is_some(),
);
@ -51,7 +50,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
.query_selector("h1.product_page")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.find_map(|n| n.as_tag())
.filter_map(|n| n.as_tag())
.next()
.map(|t| t.inner_text(dom.parser()))
.map(|s| s.trim().to_string());
@ -59,7 +59,8 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
.query_selector(".zoomImage1")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.find_map(|n| n.as_tag())
.filter_map(|n| n.as_tag())
.next()
.and_then(|t| t.attributes().get("src").flatten())
.map(|s| s.as_utf8_str().to_string());
@ -74,64 +75,3 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
url,
})
}
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
let client = build_client();
let initial = Url::parse("https://www.cotodigital3.com.ar/sitios/cdigi/browse?Nf=product.endDate%7CGTEQ+1.7032032E12%7C%7Cproduct.startDate%7CLTEQ+1.7032032E12&Nr=AND%28product.sDisp_200%3A1004%2Cproduct.language%3Aespa%C3%B1ol%2COR%28product.siteId%3ACotoDigital%29%29")?;
let page_size = 100;
let handles: Vec<Vec<String>> = stream::iter(0..29000 / page_size)
.map(|i| {
let mut u = initial.clone();
u.query_pairs_mut()
.append_pair("No", &(i * page_size).to_string())
.append_pair("Nrpp", &(page_size).to_string())
.finish();
let client = &client;
async move {
let text = get_retry_policy()
.retry_if(
|| do_request(client, u.as_str()).and_then(|r| r.text()),
retry_if_wasnt_not_found,
)
.await?;
let dom = tl::parse(&text, tl::ParserOptions::default())?;
let list: Vec<String> = dom
.query_selector(".product_info_container")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.filter_map(|t| -> Option<anyhow::Result<String>> {
t.children()
.top()
.iter()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.find(|t| t.name() == "a")
.map(|t| {
t.attributes()
.get("href")
.flatten()
.ok_or(anyhow!("No tiene href="))
})
.map(|s| {
Ok(Url::options()
.base_url(Some(&Url::parse("https://www.cotodigital3.com.ar")?))
.parse(s?.as_utf8_str().as_ref())?
.to_string())
})
})
.try_collect()?;
Ok::<Vec<String>, anyhow::Error>(list)
}
})
.buffer_unordered(8)
.try_collect()
.await?;
let mut total: Vec<String> = vec![];
for mut urls in handles {
total.append(&mut urls);
}
Ok(total.into_iter().unique().collect())
}

View file

@ -4,7 +4,6 @@ use simple_error::bail;
use crate::sites::common;
use crate::PrecioPoint;
use super::vtex;
use super::vtex::find_product_ld;
use super::vtex::AvailabilityLd;
@ -40,14 +39,3 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
url,
})
}
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
let urls = vec![
"https://diaonline.supermercadosdia.com.ar/sitemap/product-1.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-2.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-3.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-4.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-5.xml",
];
vtex::get_urls_from_sitemap(urls).await
}

View file

@ -90,25 +90,3 @@ pub async fn scrap(
url,
})
}
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
// de https://www.jumbo.com.ar/sitemap.xml
let urls = vec![
"https://www.jumbo.com.ar/sitemap/product-1.xml",
"https://www.jumbo.com.ar/sitemap/product-10.xml",
"https://www.jumbo.com.ar/sitemap/product-11.xml",
"https://www.jumbo.com.ar/sitemap/product-12.xml",
"https://www.jumbo.com.ar/sitemap/product-13.xml",
"https://www.jumbo.com.ar/sitemap/product-14.xml",
"https://www.jumbo.com.ar/sitemap/product-15.xml",
"https://www.jumbo.com.ar/sitemap/product-2.xml",
"https://www.jumbo.com.ar/sitemap/product-3.xml",
"https://www.jumbo.com.ar/sitemap/product-4.xml",
"https://www.jumbo.com.ar/sitemap/product-5.xml",
"https://www.jumbo.com.ar/sitemap/product-6.xml",
"https://www.jumbo.com.ar/sitemap/product-7.xml",
"https://www.jumbo.com.ar/sitemap/product-8.xml",
"https://www.jumbo.com.ar/sitemap/product-9.xml",
];
vtex::get_urls_from_sitemap(urls).await
}

View file

@ -1,12 +1,8 @@
use anyhow::{bail, Context};
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::Deserialize;
use simple_error::SimpleError;
use tl::VDom;
use crate::{build_client, do_request, get_retry_policy, retry_if_wasnt_not_found};
use super::common;
pub fn parse_script_json(dom: &VDom, varname: &str) -> Result<serde_json::Value, anyhow::Error> {
@ -104,44 +100,3 @@ pub fn in_stock_from_meta(dom: &VDom) -> anyhow::Result<bool> {
},
)
}
pub fn parse_urls_from_sitemap(sitemap: &str) -> anyhow::Result<Vec<String>> {
let dom = tl::parse(sitemap, tl::ParserOptions::default())?;
Ok(dom
.query_selector("loc")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.map(|t| t.inner_text(dom.parser()))
.map(|s| s.to_string())
.collect())
}
pub async fn get_urls_from_sitemap(sitemaps: Vec<&str>) -> anyhow::Result<Vec<String>> {
let mut total: Vec<String> = vec![];
let client = build_client();
let handles = stream::iter(sitemaps)
.map(|url| {
let client = client.clone();
let url = url.to_string();
async move {
let client = client;
let url = url;
let text = get_retry_policy()
.retry_if(|| do_request(&client, &url), retry_if_wasnt_not_found)
.await?
.text()
.await?;
parse_urls_from_sitemap(&text)
}
})
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
.boxed()
.buffer_unordered(8)
.try_collect::<Vec<_>>()
.await?;
for mut urls in handles {
total.append(&mut urls);
}
Ok(total.into_iter().unique().collect())
}