diff --git a/Dockerfile.scraper b/Dockerfile.scraper
index 7054cc2..f321650 100644
--- a/Dockerfile.scraper
+++ b/Dockerfile.scraper
@@ -1,14 +1,7 @@
 FROM cgr.dev/chainguard/wolfi-base AS base
 WORKDIR /usr/src/app
-RUN apk add --no-cache bun libgcc
+RUN apk add --no-cache libgcc
 
-FROM base as build
-ENV NODE_ENV=production
-COPY . .
-RUN bun install --frozen-lockfile
-RUN bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js
-
-# nightly because we use tl with `simd` enabled
 FROM base as rs-build
 RUN apk add --no-cache rust build-base sqlite-dev
 
@@ -19,11 +12,8 @@ FROM base
 RUN apk add --no-cache sqlite sqlite-libs
 
 # Scraper
-COPY --from=build /tmp/cli.build.js /bin/scraper
-COPY --from=build /usr/src/app/db-datos/drizzle /bin/drizzle
 COPY --from=rs-build /root/.cargo/bin/scraper-rs /usr/local/bin/scraper-rs
 
-ENV NODE_ENV=production
 ENV DB_PATH=/db/db.db
 
-CMD ["bun", "/bin/scraper", "cron"]
+CMD ["scraper-rs", "cron"]
diff --git a/scraper-rs/src/main.rs b/scraper-rs/src/main.rs
index 20860a6..bc89c91 100644
--- a/scraper-rs/src/main.rs
+++ b/scraper-rs/src/main.rs
@@ -1,6 +1,7 @@
 use again::RetryPolicy;
 use async_channel::Receiver;
 use clap::{Parser, ValueEnum};
+use futures::{stream, StreamExt, TryStreamExt};
 use nanoid::nanoid;
 use r2d2::Pool;
 use r2d2_sqlite::SqliteConnectionManager;
@@ -13,8 +14,9 @@ use std::{
     time::Duration,
 };
 use thiserror::Error;
+use tokio::time;
 
-#[derive(ValueEnum, Clone)]
+#[derive(ValueEnum, Clone, Debug)]
 enum Supermercado {
     Dia,
     Jumbo,
@@ -27,6 +29,8 @@ enum Args {
     FetchList(FetchListArgs),
     ParseFile(ParseFileArgs),
     GetUrlList(GetUrlListArgs),
+    Auto(AutoArgs),
+    Cron(AutoArgs),
 }
 #[derive(clap::Args)]
 struct FetchListArgs {
@@ -41,6 +45,8 @@ struct GetUrlListArgs {
     #[arg(value_enum)]
     supermercado: Supermercado,
 }
+#[derive(clap::Args)]
+struct AutoArgs {}
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -50,6 +56,8 @@ async fn main() -> anyhow::Result<()> {
         Args::FetchList(a) => fetch_list_cli(a.list_path).await,
         Args::ParseFile(a) => parse_file_cli(a.file_path).await,
         Args::GetUrlList(a) => get_url_list_cli(a.supermercado).await,
+        Args::Auto(_) => auto_cli().await,
+        Args::Cron(_) => cron_cli().await,
     }
 }
 
@@ -62,14 +70,18 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
         .map(|s| s.to_owned())
         .collect::<Vec<String>>();
 
+    let pool = connect_db();
+    let counters = fetch_list(&pool, links).await;
+
+    println!("Finished: {:?}", counters);
+    Ok(())
+}
+
+async fn fetch_list(pool: &Pool<SqliteConnectionManager>, links: Vec<String>) -> Counters {
     let (sender, receiver) = async_channel::bounded::<String>(1);
 
-    let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
-    let manager = SqliteConnectionManager::file(db_path);
-    let pool = Pool::new(manager).unwrap();
-
     let n_coroutines = env::var("N_COROUTINES")
-        .map_or(Ok(128), |s| s.parse::<usize>())
+        .map_or(Ok(24), |s| s.parse::<usize>())
        .expect("N_COROUTINES is not a number");
     let handles = (1..n_coroutines)
         .map(|_| {
@@ -91,9 +103,14 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
         counters.errored += c.errored;
         counters.skipped += c.skipped;
     }
+    counters
+}
 
-    println!("Finished: {:?}", counters);
-    Ok(())
+fn connect_db() -> Pool<SqliteConnectionManager> {
+    let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
+    let manager = SqliteConnectionManager::file(db_path);
+    let pool = Pool::new(manager).unwrap();
+    pool
 }
 
 fn build_client() -> reqwest::Client {
@@ -130,7 +147,10 @@ async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
             }
             Err(err) => {
                 match err.downcast_ref::<FetchError>() {
-                    Some(FetchError::HttpStatus(StatusCode::NOT_FOUND)) => counters.skipped += 1,
+                    Some(FetchError::Http(e)) => match e.status() {
+                        Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
+                        _ => counters.errored += 1,
+                    },
                     _ => counters.errored += 1,
                 }
@@ -146,20 +166,15 @@ async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
 enum FetchError {
     #[error("reqwest error")]
     Http(#[from] reqwest::Error),
-    #[error("http status: {0}")]
-    HttpStatus(reqwest::StatusCode),
     #[error("parse error")]
     Parse(#[from] SimpleError),
     #[error("tl error")]
     Tl(#[from] tl::ParseError),
 }
 
-pub async fn do_request(client: &reqwest::Client, url: &str) -> anyhow::Result<reqwest::Response> {
+pub async fn do_request(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
     let request = client.get(url).build()?;
-    let response = client.execute(request).await?;
-    if !response.status().is_success() {
-        bail!(FetchError::HttpStatus(response.status()));
-    }
+    let response = client.execute(request).await?.error_for_status()?;
     Ok(response)
 }
 
@@ -221,12 +236,7 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
 }
 
 async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
-    let urls = match supermercado {
-        Supermercado::Dia => sites::dia::get_urls().await?,
-        Supermercado::Jumbo => sites::jumbo::get_urls().await?,
-        Supermercado::Carrefour => sites::carrefour::get_urls().await?,
-        _ => todo!(),
-    };
+    let urls = get_urls(&supermercado).await?;
     urls.iter().for_each(|s| {
         println!("{}", s);
     });
@@ -234,6 +244,15 @@ async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
     Ok(())
 }
 
+async fn get_urls(supermercado: &Supermercado) -> Result<Vec<String>, anyhow::Error> {
+    Ok(match supermercado {
+        Supermercado::Dia => sites::dia::get_urls().await?,
+        Supermercado::Jumbo => sites::jumbo::get_urls().await?,
+        Supermercado::Carrefour => sites::carrefour::get_urls().await?,
+        Supermercado::Coto => sites::coto::get_urls().await?,
+    })
+}
+
 async fn scrap_url(
     client: &reqwest::Client,
     url: String,
@@ -255,6 +274,105 @@ async fn scrap_url(
     }
 }
 
+struct Auto {
+    pool: Pool<SqliteConnectionManager>,
+    telegram_token: String,
+    telegram_chat_id: String,
+}
+impl Auto {
+    async fn download_supermercado(self: &Self, supermercado: Supermercado) -> anyhow::Result<()> {
+        {
+            let t0 = now_sec();
+            self.get_and_save_urls(&supermercado).await?;
+            self.inform(&format!(
+                "Downloaded url list {:?} (took {})",
+                &supermercado,
+                now_sec() - t0
+            ))
+            .await;
+        }
+        let links: Vec<String> = self
+            .pool
+            .get()?
+            .prepare(r#"SELECT url FROM producto_urls;"#)?
+            .query_map([], |r| r.get::<_, String>(0))?
+            .map(|r| r.unwrap())
+            .collect();
+        {
+            let t0 = now_sec();
+            let counters = fetch_list(&self.pool, links).await;
+            self.inform(&format!(
+                "Downloaded {:?}: {:?} (took {})",
+                &supermercado,
+                counters,
+                now_sec() - t0
+            ))
+            .await;
+        }
+        Ok(())
+    }
+
+    async fn get_and_save_urls(self: &Self, supermercado: &Supermercado) -> anyhow::Result<()> {
+        let urls = get_urls(supermercado).await?;
+        let connection = &mut self.pool.get()?;
+        let tx = connection.transaction()?;
+        {
+            let mut stmt = tx.prepare(
+                r#"INSERT INTO producto_urls(url, first_seen, last_seen)
+                VALUES (?1, ?2, ?2)
+                ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
+            )?;
+            let now: u64 = now_ms().try_into()?;
+            for url in urls {
+                stmt.execute(rusqlite::params![url, now])?;
+            }
+        }
+        tx.commit()?;
+
+        Ok(())
+    }
+
+    async fn inform(self: &Self, msg: &str) {
+        println!("{}", msg);
+        let u = Url::parse_with_params(
+            &format!(
+                "https://api.telegram.org/bot{}/sendMessage",
+                self.telegram_token
+            ),
+            &[
+                ("chat_id", self.telegram_chat_id.clone()),
+                ("text", msg.to_string()),
+            ],
+        )
+        .unwrap();
+        reqwest::get(u).await.unwrap();
+    }
+}
+
+async fn auto_cli() -> anyhow::Result<()> {
+    let db = connect_db();
+    let auto = Auto {
+        pool: db,
+        telegram_token: env::var("TELEGRAM_BOT_TOKEN")?,
+        telegram_chat_id: env::var("TELEGRAM_BOT_CHAT_ID")?,
+    };
+    auto.inform("[auto] Starting scrape").await;
+    stream::iter(Supermercado::value_variants().iter())
+        .map(|s| auto.download_supermercado(s.to_owned()))
+        .buffer_unordered(64)
+        .try_collect::<Vec<_>>()
+        .await?;
+    Ok(())
+}
+async fn cron_cli() -> anyhow::Result<()> {
+    let mut interval = time::interval(std::time::Duration::from_secs(60 * 60 * 24));
+
+    loop {
+        interval.tick().await;
+        auto_cli().await.unwrap();
+    }
+}
+
 use std::time::{SystemTime, UNIX_EPOCH};
 
 mod sites;
@@ -273,9 +391,14 @@ struct PrecioPoint {
 }
 
 fn now_sec() -> u64 {
-    let start = SystemTime::now();
-    let since_the_epoch = start
-        .duration_since(UNIX_EPOCH)
-        .expect("Time went backwards");
-    since_the_epoch.as_secs()
+    since_the_epoch().as_secs()
+}
+fn now_ms() -> u128 {
+    since_the_epoch().as_millis()
+}
+
+fn since_the_epoch() -> Duration {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards")
 }
diff --git a/scraper-rs/src/sites/coto.rs b/scraper-rs/src/sites/coto.rs
index 302c62a..7bfcb8a 100644
--- a/scraper-rs/src/sites/coto.rs
+++ b/scraper-rs/src/sites/coto.rs
@@ -1,6 +1,9 @@
-use anyhow::Context;
+use anyhow::{anyhow, Context};
+use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
+use reqwest::Url;
 
-use crate::PrecioPoint;
+use crate::{build_client, do_request, get_retry_policy, PrecioPoint};
 
 pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> {
     let ean = dom
@@ -71,3 +74,62 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
         url,
     })
 }
+
+pub async fn get_urls() -> anyhow::Result<Vec<String>> {
+    // let (sender, recv) = async_channel::unbounded();
+    let client = build_client();
+    let initial = Url::parse("https://www.cotodigital3.com.ar/sitios/cdigi/browse?Nf=product.endDate%7CGTEQ+1.7032032E12%7C%7Cproduct.startDate%7CLTEQ+1.7032032E12&Nr=AND%28product.sDisp_200%3A1004%2Cproduct.language%3Aespa%C3%B1ol%2COR%28product.siteId%3ACotoDigital%29%29")?;
+
+    let page_size = 100;
+    let handles: Vec<Vec<String>> = stream::iter(0..29000 / page_size)
+        .map(|i| {
+            let mut u = initial.clone();
+            u.query_pairs_mut()
+                .append_pair("No", &(i * page_size).to_string())
+                .append_pair("Nrpp", &(page_size).to_string())
+                .finish();
+            let client = &client;
+            async move {
+                let text = get_retry_policy()
+                    .retry(|| do_request(client, u.as_str()).and_then(|r| r.text()))
+                    .await?;
+                let dom = tl::parse(&text, tl::ParserOptions::default())?;
+
+                let list: Vec<String> = dom
+                    .query_selector(".product_info_container")
+                    .unwrap()
+                    .filter_map(|h| h.get(dom.parser()))
+                    .filter_map(|n| n.as_tag())
+                    .filter_map(|t| -> Option<anyhow::Result<String>> {
+                        t.children()
+                            .top()
+                            .iter()
+                            .filter_map(|h| h.get(dom.parser()))
+                            .filter_map(|n| n.as_tag())
+                            .find(|t| t.name() == "a")
+                            .map(|t| {
+                                t.attributes()
+                                    .get("href")
+                                    .flatten()
+                                    .ok_or(anyhow!("no href= attribute"))
+                            })
+                            .map(|s| {
+                                Ok(Url::options()
+                                    .base_url(Some(&Url::parse("https://www.cotodigital3.com.ar")?))
+                                    .parse(s?.as_utf8_str().as_ref())?
+                                    .to_string())
+                            })
+                    })
+                    .try_collect()?;
+                Ok::<Vec<String>, anyhow::Error>(list)
+            }
+        })
+        .buffer_unordered(8)
+        .try_collect()
+        .await?;
+    let mut total: Vec<String> = vec![];
+    for mut urls in handles {
+        total.append(&mut urls);
+    }
+    Ok(total.into_iter().unique().collect())
+}
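
A note on the fan-out shape that fetch_list now encapsulates: a bounded(1) channel provides backpressure (the feeder stays at most one URL ahead of the workers), N_COROUTINES workers drain it concurrently, and each worker's Counters are summed once the channel closes. Below is a minimal sketch of that pattern with made-up URLs and a stubbed worker body, assuming only the tokio and async_channel crates; it is not code from the repo.

use async_channel::Receiver;

#[derive(Debug, Default)]
struct Counters {
    success: u64,
    errored: u64,
    skipped: u64,
}

// Each worker pulls URLs until the channel is closed and drained.
async fn worker(rx: Receiver<String>) -> Counters {
    let mut counters = Counters::default();
    while let Ok(url) = rx.recv().await {
        // ... fetch, parse and store `url` here, bumping the right counter ...
        let _ = url;
        counters.success += 1;
    }
    counters
}

#[tokio::main]
async fn main() {
    let (tx, rx) = async_channel::bounded::<String>(1);
    // (0..n) spawns exactly n workers.
    let handles: Vec<_> = (0..24).map(|_| tokio::spawn(worker(rx.clone()))).collect();

    for url in ["https://example.com/a", "https://example.com/b"] {
        tx.send(url.to_string()).await.unwrap();
    }
    tx.close(); // lets each worker's recv() return Err once the queue is empty

    let mut total = Counters::default();
    for h in handles {
        let c = h.await.unwrap();
        total.success += c.success;
        total.errored += c.errored;
        total.skipped += c.skipped;
    }
    println!("Finished: {:?}", total);
}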
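
The do_request change is what enables the worker's new error handling: error_for_status() folds any non-2xx response into reqwest::Error, so one error type carries both transport failures and HTTP statuses, and the caller can pick out 404s with err.status(). A sketch of that flow combined with a retry policy, assuming the again, reqwest and tokio crates; the URL, backoff and retry count are made up, and the repo's own get_retry_policy() may be tuned differently.

use again::RetryPolicy;
use reqwest::StatusCode;
use std::time::Duration;

async fn fetch_text(client: &reqwest::Client, url: &str) -> reqwest::Result<String> {
    let policy = RetryPolicy::exponential(Duration::from_millis(300)).with_max_retries(3);
    policy
        .retry(|| async {
            client
                .get(url)
                .send()
                .await?
                .error_for_status()? // non-2xx becomes Err here
                .text()
                .await
        })
        .await
}

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    match fetch_text(&client, "https://example.com/missing").await {
        Ok(body) => println!("got {} bytes", body.len()),
        // The same skip-vs-error split the worker does on FetchError::Http:
        Err(e) if e.status() == Some(StatusCode::NOT_FOUND) => println!("skipped"),
        Err(e) => eprintln!("errored: {}", e),
    }
}

One consequence worth noting: a retry policy wrapped around error_for_status() retries 404s as well before the caller gets to classify them as skipped.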
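
get_and_save_urls keeps producto_urls fresh with a single upsert: first_seen is written once per URL, last_seen is bumped on every crawl, and the whole batch runs inside one transaction with a statement prepared once. A runnable sketch of just that piece, assuming the rusqlite and anyhow crates; the table schema is inferred from the INSERT in the diff and the db path is made up, so the real migrations may differ.

use rusqlite::Connection;

fn main() -> anyhow::Result<()> {
    let mut conn = Connection::open("urls-example.db")?;
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS producto_urls (
            url TEXT PRIMARY KEY,
            first_seen INTEGER NOT NULL,
            last_seen INTEGER NOT NULL
        );",
    )?;

    let urls = ["https://example.com/p/1", "https://example.com/p/2"];
    let now: u64 = 1_700_000_000_000; // millis; the patch derives this via now_ms()

    let tx = conn.transaction()?;
    {
        // Prepare once, execute per URL: keeps the loop cheap.
        let mut stmt = tx.prepare(
            "INSERT INTO producto_urls(url, first_seen, last_seen)
             VALUES (?1, ?2, ?2)
             ON CONFLICT(url) DO UPDATE SET last_seen=?2;",
        )?;
        for url in urls {
            stmt.execute(rusqlite::params![url, now])?;
        }
    }
    tx.commit()?;
    Ok(())
}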
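
Finally, coto::get_urls turns pagination into a stream: every page offset becomes a future, buffer_unordered(8) caps how many requests are in flight, try_collect fails fast on the first error, and itertools' unique() dedups the flattened result. The same skeleton with the HTML scraping stubbed out, assuming the futures, itertools, reqwest, tokio and anyhow crates; the endpoint, query parameter names and page count are placeholders.

use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use reqwest::Url;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    let initial = Url::parse("https://example.com/browse")?;
    let page_size = 100;

    let pages: Vec<Vec<String>> = stream::iter(0..10)
        .map(|i| {
            let mut u = initial.clone();
            u.query_pairs_mut()
                .append_pair("No", &(i * page_size).to_string())
                .append_pair("Nrpp", &page_size.to_string());
            let client = &client; // borrow, so the async block doesn't move the client
            async move {
                let body = client.get(u).send().await?.text().await?;
                // ... extract product links from `body` here ...
                Ok::<Vec<String>, anyhow::Error>(vec![format!("{} bytes", body.len())])
            }
        })
        .buffer_unordered(8) // at most 8 pages in flight
        .try_collect()
        .await?;

    let urls: Vec<String> = pages.into_iter().flatten().unique().collect();
    println!("{} unique urls", urls.len());
    Ok(())
}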