This commit is contained in:
Cat /dev/Nulo 2024-01-12 13:51:32 -03:00
parent 0144a56158
commit 75b2297a07
5 changed files with 21 additions and 14 deletions

View file

@ -1,7 +1,7 @@
use again::RetryPolicy; use again::RetryPolicy;
use async_channel::Receiver; use async_channel::Receiver;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use futures::{stream, StreamExt, TryStreamExt}; use futures::future;
use nanoid::nanoid; use nanoid::nanoid;
use r2d2::Pool; use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager; use r2d2_sqlite::SqliteConnectionManager;
@ -129,7 +129,8 @@ async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Co
let mut counters = Counters::default(); let mut counters = Counters::default();
while let Ok(url) = rx.recv().await { while let Ok(url) = rx.recv().await {
let res = fetch_and_parse(&client, url.clone()).await; let client = &client;
let res = fetch_and_parse(client, url.clone()).await;
match res { match res {
Ok(res) => { Ok(res) => {
counters.success += 1; counters.success += 1;
@ -274,13 +275,14 @@ async fn scrap_url(
} }
} }
#[derive(Clone)]
struct Auto { struct Auto {
pool: Pool<SqliteConnectionManager>, pool: Pool<SqliteConnectionManager>,
telegram_token: String, telegram_token: String,
telegram_chat_id: String, telegram_chat_id: String,
} }
impl Auto { impl Auto {
async fn download_supermercado(self: &Self, supermercado: Supermercado) -> anyhow::Result<()> { async fn download_supermercado(self: Self, supermercado: Supermercado) -> anyhow::Result<()> {
{ {
let t0 = now_sec(); let t0 = now_sec();
self.get_and_save_urls(&supermercado).await?; self.get_and_save_urls(&supermercado).await?;
@ -357,11 +359,11 @@ async fn auto_cli() -> anyhow::Result<()> {
telegram_chat_id: env::var("TELEGRAM_BOT_CHAT_ID")?, telegram_chat_id: env::var("TELEGRAM_BOT_CHAT_ID")?,
}; };
auto.inform("[auto] Empezando scrap").await; auto.inform("[auto] Empezando scrap").await;
stream::iter(Supermercado::value_variants().iter()) let handles: Vec<_> = Supermercado::value_variants()
.map(|s| auto.download_supermercado(s.to_owned())) .iter()
.buffer_unordered(64) .map(|s| tokio::spawn(auto.clone().download_supermercado(s.to_owned())))
.try_collect() .collect();
.await?; future::try_join_all(handles).await?;
Ok(()) Ok(())
} }
async fn cron_cli() -> anyhow::Result<()> { async fn cron_cli() -> anyhow::Result<()> {

View file

@ -80,5 +80,5 @@ pub async fn get_urls() -> anyhow::Result<Vec<String>> {
"https://www.carrefour.com.ar/sitemap/product-8.xml", "https://www.carrefour.com.ar/sitemap/product-8.xml",
"https://www.carrefour.com.ar/sitemap/product-9.xml", "https://www.carrefour.com.ar/sitemap/product-9.xml",
]; ];
vtex::get_urls_from_sitemap(&urls).await vtex::get_urls_from_sitemap(urls).await
} }

View file

@ -49,5 +49,5 @@ pub async fn get_urls() -> anyhow::Result<Vec<String>> {
"https://diaonline.supermercadosdia.com.ar/sitemap/product-4.xml", "https://diaonline.supermercadosdia.com.ar/sitemap/product-4.xml",
"https://diaonline.supermercadosdia.com.ar/sitemap/product-5.xml", "https://diaonline.supermercadosdia.com.ar/sitemap/product-5.xml",
]; ];
vtex::get_urls_from_sitemap(&urls).await vtex::get_urls_from_sitemap(urls).await
} }

View file

@ -110,5 +110,5 @@ pub async fn get_urls() -> anyhow::Result<Vec<String>> {
"https://www.jumbo.com.ar/sitemap/product-8.xml", "https://www.jumbo.com.ar/sitemap/product-8.xml",
"https://www.jumbo.com.ar/sitemap/product-9.xml", "https://www.jumbo.com.ar/sitemap/product-9.xml",
]; ];
vtex::get_urls_from_sitemap(&urls).await vtex::get_urls_from_sitemap(urls).await
} }

View file

@ -117,21 +117,26 @@ pub fn parse_urls_from_sitemap(sitemap: &str) -> anyhow::Result<Vec<String>> {
.collect()) .collect())
} }
pub async fn get_urls_from_sitemap<'a>(sitemaps: &[&str]) -> anyhow::Result<Vec<String>> { pub async fn get_urls_from_sitemap(sitemaps: Vec<&str>) -> anyhow::Result<Vec<String>> {
let mut total: Vec<String> = vec![]; let mut total: Vec<String> = vec![];
let client = build_client(); let client = build_client();
let handles = stream::iter(sitemaps) let handles = stream::iter(sitemaps)
.map(|url| { .map(|url| {
let client = &client; let client = client.clone();
let url = url.to_string();
async move { async move {
let client = client;
let url = url;
let text = get_retry_policy() let text = get_retry_policy()
.retry(|| do_request(client, url)) .retry(|| do_request(&client, &url))
.await? .await?
.text() .text()
.await?; .await?;
parse_urls_from_sitemap(&text) parse_urls_from_sitemap(&text)
} }
}) })
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
.boxed()
.buffer_unordered(8) .buffer_unordered(8)
.try_collect::<Vec<_>>() .try_collect::<Vec<_>>()
.await?; .await?;