borrar r2d2, usar deadpool

debería funcionar bien con Tokio
This commit is contained in:
Cat /dev/Nulo 2024-01-12 17:00:01 -03:00
parent dbb149aef3
commit 028dc30606
3 changed files with 112 additions and 90 deletions

97
scraper-rs/Cargo.lock generated
View file

@ -142,6 +142,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "async-trait"
version = "0.1.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -318,6 +329,47 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "deadpool"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
dependencies = [
"tokio",
]
[[package]]
name = "deadpool-sqlite"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8010e36e12f3be22543a5e478b4af20aeead9a700dd69581a5e050a070fc22c"
dependencies = [
"deadpool",
"deadpool-sync",
"rusqlite",
]
[[package]]
name = "deadpool-sync"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8db70494c13cae4ce67b4b4dafdaf828cf0df7237ab5b9e2fcabee4965d0a0a"
dependencies = [
"deadpool-runtime",
]
[[package]] [[package]]
name = "either" name = "either"
version = "1.9.0" version = "1.9.0"
@ -917,28 +969,6 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_sqlite"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dc290b669d30e20751e813517bbe13662d020419c5c8818ff10b6e8bb7777f6"
dependencies = [
"r2d2",
"rusqlite",
"uuid",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.7.3" version = "0.7.3"
@ -1142,15 +1172,6 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
@ -1165,11 +1186,11 @@ dependencies = [
"anyhow", "anyhow",
"async-channel", "async-channel",
"clap", "clap",
"deadpool",
"deadpool-sqlite",
"futures", "futures",
"itertools", "itertools",
"nanoid", "nanoid",
"r2d2",
"r2d2_sqlite",
"rand 0.8.5", "rand 0.8.5",
"reqwest", "reqwest",
"rusqlite", "rusqlite",
@ -1558,16 +1579,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"getrandom 0.2.11",
"rand 0.8.5",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.0" version = "0.1.0"

View file

@ -10,11 +10,11 @@ again = "0.1.2"
anyhow = "1.0.79" anyhow = "1.0.79"
async-channel = "2.1.1" async-channel = "2.1.1"
clap = { version = "4.4.15", features = ["derive"] } clap = { version = "4.4.15", features = ["derive"] }
deadpool = "0.10.0"
deadpool-sqlite = "0.7.0"
futures = "0.3.30" futures = "0.3.30"
itertools = "0.12.0" itertools = "0.12.0"
nanoid = "0.4.0" nanoid = "0.4.0"
r2d2 = "0.8.10"
r2d2_sqlite = "0.23.0"
rand = "0.8.5" rand = "0.8.5"
# lol_html = "1.2.0" # lol_html = "1.2.0"
reqwest = { version = "0.11.23", default-features = false, features = [ reqwest = { version = "0.11.23", default-features = false, features = [

View file

@ -1,9 +1,8 @@
use again::RetryPolicy; use again::RetryPolicy;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use deadpool_sqlite::Pool;
use futures::{future, stream, StreamExt}; use futures::{future, stream, StreamExt};
use nanoid::nanoid; use nanoid::nanoid;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use reqwest::{StatusCode, Url}; use reqwest::{StatusCode, Url};
use simple_error::{bail, SimpleError}; use simple_error::{bail, SimpleError};
use std::{ use std::{
@ -76,7 +75,7 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn fetch_list(pool: &Pool<SqliteConnectionManager>, links: Vec<String>) -> Counters { async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters {
let n_coroutines = env::var("N_COROUTINES") let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(24), |s| s.parse::<usize>()) .map_or(Ok(24), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número"); .expect("N_COROUTINES no es un número");
@ -103,10 +102,10 @@ async fn fetch_list(pool: &Pool<SqliteConnectionManager>, links: Vec<String>) ->
.await .await
} }
fn connect_db() -> Pool<SqliteConnectionManager> { fn connect_db() -> Pool {
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string()); let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
let manager = SqliteConnectionManager::file(db_path); let cfg = deadpool_sqlite::Config::new(db_path);
let pool = Pool::new(manager).unwrap(); let pool = cfg.create_pool(deadpool_sqlite::Runtime::Tokio1).unwrap();
pool pool
} }
@ -121,27 +120,26 @@ struct Counters {
skipped: u64, skipped: u64,
} }
async fn fetch_and_save( async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Counters {
client: reqwest::Client,
url: String,
pool: Pool<SqliteConnectionManager>,
) -> Counters {
let res = fetch_and_parse(&client, url.clone()).await; let res = fetch_and_parse(&client, url.clone()).await;
let mut counters = Counters::default(); let mut counters = Counters::default();
match res { match res {
Ok(res) => { Ok(res) => {
counters.success += 1; counters.success += 1;
pool.get().unwrap().execute("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",rusqlite::params![ pool.get().await.unwrap().interact(move |conn| conn.execute(
res.ean, "INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
res.fetched_at, rusqlite::params![
res.precio_centavos, res.ean,
res.in_stock, res.fetched_at,
res.url, res.precio_centavos,
None::<String>, res.in_stock,
res.parser_version, res.url,
res.name, None::<String>,
res.image_url, res.parser_version,
]).unwrap(); res.name,
res.image_url,
]
)).await.unwrap().unwrap();
} }
Err(err) => { Err(err) => {
match err.downcast_ref::<FetchError>() { match err.downcast_ref::<FetchError>() {
@ -272,7 +270,7 @@ async fn scrap_url(
#[derive(Clone)] #[derive(Clone)]
struct Auto { struct Auto {
pool: Pool<SqliteConnectionManager>, pool: Pool,
telegram_token: String, telegram_token: String,
telegram_chat_id: String, telegram_chat_id: String,
} }
@ -288,13 +286,20 @@ impl Auto {
)) ))
.await; .await;
} }
let links: Vec<String> = self let links: Vec<String> = {
.pool self.pool
.get()? .get()
.prepare(r#"SELECT url FROM producto_urls;"#)? .await?
.query_map([], |r| r.get::<_, String>(0))? .interact(|conn| -> anyhow::Result<Vec<String>> {
.map(|r| r.unwrap()) Ok(conn
.collect(); .prepare(r#"SELECT url FROM producto_urls;"#)?
.query_map([], |r| r.get::<_, String>(0))?
.map(|r| r.unwrap())
.collect())
})
.await
.unwrap()?
};
{ {
let t0 = now_sec(); let t0 = now_sec();
let counters = fetch_list(&self.pool, links).await; let counters = fetch_list(&self.pool, links).await;
@ -311,21 +316,27 @@ impl Auto {
async fn get_and_save_urls(self: &Self, supermercado: &Supermercado) -> anyhow::Result<()> { async fn get_and_save_urls(self: &Self, supermercado: &Supermercado) -> anyhow::Result<()> {
let urls = get_urls(supermercado).await?; let urls = get_urls(supermercado).await?;
let connection = &mut self.pool.get()?; self.pool
let tx = connection.transaction()?; .get()
{ .await?
let mut stmt = tx.prepare( .interact(|conn| -> Result<(), anyhow::Error> {
r#"INSERT INTO producto_urls(url, first_seen, last_seen) let tx = conn.transaction()?;
VALUES (?1, ?2, ?2) {
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#, let mut stmt = tx.prepare(
)?; r#"INSERT INTO producto_urls(url, first_seen, last_seen)
let now: u64 = now_ms().try_into()?; VALUES (?1, ?2, ?2)
for url in urls { ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
stmt.execute(rusqlite::params![url, now])?; )?;
} let now: u64 = now_ms().try_into()?;
} for url in urls {
tx.commit()?; stmt.execute(rusqlite::params![url, now])?;
}
}
tx.commit()?;
Ok(())
})
.await
.unwrap()?;
Ok(()) Ok(())
} }