Compare commits


No commits in common. "d495acfc9dd085ac483b14f6483eb68f95832e3e" and "3ec056645db681bf82b1d90769208ada162a62c5" have entirely different histories.

11 changed files with 709 additions and 976 deletions

View file

@@ -1,5 +1,4 @@
 {
   "spellright.language": ["es_AR"],
-  "spellright.documentTypes": ["markdown", "latex", "plaintext"],
-  "editor.formatOnSave": true
+  "spellright.documentTypes": ["markdown", "latex", "plaintext"]
 }

scraper-rs/Cargo.lock (generated): 915 lines changed; file diff suppressed because it is too large.

View file

@@ -18,13 +18,12 @@ html-escape = "0.2.13"
 itertools = "0.12.0"
 nanoid = "0.4.0"
 quick-xml = "0.31.0"
-rand = "0.8"
-reqwest = { version = "0.12", default-features = false, features = [
+rand = "0.8.5"
+reqwest = { version = "0.11.23", default-features = false, features = [
     "rustls-tls",
     "gzip",
     "brotli",
     "socks",
-    "json",
 ] }
 rusqlite = "0.30.0"
 serde = { version = "1.0.193", features = ["derive"] }
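One detail worth calling out in the reqwest line: the left-hand side also enables the crate's "json" feature, which is what provides the typed Response::json helper. A minimal, self-contained sketch of the kind of call that feature unlocks; the struct, URL, and function here are made up for illustration and are not taken from this repo:

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Healthcheck {
    status: String,
}

// Response::json is only available when reqwest is built with the "json" feature.
async fn check(client: &reqwest::Client) -> reqwest::Result<Healthcheck> {
    client
        .get("https://example.com/health")
        .send()
        .await?
        .json::<Healthcheck>()
        .await
}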

View file

@@ -1,123 +0,0 @@
use super::now_sec;
use super::supermercado::Supermercado;
use super::AutoArgs;
use super::AutoTelegram;
use crate::best_selling;
use crate::db::Db;
use crate::scraper::Scraper;
use futures::Future;
use reqwest::Url;
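/// Coordinates one full scrape run: refreshes the product URL lists per supermarket,
/// downloads and parses product pages, gathers best-selling rankings, and optionally
/// reports progress to a Telegram chat via `inform`.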
#[derive(Clone)]
pub struct Auto {
pub db: Db,
pub telegram: Option<AutoTelegram>,
pub args: AutoArgs,
pub scraper: Scraper,
}
impl Auto {
pub async fn download_supermercado(self, supermercado: Supermercado) -> anyhow::Result<()> {
{
let t0 = now_sec();
match self.get_and_save_urls(&supermercado).await {
Ok(_) => {
self.inform(&format!(
"Downloaded url list {:?} (took {})",
&supermercado,
now_sec() - t0
))
.await
}
Err(err) => {
self.inform(&format!(
"[{:?}] FAILED url list: {:?} (took {})",
&supermercado,
err,
now_sec() - t0
))
.await
}
}
}
let links: Vec<String> = {
let mut links = self.db.get_urls_by_domain(supermercado.host()).await?;
if let Some(n) = self.args.n_products {
links.truncate(n);
}
links
};
// {
// let debug_path = PathBuf::from("debug/");
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
// let file_path = debug_path.join(format!("{}.txt", nanoid!()));
// tokio::fs::write(&file_path, &links.join("\n"))
// .await
// .unwrap();
// tracing::info!("Lista de {:?}: {:?}", &supermercado, file_path.display());
// }
{
let t0 = now_sec();
let counters = self.scraper.fetch_list(&self.db, links).await;
self.inform(&format!(
"Downloaded {:?}: {:?} (took {})",
&supermercado,
counters,
now_sec() - t0
))
.await;
}
Ok(())
}
pub async fn download_best_selling(&self) -> anyhow::Result<()> {
// let best_selling: Vec<best_selling::BestSellingRecord> =
match self
.inform_time(
"Downloaded best selling",
best_selling::get_all_best_selling(&self.db),
)
.await
{
Ok(best_selling) => {
self.db.save_best_selling(best_selling).await?;
}
Err(err) => {
self.inform(&format!("FAILED best selling: {:?}", err))
.await
}
}
Ok(())
}
pub async fn inform_time<T: Future<Output = R>, R>(&self, msg: &str, action: T) -> R {
let t0 = now_sec();
let res = action.await;
self.inform(&format!("{} (took {})", msg, now_sec() - t0))
.await;
res
}
pub async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> {
let urls = self.scraper.get_urls_for_supermercado(supermercado).await?;
self.db.save_producto_urls(urls).await?;
Ok(())
}
pub async fn inform(&self, msg: &str) {
tracing::info!("{}", msg);
if let Some(telegram) = &self.telegram {
let u = Url::parse_with_params(
&format!("https://api.telegram.org/bot{}/sendMessage", telegram.token),
&[
("chat_id", telegram.chat_id.clone()),
("text", msg.to_string()),
],
)
.unwrap();
reqwest::get(u).await.unwrap();
}
}
}

View file

@@ -3,9 +3,8 @@ use std::collections::HashMap;
 use crate::{build_client, db::Db, sites::vtex, supermercado::Supermercado};
 use chrono::{DateTime, Utc};
 use clap::ValueEnum;
-use futures::{stream, FutureExt, StreamExt};
+use futures::{stream, FutureExt, StreamExt, TryStreamExt};
 use itertools::Itertools;
-use simple_error::SimpleError;
 use tracing::warn;

 #[derive(ValueEnum, Clone, Debug)]
@@ -78,6 +77,10 @@ async fn try_get_best_selling_eans(
     }
 }

+async fn noop<T>(t: T) -> anyhow::Result<T> {
+    Ok(t)
+}
+
 fn rank_eans(eans: Vec<Vec<String>>) -> Vec<String> {
     let mut map: HashMap<String, usize> = HashMap::new();
     for eans in eans {
@@ -95,49 +98,34 @@ fn rank_eans(eans: Vec<Vec<String>>) -> Vec<String> {
 pub async fn get_all_best_selling(db: &Db) -> anyhow::Result<Vec<BestSellingRecord>> {
     let client = &build_client();

-    let records = stream::iter(Category::value_variants())
+    stream::iter(Category::value_variants())
         .map(|category| {
             stream::iter(Supermercado::value_variants())
                 .map(|supermercado| {
+                    let db = db.clone();
+                    let client = client.clone();
                     tokio::spawn(try_get_best_selling_eans(
-                        client.clone(),
-                        db.clone(),
+                        client,
+                        db,
                         supermercado,
                         category,
                     ))
                 })
                 .buffer_unordered(5)
                 .map(|f| f.unwrap())
-                .filter_map(|r| async {
-                    match r {
-                        Err(err) => {
-                            tracing::error!("Error getting best selling: {}", err);
-                            None
-                        }
-                        Ok(v) => v,
-                    }
-                })
-                .collect::<Vec<Vec<String>>>()
+                .try_filter_map(noop)
+                .try_collect::<Vec<Vec<String>>>()
                 .map(|r| {
-                    let ranked = rank_eans(r);
-                    if ranked.is_empty() {
-                        return None;
-                    }
-                    Some(BestSellingRecord {
+                    r.map(rank_eans).map(|eans| BestSellingRecord {
                         fetched_at: Utc::now(),
                         category: category.clone(),
-                        eans: ranked,
+                        eans,
                     })
                 })
         })
         .buffer_unordered(5)
         .boxed()
-        .filter_map(|f| async { f })
-        .collect::<Vec<BestSellingRecord>>()
-        .await;
-    if records.len() < Category::value_variants().len() {
-        Err(SimpleError::new("Too few BestSellingRecords").into())
-    } else {
-        Ok(records)
-    }
+        .try_collect()
+        .await
 }
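For readers unfamiliar with the combinator used on the right-hand side: try_filter_map from futures' TryStreamExt hands each Ok value to the closure, drops items for which it yields Ok(None), and lets errors flow through to try_collect. A small, self-contained sketch of the pattern; this is illustrative code, not part of the project:

use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Each item is a Result<Option<u32>>; Ok(None) items are silently dropped,
    // Ok(Some(x)) items are kept, and an Err would abort try_collect early.
    let input: Vec<anyhow::Result<Option<u32>>> = vec![Ok(Some(1)), Ok(None), Ok(Some(3))];
    let kept: Vec<u32> = stream::iter(input)
        .try_filter_map(|maybe| async move { Ok(maybe) })
        .try_collect()
        .await?;
    assert_eq!(kept, vec![1, 3]);
    Ok(())
}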

View file

@@ -1,12 +1,12 @@
 use again::RetryPolicy;
+use chrono::{DateTime, Utc};
 use clap::{Parser, ValueEnum};
 use cron::Schedule;
 use db::Db;
-use futures::{future, TryFutureExt};
-use reqwest::{header::HeaderMap, IntoUrl, StatusCode};
-use scraper::Scraper;
-use simple_error::SimpleError;
+use futures::{future, stream, Future, StreamExt, TryFutureExt};
+use reqwest::{header::HeaderMap, StatusCode, Url};
+use simple_error::{bail, SimpleError};
 use std::{
     env::{self},
     fs,
@@ -17,10 +17,6 @@ use thiserror::Error;
 mod supermercado;
 use supermercado::Supermercado;
-mod auto;
-use auto::Auto;
-mod proxy_client;
-mod scraper;

 #[derive(Parser)] // requires `derive` feature
 enum Args {
@@ -58,7 +54,7 @@ struct AutoArgs {
 }

 #[tokio::main]
-async fn main() -> () {
+async fn main() -> anyhow::Result<()> {
     tracing_subscriber::fmt::init();

     match Args::parse() {
@@ -70,12 +66,11 @@ async fn main() -> () {
         Args::Auto(a) => auto_cli(a).await,
         Args::Cron(_) => cron_cli().await,
     }
-    .unwrap()
 }

 async fn scrap_url_cli(url: String) -> anyhow::Result<()> {
-    let scraper = Scraper::from_env().await?;
-    let res = scraper.fetch_and_scrap(url.clone()).await;
+    let client = build_client();
+    let res = fetch_and_parse(&client, url.clone()).await;

     println!("Result: {:#?}", res);
     res.map(|_| ())
@@ -99,13 +94,37 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
         .collect::<Vec<_>>();

     let db = Db::connect().await?;
-    let scraper = Scraper::from_env().await?;
-    let counters = scraper.fetch_list(&db, links).await;
+    let counters = fetch_list(&db, links).await;

     println!("Finished: {:?}", counters);

     Ok(())
 }

+async fn fetch_list(db: &Db, links: Vec<String>) -> Counters {
+    let n_coroutines = env::var("N_COROUTINES")
+        .map_or(Ok(24), |s| s.parse::<usize>())
+        .expect("N_COROUTINES no es un número");
+    let client = build_client();
+    stream::iter(links)
+        .map(|url| {
+            let db = db.clone();
+            let client = client.clone();
+            tokio::spawn(fetch_and_save(client, url, db))
+        })
+        .buffer_unordered(n_coroutines)
+        .fold(Counters::default(), move |x, y| {
+            let ret = y.unwrap();
+            future::ready(Counters {
+                success: x.success + ret.success,
+                errored: x.errored + ret.errored,
+                skipped: x.skipped + ret.skipped,
+            })
+        })
+        .await
+}
+
 mod db;

 #[derive(Default, Debug)]
@@ -115,6 +134,29 @@ struct Counters {
     skipped: u64,
 }

+async fn fetch_and_save(client: reqwest::Client, url: String, db: Db) -> Counters {
+    let res = fetch_and_parse(&client, url.clone()).await;
+
+    let mut counters = Counters::default();
+    match res {
+        Ok(res) => {
+            counters.success += 1;
+            db.insert_precio(res).await.unwrap();
+        }
+        Err(err) => {
+            match err.downcast_ref::<reqwest::Error>() {
+                Some(e) => match e.status() {
+                    Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
+                    _ => counters.errored += 1,
+                },
+                _ => counters.errored += 1,
+            }
+            tracing::error!(error=%err, url=url);
+        }
+    }
+
+    counters
+}
+
 #[derive(Debug, Error)]
 enum FetchError {
     #[error("parse error")]
@@ -140,14 +182,28 @@ fn build_client() -> reqwest::Client {
         .build()
         .unwrap()
 }

-pub async fn do_request<U: IntoUrl>(
-    client: &reqwest::Client,
-    url: U,
-) -> reqwest::Result<reqwest::Response> {
+fn build_coto_client() -> reqwest::Client {
+    reqwest::ClientBuilder::default()
+        .timeout(Duration::from_secs(300))
+        .connect_timeout(Duration::from_secs(150))
+        .default_headers(build_header_map())
+        .build()
+        .unwrap()
+}
+
+pub async fn do_request(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
     let request = client.get(url).build()?;
     let response = client.execute(request).await?.error_for_status()?;
     Ok(response)
 }

+async fn request_and_body(client: &reqwest::Client, url: &str) -> reqwest::Result<String> {
+    let res = do_request(client, url).await?;
+    res.text().await
+}
+
+pub async fn fetch_body(client: &reqwest::Client, url: &str) -> reqwest::Result<String> {
+    get_fetch_retry_policy()
+        .retry_if(|| request_and_body(client, url), retry_if_wasnt_not_found)
+        .await
+}
+
 pub fn get_fetch_retry_policy() -> again::RetryPolicy {
     RetryPolicy::exponential(Duration::from_millis(300))
@@ -166,17 +222,51 @@ pub fn get_parse_retry_policy() -> again::RetryPolicy {
 pub fn retry_if_wasnt_not_found(err: &reqwest::Error) -> bool {
     !err.status().is_some_and(|s| s == StatusCode::NOT_FOUND)
 }

-pub fn anyhow_retry_if_wasnt_not_found(err: &anyhow::Error) -> bool {
-    match err.downcast_ref::<reqwest::Error>() {
-        Some(e) => retry_if_wasnt_not_found(e),
-        None => true,
-    }
-}
+#[tracing::instrument(skip(client))]
+async fn fetch_and_parse(
+    client: &reqwest::Client,
+    url: String,
+) -> Result<PrecioPoint, anyhow::Error> {
+    async fn fetch_and_scrap(
+        client: &reqwest::Client,
+        url: String,
+    ) -> Result<PrecioPoint, anyhow::Error> {
+        let body = fetch_body(client, &url).await?;
+        let maybe_point = { scrap_url(client, url, &body).await };
+
+        let point = match maybe_point {
+            Ok(p) => Ok(p),
+            Err(err) => {
+                let now: DateTime<Utc> = Utc::now();
+                // let debug_path = PathBuf::from(format!("debug-{}/", now.format("%Y-%m-%d")));
+                // tokio::fs::create_dir_all(&debug_path).await.unwrap();
+                // let file_path = debug_path.join(format!("{}.html", nanoid!()));
+                // tokio::fs::write(&file_path, &body).await.unwrap();
+                // tracing::debug!(error=%err, "Failed to parse, saved body at {}",file_path.display());
+                tracing::debug!(error=%err, "Failed to parse");
+                Err(err)
+            }
+        }?;
+
+        Ok(point)
+    }
+
+    get_parse_retry_policy()
+        .retry_if(
+            || fetch_and_scrap(client, url.clone()),
+            |err: &anyhow::Error| match err.downcast_ref::<reqwest::Error>() {
+                Some(e) => !e.status().is_some_and(|s| s == StatusCode::NOT_FOUND),
+                None => true,
+            },
+        )
+        .await
+}

 async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
     let file = tokio::fs::read_to_string(file_path).await?;
-    let scraper = Scraper::from_env().await?;
+    let client = build_client();

     let url = {
         let dom = tl::parse(&file, tl::ParserOptions::default())?;
@@ -191,13 +281,12 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
     };

     println!("URL: {}", &url);
-    println!("{:?}", scraper.scrap_url(url, &file).await);
+    println!("{:?}", scrap_url(&client, url, &file).await);

     Ok(())
 }

 async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
-    let scraper = Scraper::from_env().await?;
-    let urls = scraper.get_urls_for_supermercado(&supermercado).await?;
+    let urls = get_urls(&supermercado).await?;
     urls.iter().for_each(|s| {
         println!("{}", s);
     });
@@ -205,12 +294,137 @@ async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
     Ok(())
 }

+async fn get_urls(supermercado: &Supermercado) -> Result<Vec<String>, anyhow::Error> {
+    Ok(match supermercado {
+        Supermercado::Dia => sites::dia::get_urls().await?,
+        Supermercado::Jumbo => sites::jumbo::get_urls().await?,
+        Supermercado::Carrefour => sites::carrefour::get_urls().await?,
+        Supermercado::Coto => sites::coto::get_urls().await?,
+        Supermercado::Farmacity => sites::farmacity::get_urls().await?,
+    })
+}
+
+async fn scrap_url(
+    client: &reqwest::Client,
+    url: String,
+    body: &str,
+) -> anyhow::Result<PrecioPoint> {
+    let url_p = Url::parse(&url).unwrap();
+    match url_p.host_str().unwrap() {
+        "www.carrefour.com.ar" => {
+            sites::carrefour::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
+        }
+        "diaonline.supermercadosdia.com.ar" => {
+            sites::dia::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
+        }
+        "www.cotodigital3.com.ar" => {
+            sites::coto::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
+        }
+        "www.jumbo.com.ar" => sites::jumbo::scrap(client, url, body).await,
+        "www.farmacity.com" => {
+            sites::farmacity::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
+        }
+        s => bail!("Unknown host {}", s),
+    }
+}
+
 #[derive(Clone)]
 struct AutoTelegram {
     token: String,
     chat_id: String,
 }

+#[derive(Clone)]
+struct Auto {
+    db: Db,
+    telegram: Option<AutoTelegram>,
+    args: AutoArgs,
+}
+
+impl Auto {
+    async fn download_supermercado(self, supermercado: Supermercado) -> anyhow::Result<()> {
+        {
+            let t0 = now_sec();
+            match self.get_and_save_urls(&supermercado).await {
+                Ok(_) => {
+                    self.inform(&format!(
+                        "Downloaded url list {:?} (took {})",
+                        &supermercado,
+                        now_sec() - t0
+                    ))
+                    .await
+                }
+                Err(err) => {
+                    self.inform(&format!(
+                        "[{:?}] FAILED url list: {:?} (took {})",
+                        &supermercado,
+                        err,
+                        now_sec() - t0
+                    ))
+                    .await
+                }
+            }
+        }
+        let links: Vec<String> = {
+            let mut links = self.db.get_urls_by_domain(supermercado.host()).await?;
+            if let Some(n) = self.args.n_products {
+                links.truncate(n);
+            }
+            links
+        };
+        // {
+        //     let debug_path = PathBuf::from("debug/");
+        //     tokio::fs::create_dir_all(&debug_path).await.unwrap();
+        //     let file_path = debug_path.join(format!("{}.txt", nanoid!()));
+        //     tokio::fs::write(&file_path, &links.join("\n"))
+        //         .await
+        //         .unwrap();
+        //     tracing::info!("Lista de {:?}: {:?}", &supermercado, file_path.display());
+        // }
+        {
+            let t0 = now_sec();
+            let counters = fetch_list(&self.db, links).await;
+            self.inform(&format!(
+                "Downloaded {:?}: {:?} (took {})",
+                &supermercado,
+                counters,
+                now_sec() - t0
+            ))
+            .await;
+        }
+        Ok(())
+    }
+
+    async fn inform_time<T: Future<Output = R>, R>(&self, msg: &str, action: T) -> R {
+        let t0 = now_sec();
+        let res = action.await;
+        self.inform(&format!("{} (took {})", msg, now_sec() - t0))
+            .await;
+        res
+    }
+
+    async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> {
+        let urls = get_urls(supermercado).await?;
+        self.db.save_producto_urls(urls).await?;
+
+        Ok(())
+    }
+
+    async fn inform(&self, msg: &str) {
+        tracing::info!("{}", msg);
+        if let Some(telegram) = &self.telegram {
+            let u = Url::parse_with_params(
+                &format!("https://api.telegram.org/bot{}/sendMessage", telegram.token),
+                &[
+                    ("chat_id", telegram.chat_id.clone()),
+                    ("text", msg.to_string()),
+                ],
+            )
+            .unwrap();
+            reqwest::get(u).await.unwrap();
+        }
+    }
+}
+
 async fn auto_cli(args: AutoArgs) -> anyhow::Result<()> {
     let auto = {
         let db = Db::connect().await?;
@@ -226,12 +440,7 @@ async fn auto_cli(args: AutoArgs) -> anyhow::Result<()> {
                 }
             }
         };
-        Auto {
-            db,
-            telegram,
-            args,
-            scraper: Scraper::from_env().await?,
-        }
+        Auto { db, telegram, args }
     };

     auto.inform("[auto] Empezando scrap").await;
@@ -243,7 +452,7 @@ async fn auto_cli(args: AutoArgs) -> anyhow::Result<()> {
     let handles: Vec<_> = supermercados
         .iter()
         .map(|s| {
-            let x = *s;
+            let x = s.clone();
             tokio::spawn(
                 auto.clone()
                     .download_supermercado(s.to_owned())
@@ -256,7 +465,13 @@ async fn auto_cli(args: AutoArgs) -> anyhow::Result<()> {
     future::try_join_all(handles).await?;
     auto.inform("[auto] Download supermercados finished").await;

-    auto.download_best_selling().await?;
+    let best_selling = auto
+        .inform_time(
+            "Downloaded best selling",
+            best_selling::get_all_best_selling(&auto.db),
+        )
+        .await?;
+    auto.db.save_best_selling(best_selling).await?;

     Ok(())
 }
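The fetch_list added on the right-hand side of this file caps download concurrency by mapping each URL to a spawned task, keeping at most N_COROUTINES of them in flight with buffer_unordered, and folding the per-task counters into one total. A stripped-down, self-contained sketch of the same idiom; the names and the doubling task are illustrative only, not project code:

use futures::{future, stream, StreamExt};

// Spawn one task per input, keep at most `limit` of them in flight,
// and fold the results into a single accumulator as they finish.
async fn sum_doubled(inputs: Vec<u64>, limit: usize) -> u64 {
    stream::iter(inputs)
        .map(|n| tokio::spawn(async move { n * 2 }))
        .buffer_unordered(limit)
        .fold(0u64, |acc, joined| {
            // JoinHandle resolves to a Result; a panicked task would surface here.
            future::ready(acc + joined.unwrap())
        })
        .await
}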

View file

@@ -1,58 +0,0 @@
use std::time::Duration;
use itertools::Itertools;
use rand::Rng;
use reqwest::{IntoUrl, Url};
use crate::build_header_map;
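/// Holds one reqwest client per configured proxy (or a single direct client when the
/// proxy list is empty) and spreads requests across them at random.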
#[derive(Debug, Clone)]
pub struct ProxyClient {
// proxies: Vec<Url>,
clients: Vec<reqwest::Client>,
}
impl ProxyClient {
pub fn from_proxy_list(proxies: &str) -> anyhow::Result<Self> {
let proxies = Self::parse_proxy_list(proxies)?;
let clients = if proxies.is_empty() {
tracing::warn!("No proxies available; using no proxy");
vec![Self::client_builder().build()?]
} else {
proxies
.clone()
.into_iter()
.map(Self::build_client_with_proxy)
.try_collect()?
};
Ok(Self { clients })
}
fn parse_proxy_list(proxies: &str) -> anyhow::Result<Vec<Url>> {
Ok(proxies
.split("\n")
.filter(|s| !s.trim().is_empty())
.map(Url::parse)
.try_collect()?)
}
fn client_builder() -> reqwest::ClientBuilder {
reqwest::ClientBuilder::default()
.timeout(Duration::from_secs(300))
.connect_timeout(Duration::from_secs(150))
.default_headers(build_header_map())
}
fn build_client_with_proxy(proxy: Url) -> reqwest::Result<reqwest::Client> {
Self::client_builder()
.proxy(reqwest::Proxy::all(proxy)?)
.build()
}
pub async fn do_request(
&self,
url: impl IntoUrl + Clone,
) -> reqwest::Result<reqwest::Response> {
let client = self.clients[rand::thread_rng().gen_range(0..self.clients.len())].clone();
let req = client.get(url.clone()).build()?;
client.execute(req).await
}
}
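A usage sketch for the type above, using only the methods it exposes; the proxy URLs and the target URL are placeholders:

async fn example() -> anyhow::Result<()> {
    // Newline-separated proxy URLs, as parse_proxy_list expects.
    let proxies = "socks5://127.0.0.1:9050\nhttp://127.0.0.1:8080";
    let proxy_client = ProxyClient::from_proxy_list(proxies)?;
    // Each call picks one of the underlying clients at random.
    let res = proxy_client
        .do_request("https://www.cotodigital3.com.ar/")
        .await?
        .error_for_status()?;
    println!("{} bytes", res.text().await?.len());
    Ok(())
}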

View file

@@ -1,175 +0,0 @@
use std::env;
use futures::{future, stream, StreamExt};
use reqwest::{StatusCode, Url};
use simple_error::bail;
use tokio::fs;
use crate::{
anyhow_retry_if_wasnt_not_found, build_client, db::Db, get_fetch_retry_policy,
get_parse_retry_policy, proxy_client::ProxyClient, sites, supermercado::Supermercado, Counters,
PrecioPoint,
};
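/// Bundles the HTTP clients used for scraping: a default client for most sites and a
/// proxy-backed pool that request_and_body routes Coto requests through.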
#[derive(Debug, Clone)]
pub struct Scraper {
default_client: reqwest::Client,
proxy_client: ProxyClient,
}
impl Scraper {
pub async fn from_env() -> anyhow::Result<Self> {
let proxy_list = match env::var("PROXY_LIST") {
Ok(list) => list,
Err(_) => match env::var("PROXY_LIST_PATH") {
Ok(path) => fs::read_to_string(path).await?,
Err(_) => "".to_owned(),
},
};
Self::build(&proxy_list)
}
pub fn build(proxy_list: &str) -> anyhow::Result<Self> {
Ok(Self {
default_client: build_client(),
proxy_client: ProxyClient::from_proxy_list(proxy_list)?,
})
}
pub async fn get_urls_for_supermercado(
&self,
supermercado: &Supermercado,
) -> anyhow::Result<Vec<String>> {
match supermercado {
Supermercado::Dia => sites::dia::get_urls().await,
Supermercado::Jumbo => sites::jumbo::get_urls().await,
Supermercado::Carrefour => sites::carrefour::get_urls().await,
Supermercado::Coto => sites::coto::get_urls(&self.proxy_client).await,
Supermercado::Farmacity => sites::farmacity::get_urls().await,
}
}
#[tracing::instrument(skip(self))]
pub async fn fetch_and_scrap(&self, url: String) -> Result<PrecioPoint, anyhow::Error> {
async fn fetch_and_scrap(
scraper: &Scraper,
url: String,
) -> Result<PrecioPoint, anyhow::Error> {
let body = scraper.fetch_body(&url).await?;
let maybe_point = { scraper.scrap_url(url, &body).await };
let point = match maybe_point {
Ok(p) => Ok(p),
Err(err) => {
// let now: DateTime<Utc> = Utc::now();
// let debug_path = PathBuf::from(format!("debug-{}/", now.format("%Y-%m-%d")));
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
// let file_path = debug_path.join(format!("{}.html", nanoid!()));
// tokio::fs::write(&file_path, &body).await.unwrap();
// tracing::debug!(error=%err, "Failed to parse, saved body at {}",file_path.display());
tracing::debug!(error=%err, "Failed to parse");
Err(err)
}
}?;
Ok(point)
}
get_parse_retry_policy()
.retry_if(
|| fetch_and_scrap(self, url.clone()),
anyhow_retry_if_wasnt_not_found,
)
.await
}
async fn fetch_body(&self, url_string: &str) -> anyhow::Result<String> {
let url = Url::parse(url_string)?;
get_fetch_retry_policy()
.retry_if(
|| self.request_and_body(url.clone()),
anyhow_retry_if_wasnt_not_found,
)
.await
}
async fn request_and_body(&self, url: Url) -> anyhow::Result<String> {
let res = match Supermercado::from_url(&url) {
Some(Supermercado::Coto) => self.proxy_client.do_request(url).await?,
_ => self
.default_client
.execute(self.default_client.get(url).build()?)
.await?
.error_for_status()?,
};
Ok(res.text().await?)
}
pub async fn fetch_and_save(&self, url: String, db: Db) -> Counters {
let res = self.fetch_and_scrap(url.clone()).await;
let mut counters = Counters::default();
match res {
Ok(res) => {
counters.success += 1;
db.insert_precio(res).await.unwrap();
}
Err(err) => {
match err.downcast_ref::<reqwest::Error>() {
Some(e) => match e.status() {
Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
_ => counters.errored += 1,
},
_ => counters.errored += 1,
}
tracing::error!(error=%err, url=url);
}
}
counters
}
pub async fn fetch_list(&self, db: &Db, links: Vec<String>) -> Counters {
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(24), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número");
stream::iter(links)
.map(|url| {
let db = db.clone();
let scraper = self.clone();
tokio::spawn(async move { scraper.fetch_and_save(url, db).await })
})
.buffer_unordered(n_coroutines)
.fold(Counters::default(), move |x, y| {
let ret = y.unwrap();
future::ready(Counters {
success: x.success + ret.success,
errored: x.errored + ret.errored,
skipped: x.skipped + ret.skipped,
})
})
.await
}
pub async fn scrap_url(&self, url: String, res_body: &str) -> anyhow::Result<PrecioPoint> {
let url_p = Url::parse(&url).unwrap();
match Supermercado::from_url(&url_p) {
Some(Supermercado::Carrefour) => {
sites::carrefour::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
}
Some(Supermercado::Dia) => {
sites::dia::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
}
Some(Supermercado::Coto) => {
sites::coto::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
}
Some(Supermercado::Jumbo) => {
sites::jumbo::scrap(&self.default_client, url, res_body).await
}
Some(Supermercado::Farmacity) => {
sites::farmacity::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
}
None => bail!("Unknown URL host {}", url),
}
}
}
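The fetch paths above all funnel through the again crate's retry policies (get_fetch_retry_policy / get_parse_retry_policy plus a retry predicate). A minimal sketch of how such a policy can be built and applied; the delays, retry cap, and builder methods shown are illustrative assumptions, not this project's exact configuration:

use again::RetryPolicy;
use std::time::Duration;

async fn fetch_text_with_retries(client: &reqwest::Client, url: &str) -> reqwest::Result<String> {
    // Exponential backoff starting at 300 ms, with an upper bound on delay and attempts.
    let policy = RetryPolicy::exponential(Duration::from_millis(300))
        .with_max_delay(Duration::from_secs(10))
        .with_max_retries(5);
    policy
        .retry_if(
            || async { client.get(url).send().await?.error_for_status()?.text().await },
            // A 404 is treated as final; anything else is worth retrying.
            |err: &reqwest::Error| err.status() != Some(reqwest::StatusCode::NOT_FOUND),
        )
        .await
}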

View file

@@ -1,10 +1,10 @@
 use anyhow::{anyhow, Context};
-use futures::{stream, StreamExt, TryStreamExt};
+use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use reqwest::Url;

 use crate::{
-    anyhow_retry_if_wasnt_not_found, get_fetch_retry_policy, proxy_client::ProxyClient, PrecioPoint,
+    build_client, build_coto_client, do_request, get_fetch_retry_policy, retry_if_wasnt_not_found, PrecioPoint
 };

 pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> {
@@ -78,10 +78,11 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
     })
 }

-pub async fn get_urls(proxy_client: &ProxyClient) -> anyhow::Result<Vec<String>> {
+pub async fn get_urls() -> anyhow::Result<Vec<String>> {
+    let client = build_coto_client();
     let initial = Url::parse("https://www.cotodigital3.com.ar/sitios/cdigi/browse?Nf=product.endDate%7CGTEQ+1.7032032E12%7C%7Cproduct.startDate%7CLTEQ+1.7032032E12&Nr=AND%28product.sDisp_200%3A1004%2Cproduct.language%3Aespa%C3%B1ol%2COR%28product.siteId%3ACotoDigital%29%29")?;

-    let page_size = 100;
+    let page_size = 50;
     let handles: Vec<Vec<String>> = stream::iter(0..29000 / page_size)
         .map(|i| {
             let mut u = initial.clone();
@@ -89,21 +90,12 @@ pub async fn get_urls(proxy_client: &ProxyClient) -> anyhow::Result<Vec<String>>
                 .append_pair("No", &(i * page_size).to_string())
                 .append_pair("Nrpp", &(page_size).to_string())
                 .finish();
+            let client = &client;
             async move {
-                let text: String = get_fetch_retry_policy()
+                let text = get_fetch_retry_policy()
                     .retry_if(
-                        || {
-                            async fn asdf(
-                                proxy_client: &ProxyClient,
-                                url: Url,
-                            ) -> anyhow::Result<String> {
-                                let res = proxy_client.do_request(url).await?.error_for_status()?;
-                                Ok(res.text().await?)
-                            }
-                            let url = u.clone();
-                            asdf(proxy_client, url)
-                        },
-                        anyhow_retry_if_wasnt_not_found,
+                        || do_request(client, u.as_str()).and_then(|r| r.text()),
+                        retry_if_wasnt_not_found,
                     )
                     .await?;
                 let dom = tl::parse(&text, tl::ParserOptions::default())?;
@@ -134,7 +126,6 @@ pub async fn get_urls(proxy_client: &ProxyClient) -> anyhow::Result<Vec<String>>
                     })
                 })
                 .try_collect()?;
-                tracing::debug!("got {} products", list.len());

                 Ok::<Vec<String>, anyhow::Error>(list)
             }
         })
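For reference, the pagination above works purely through the No (offset) and Nrpp (page size) query parameters. A tiny worked example of the arithmetic with the right-hand side's page size of 50; this is illustration only, not project code:

fn main() {
    // With Nrpp = 50 items per page, page i starts at offset No = i * 50,
    // and 0..(29000 / 50) walks 580 pages covering 29 000 listing positions.
    let page_size = 50u32;
    let offsets: Vec<u32> = (0..29000 / page_size).map(|i| i * page_size).collect();
    assert_eq!(offsets.len(), 580);
    assert_eq!(offsets.last(), Some(&28_950));
}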

View file

@@ -81,8 +81,8 @@ pub struct OffersLd {
 pub struct OfferLd {
     #[serde(rename = "@type")]
     _type: OfferTypeLd,
-    // pub price: f64,
-    // pub price_currency: String,
+    pub price: f64,
+    pub price_currency: String,
     pub availability: AvailabilityLd,
 }

 #[derive(Deserialize)]
@@ -207,14 +207,14 @@ pub async fn get_best_selling_by_category(
             .append_pair("extensions", &{
                 let variables_obj = json!({"hideUnavailableItems":true,"skusFilter":"FIRST_AVAILABLE","simulationBehavior":"default","installmentCriteria":"MAX_WITHOUT_INTEREST","productOriginVtex":false,"map":"c","query":query,"orderBy":"OrderByTopSaleDESC","from":0,"to":99,"selectedFacets":
                     query.split('/').map(|f| json!({"key":"c","value":f})).collect::<Vec<_>>()
-                ,"facetsBehavior":"Static","categoryTreeBehavior":"default","withFacets":false,"showSponsored":false});
+                ,"facetsBehavior":"Static","categoryTreeBehavior":"default","withFacets":false});
                 let b64=base64::prelude::BASE64_STANDARD.encode(variables_obj.to_string());

                 format!(
                     r#"{{
     "persistedQuery": {{
         "version": 1,
-        "sha256Hash": "fd92698fe375e8e4fa55d26fa62951d979b790fcf1032a6f02926081d199f550",
+        "sha256Hash": "40b843ca1f7934d20d05d334916220a0c2cae3833d9f17bcb79cdd2185adceac",
         "sender": "vtex.store-resources@0.x",
         "provider": "vtex.search-graphql@0.x"
     }},
@@ -225,30 +225,19 @@ pub async fn get_best_selling_by_category(
         url
     };
     let body = fetch_body(client, url.as_str()).await?;
-    tracing::debug!("best selling body: {}", body);
-    let json = &serde_json::from_str::<serde_json::Value>(&body)?;
-    if let Some(errors_array) = json.pointer("/errors") {
-        if let Some(error_messages) = errors_array.as_array().map(|a| {
-            a.into_iter()
-                .map(|obj| obj.get("message").and_then(|v| v.as_str()))
-                .collect_vec()
-        }) {
-            bail!("Errors from API: {:?}", error_messages);
-        } else {
-            bail!("Unknown error from API")
-        }
-    }
-    let urls: Vec<String> = json
+    let urls: Vec<String> = serde_json::from_str::<serde_json::Value>(&body)?
         .pointer("/data/productSearch/products")
         .and_then(|v| v.as_array())
-        .ok_or(SimpleError::new("failed to get best selling product urls"))?
-        .iter()
-        .filter_map(|p| {
-            p.get("link")
-                .and_then(|v| v.as_str())
-                .map(|s| format!("https://{}{}", domain, s))
-        })
-        .collect();
+        .map(|a| {
+            a.iter()
+                .filter_map(|p| {
+                    p.get("link")
+                        .and_then(|v| v.as_str())
+                        .map(|s| format!("https://{}{}", domain, s))
+                })
+                .collect()
+        })
+        .ok_or(SimpleError::new("failed to get best selling product urls"))?;
     if urls.len() < 2 {
         bail!("Too few best selling");
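The extensions parameter assembled above follows VTEX's persisted-query convention: the GraphQL variables are serialized to JSON, base64-encoded, and embedded alongside a sha256 hash that identifies the stored query. A simplified sketch of that assembly; the hash placeholder and the trimmed-down variables are illustrative, not the values the scraper sends:

use base64::prelude::*;
use serde_json::json;

fn build_extensions(query: &str) -> String {
    // Variables are serialized, base64-encoded, and then embedded in the extensions JSON.
    let variables = json!({ "query": query, "orderBy": "OrderByTopSaleDESC", "from": 0, "to": 99 });
    let encoded = BASE64_STANDARD.encode(variables.to_string());
    format!(
        r#"{{"persistedQuery":{{"version":1,"sha256Hash":"<hash>"}},"variables":"{}"}}"#,
        encoded
    )
}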

View file

@@ -1,5 +1,4 @@
 use clap::ValueEnum;
-use reqwest::Url;

 #[derive(ValueEnum, Clone, Debug, Copy)]
 pub enum Supermercado {
@@ -19,14 +18,4 @@ impl Supermercado {
             Self::Farmacity => "www.farmacity.com",
         }
     }
-
-    pub fn from_url(url: &Url) -> Option<Self> {
-        match url.host_str().unwrap() {
-            "www.carrefour.com.ar" => Some(Self::Carrefour),
-            "diaonline.supermercadosdia.com.ar" => Some(Self::Dia),
-            "www.cotodigital3.com.ar" => Some(Self::Coto),
-            "www.jumbo.com.ar" => Some(Self::Jumbo),
-            "www.farmacity.com" => Some(Self::Farmacity),
-            _ => None,
-        }
-    }
 }