Compare commits

...

3 commits

Author SHA1 Message Date
fba3072d5f sqlx 2024-07-07 13:05:34 -03:00
abf501bd5e solo usar últimos 60 días de urls 2024-07-07 13:03:07 -03:00
1230f34aca split read/write pool
https://github.com/launchbadge/sqlx/issues/451
2024-07-07 12:01:08 -03:00
3 changed files with 57 additions and 34 deletions

View file

@ -1,6 +1,6 @@
{ {
"db_name": "SQLite", "db_name": "SQLite",
"query": "SELECT url FROM producto_urls WHERE url LIKE ?1;", "query": "SELECT url FROM producto_urls WHERE url LIKE ?1 AND last_seen > ?2;",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@ -10,11 +10,11 @@
} }
], ],
"parameters": { "parameters": {
"Right": 1 "Right": 2
}, },
"nullable": [ "nullable": [
false false
] ]
}, },
"hash": "aa5c2a04aec149d88f6e25a9bd7df4e257f3c9b0efa62c8342d077d69d826a69" "hash": "8dac7e96d5dab0d6f48c5fa7d4844e9d4f9b3bf38e55cb56e3941bb8fbf0a9ff"
} }

View file

@ -41,7 +41,10 @@ impl Auto {
} }
} }
let links: Vec<String> = { let links: Vec<String> = {
let mut links = self.db.get_urls_by_domain(supermercado.host()).await?; let mut links = self
.db
.get_recent_urls_by_domain(supermercado.host())
.await?;
if let Some(n) = self.args.n_products { if let Some(n) = self.args.n_products {
links.truncate(n); links.truncate(n);
} }

View file

@ -11,29 +11,20 @@ use crate::{best_selling::BestSellingRecord, PrecioPoint};
#[derive(Clone)] #[derive(Clone)]
pub struct Db { pub struct Db {
pool: SqlitePool, read_pool: SqlitePool,
write_pool: SqlitePool,
} }
impl Db { impl Db {
pub async fn connect() -> anyhow::Result<Self> { pub async fn connect() -> anyhow::Result<Self> {
let db_path = env::var("DB_PATH").unwrap_or("../sqlite.db".to_string()); let db_path = env::var("DB_PATH").unwrap_or("../sqlite.db".to_string());
info!("Opening DB at {}", db_path); info!("Opening DB at {}", db_path);
let pool = sqlx::pool::PoolOptions::new() let read_pool = connect_to_db(&db_path, 32).await?;
.max_connections(1) let write_pool = connect_to_db(&db_path, 1).await?;
.connect_with( Ok(Self {
SqliteConnectOptions::from_str(&format!("sqlite://{}", db_path))? read_pool,
// https://fractaledmind.github.io/2023/09/07/enhancing-rails-sqlite-fine-tuning/ write_pool,
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) })
.pragma("journal_size_limit", "67108864")
.pragma("mmap_size", "134217728")
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(15))
.pragma("cache_size", "2000")
.pragma("temp_store", "memory")
.optimize_on_close(true, None),
)
.await?;
Ok(Self { pool })
} }
pub async fn insert_precio(&self, point: PrecioPoint) -> anyhow::Result<()> { pub async fn insert_precio(&self, point: PrecioPoint) -> anyhow::Result<()> {
@ -47,32 +38,37 @@ impl Db {
point.parser_version, point.parser_version,
point.name, point.name,
point.image_url, point.image_url,
).execute(&self.pool).await?; ).execute(&self.write_pool).await?;
Ok(()) Ok(())
} }
pub async fn get_ean_by_url(&self, url: &str) -> anyhow::Result<Option<String>> { pub async fn get_ean_by_url(&self, url: &str) -> anyhow::Result<Option<String>> {
Ok(sqlx::query!("SELECT ean FROM precios WHERE url = ?1;", url) Ok(sqlx::query!("SELECT ean FROM precios WHERE url = ?1;", url)
.fetch_optional(&self.pool) .fetch_optional(&self.read_pool)
.await? .await?
.map(|r| r.ean)) .map(|r| r.ean))
} }
pub async fn get_urls_by_domain(&self, domain: &str) -> anyhow::Result<Vec<String>> { pub async fn get_recent_urls_by_domain(&self, domain: &str) -> anyhow::Result<Vec<String>> {
let query = format!("%{}%", domain); let query = format!("%{}%", domain);
Ok( let last_60_days: i64 = (now() - Duration::from_secs(60 * 60 * 24 * 60))
sqlx::query!("SELECT url FROM producto_urls WHERE url LIKE ?1;", query) .as_millis()
.fetch_all(&self.pool) .try_into()?;
.await? Ok(sqlx::query!(
.into_iter() "SELECT url FROM producto_urls WHERE url LIKE ?1 AND last_seen > ?2;",
.map(|r| r.url) query,
.collect(), last_60_days
) )
.fetch_all(&self.read_pool)
.await?
.into_iter()
.map(|r| r.url)
.collect())
} }
pub async fn save_producto_urls(&self, urls: Vec<String>) -> anyhow::Result<()> { pub async fn save_producto_urls(&self, urls: Vec<String>) -> anyhow::Result<()> {
let now: i64 = now_ms().try_into()?; let now: i64 = now_ms().try_into()?;
let mut tx = self.pool.begin().await?; let mut tx = self.write_pool.begin().await?;
for url in urls { for url in urls {
sqlx::query!( sqlx::query!(
r#"INSERT INTO producto_urls(url, first_seen, last_seen) r#"INSERT INTO producto_urls(url, first_seen, last_seen)
@ -89,7 +85,7 @@ impl Db {
} }
pub async fn save_best_selling(&self, records: Vec<BestSellingRecord>) -> anyhow::Result<()> { pub async fn save_best_selling(&self, records: Vec<BestSellingRecord>) -> anyhow::Result<()> {
let mut tx = self.pool.begin().await?; let mut tx = self.write_pool.begin().await?;
for record in records { for record in records {
let fetched_at = record.fetched_at.timestamp_millis(); let fetched_at = record.fetched_at.timestamp_millis();
let category = record.category.id(); let category = record.category.id();
@ -109,9 +105,33 @@ impl Db {
} }
} }
/// Opens a SQLite connection pool for the database file at `db_path`.
///
/// `max_connections` is the upper bound on the number of pooled connections.
/// Every connection is configured identically: WAL journal mode,
/// `synchronous = NORMAL`, a 15-second busy timeout, the pragmas set below,
/// and `optimize_on_close` enabled.
///
/// # Errors
///
/// Returns an error if the `sqlite://` URL built from `db_path` is rejected by
/// `SqliteConnectOptions::from_str`, or if `connect_with` fails.
async fn connect_to_db(
    db_path: &str,
    max_connections: u32,
) -> Result<sqlx::Pool<sqlx::Sqlite>, anyhow::Error> {
    let url = format!("sqlite://{}", db_path);
    // Tuning values follow
    // https://fractaledmind.github.io/2023/09/07/enhancing-rails-sqlite-fine-tuning/
    let options = SqliteConnectOptions::from_str(&url)?
        .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
        .pragma("journal_size_limit", "67108864")
        .pragma("mmap_size", "134217728")
        .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
        .busy_timeout(Duration::from_secs(15))
        .pragma("cache_size", "2000")
        .pragma("temp_store", "memory")
        .optimize_on_close(true, None);
    let pool: sqlx::Pool<sqlx::Sqlite> = sqlx::pool::PoolOptions::new()
        .max_connections(max_connections)
        .connect_with(options)
        .await?;
    Ok(pool)
}
fn now_ms() -> u128 { fn now_ms() -> u128 {
now().as_millis()
}
fn now() -> Duration {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.expect("Time went backwards") .expect("Time went backwards")
.as_millis()
} }