probemos así

This commit is contained in:
Cat /dev/Nulo 2024-01-12 16:21:20 -03:00
parent 75b2297a07
commit 6b315f9af2

View file

@ -1,7 +1,6 @@
use again::RetryPolicy;
use async_channel::Receiver;
use clap::{Parser, ValueEnum};
use futures::future;
use futures::{future, stream, StreamExt};
use nanoid::nanoid;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
@ -78,32 +77,30 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
}
async fn fetch_list(pool: &Pool<SqliteConnectionManager>, links: Vec<String>) -> Counters {
let (sender, receiver) = async_channel::bounded::<String>(1);
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(24), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número");
let handles = (1..n_coroutines)
.map(|_| {
let rx = receiver.clone();
let client = build_client();
stream::iter(links)
.map(|url| {
let pool = pool.clone();
tokio::spawn(worker(rx, pool))
let client = client.clone();
tokio::spawn(fetch_and_save(client, url, pool))
})
.collect::<Vec<_>>();
for link in links {
sender.send_blocking(link).unwrap();
}
sender.close();
let mut counters = Counters::default();
for handle in handles {
let c = handle.await.unwrap();
counters.success += c.success;
counters.errored += c.errored;
counters.skipped += c.skipped;
}
counters
// https://github.com/rust-lang/rust/issues/89976#issuecomment-1073115246
.boxed()
.buffer_unordered(n_coroutines)
.fold(Counters::default(), move |x, y| {
let ret = y.unwrap();
future::ready(Counters {
success: x.success + ret.success,
errored: x.errored + ret.errored,
skipped: x.skipped + ret.skipped,
})
})
.await
}
fn connect_db() -> Pool<SqliteConnectionManager> {
@ -124,42 +121,40 @@ struct Counters {
skipped: u64,
}
async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
let client = build_client();
async fn fetch_and_save(
client: reqwest::Client,
url: String,
pool: Pool<SqliteConnectionManager>,
) -> Counters {
let res = fetch_and_parse(&client, url.clone()).await;
let mut counters = Counters::default();
while let Ok(url) = rx.recv().await {
let client = &client;
let res = fetch_and_parse(client, url.clone()).await;
match res {
Ok(res) => {
counters.success += 1;
pool.get().unwrap().execute("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",rusqlite::params![
res.ean,
res.fetched_at,
res.precio_centavos,
res.in_stock,
res.url,
None::<String>,
res.parser_version,
res.name,
res.image_url,
]).unwrap();
}
Err(err) => {
match err.downcast_ref::<FetchError>() {
Some(FetchError::Http(e)) => match e.status() {
Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
_ => counters.errored += 1,
},
match res {
Ok(res) => {
counters.success += 1;
pool.get().unwrap().execute("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",rusqlite::params![
res.ean,
res.fetched_at,
res.precio_centavos,
res.in_stock,
res.url,
None::<String>,
res.parser_version,
res.name,
res.image_url,
]).unwrap();
}
Err(err) => {
match err.downcast_ref::<FetchError>() {
Some(FetchError::Http(e)) => match e.status() {
Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
_ => counters.errored += 1,
}
tracing::error!(error=%err, url=url);
},
_ => counters.errored += 1,
}
tracing::error!(error=%err, url=url);
}
}
counters
}
@ -370,8 +365,8 @@ async fn cron_cli() -> anyhow::Result<()> {
let mut interval = time::interval(std::time::Duration::from_secs(60 * 60 * 24));
loop {
interval.tick().await;
auto_cli().await.unwrap();
interval.tick().await;
}
}