This commit is contained in:
Cat /dev/Nulo 2024-01-30 10:52:31 -03:00 committed by Nulo
parent 80c783eb84
commit 244dddf6d4
13 changed files with 948 additions and 183 deletions

1
scraper-rs/.env Normal file
View file

@ -0,0 +1 @@
DATABASE_URL=sqlite:../sqlite.db

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO producto_urls(url, first_seen, last_seen)\n VALUES (?1, ?2, ?2)\n ON CONFLICT(url) DO UPDATE SET last_seen=?2;",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "08d55fc80c8a6ad73d311e8b1cd535425e5c2f39cf98735b5f67cb91d01937ce"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO db_best_selling(fetched_at, category, eans_json)\n VALUES (?1, ?2, ?3);",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "144f4622ac9a937aa4885ceb5a67f6c0a78e7e025cf152a8c176b9fd1de241da"
}

View file

@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT url FROM producto_urls WHERE url LIKE ?1;",
"describe": {
"columns": [
{
"name": "url",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
},
"hash": "aa5c2a04aec149d88f6e25a9bd7df4e257f3c9b0efa62c8342d077d69d826a69"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
"describe": {
"columns": [],
"parameters": {
"Right": 9
},
"nullable": []
},
"hash": "d0c3a557a81f6685b242ed0be8e8c67b47ec8a575d2a14a487b3294e0faec438"
}

View file

@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT ean FROM precios WHERE url = ?1;",
"describe": {
"columns": [
{
"name": "ean",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
},
"hash": "f249765f2fb013a81a4157a6ce19744a8d5f70c83ed9ddddfd55009136088a52"
}

755
scraper-rs/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -12,8 +12,7 @@ base64 = "0.21.7"
chrono = "0.4.32" chrono = "0.4.32"
clap = { version = "4.4.15", features = ["derive"] } clap = { version = "4.4.15", features = ["derive"] }
cron = "0.12.0" cron = "0.12.0"
deadpool = "0.10.0" sqlx = { version = "0.7", features = [ "runtime-tokio", "sqlite" ] }
deadpool-sqlite = "0.7.0"
futures = "0.3.30" futures = "0.3.30"
html-escape = "0.2.13" html-escape = "0.2.13"
itertools = "0.12.0" itertools = "0.12.0"

View file

@ -1,9 +1,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::{build_client, sites::vtex, supermercado::Supermercado}; use crate::{build_client, db::Db, sites::vtex, supermercado::Supermercado};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use clap::ValueEnum; use clap::ValueEnum;
use deadpool_sqlite::Pool;
use futures::{stream, FutureExt, StreamExt, TryStreamExt}; use futures::{stream, FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools; use itertools::Itertools;
use tracing::warn; use tracing::warn;
@ -49,21 +48,11 @@ pub struct BestSellingRecord {
pub eans: Vec<String>, pub eans: Vec<String>,
} }
async fn get_best_selling_eans(pool: &Pool, urls: Vec<String>) -> anyhow::Result<Vec<String>> { async fn get_best_selling_eans(db: &Db, urls: Vec<String>) -> anyhow::Result<Vec<String>> {
let mut eans: Vec<String> = Vec::new(); let mut eans: Vec<String> = Vec::new();
for url in urls { for url in urls {
let q = url.clone(); let ean = db.get_ean_by_url(&url).await?;
let ean = pool
.get()
.await?
.interact(move |conn| {
conn.prepare(r#"SELECT ean FROM precios WHERE url = ?1;"#)?
.query_map(rusqlite::params![q], |r| r.get::<_, String>(0))
.map(|r| r.map(|r| r.unwrap()).next())
})
.await
.unwrap()?;
match ean { match ean {
Some(e) => eans.push(e), Some(e) => eans.push(e),
None => warn!("No encontré EAN para {}", url), None => warn!("No encontré EAN para {}", url),
@ -75,13 +64,13 @@ async fn get_best_selling_eans(pool: &Pool, urls: Vec<String>) -> anyhow::Result
async fn try_get_best_selling_eans( async fn try_get_best_selling_eans(
client: reqwest::Client, client: reqwest::Client,
pool: Pool, db: Db,
supermercado: &Supermercado, supermercado: &Supermercado,
category: &Category, category: &Category,
) -> anyhow::Result<Option<Vec<String>>> { ) -> anyhow::Result<Option<Vec<String>>> {
if let Some(query) = category.query(supermercado) { if let Some(query) = category.query(supermercado) {
let urls = vtex::get_best_selling_by_category(&client, supermercado.host(), query).await?; let urls = vtex::get_best_selling_by_category(&client, supermercado.host(), query).await?;
let eans = get_best_selling_eans(&pool, urls).await?; let eans = get_best_selling_eans(&db, urls).await?;
Ok(Some(eans)) Ok(Some(eans))
} else { } else {
Ok(None) Ok(None)
@ -107,18 +96,18 @@ fn rank_eans(eans: Vec<Vec<String>>) -> Vec<String> {
.collect_vec() .collect_vec()
} }
pub async fn get_all_best_selling(pool: &Pool) -> anyhow::Result<Vec<BestSellingRecord>> { pub async fn get_all_best_selling(db: &Db) -> anyhow::Result<Vec<BestSellingRecord>> {
let client = &build_client(); let client = &build_client();
stream::iter(Category::value_variants()) stream::iter(Category::value_variants())
.map(|category| { .map(|category| {
stream::iter(Supermercado::value_variants()) stream::iter(Supermercado::value_variants())
.map(|supermercado| { .map(|supermercado| {
let pool = pool.clone(); let db = db.clone();
let client = client.clone(); let client = client.clone();
tokio::spawn(try_get_best_selling_eans( tokio::spawn(try_get_best_selling_eans(
client, client,
pool, db,
supermercado, supermercado,
category, category,
)) ))

109
scraper-rs/src/db.rs Normal file
View file

@ -0,0 +1,109 @@
use std::{
env,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};
use sqlx::{sqlite::SqliteConnectOptions, SqlitePool};
use crate::{best_selling::BestSellingRecord, PrecioPoint};
/// Handle to the scraper's SQLite database, wrapping an sqlx connection pool.
///
/// The type is `Clone`; callers clone it before moving it into spawned tasks.
#[derive(Clone)]
pub struct Db {
/// Pool built by `Db::connect`; every query method executes against it.
pool: SqlitePool,
}
impl Db {
    /// Opens the SQLite database and wraps the resulting pool in a `Db`.
    ///
    /// The location comes from the `DB_PATH` environment variable and falls
    /// back to `../sqlite.db` when the variable cannot be read. The pool is
    /// capped at a single connection, and the connection is configured with
    /// WAL journaling, `Normal` synchronous mode and optimize-on-close.
    ///
    /// # Errors
    /// Fails when the path cannot be parsed into connection options or when
    /// the pool cannot connect.
    pub async fn connect() -> anyhow::Result<Self> {
        let db_path = env::var("DB_PATH").unwrap_or_else(|_| "../sqlite.db".to_string());
        let options = SqliteConnectOptions::from_str(&db_path)?
            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
            .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
            .optimize_on_close(true, None);
        let pool = sqlx::pool::PoolOptions::new()
            .max_connections(1)
            .connect_with(options)
            .await?;
        Ok(Self { pool })
    }

    /// Inserts one scraped price point into the `precios` table.
    ///
    /// `warc_record_id` is always stored as NULL.
    ///
    /// # Errors
    /// Propagates any error raised while executing the insert.
    pub async fn insert_precio(&self, point: PrecioPoint) -> anyhow::Result<()> {
        // NOTE(review): the SQL text is kept byte-for-byte; the checked-in sqlx
        // query metadata pairs each query string with a hash — confirm before editing.
        sqlx::query!(
            "INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
            point.ean,
            point.fetched_at,
            point.precio_centavos,
            point.in_stock,
            point.url,
            None::<String>,
            point.parser_version,
            point.name,
            point.image_url,
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Looks up the EAN stored in `precios` for the given product URL.
    ///
    /// Returns `Ok(None)` when no row matches; when several rows match, the
    /// first one returned by the database is used.
    pub async fn get_ean_by_url(&self, url: &str) -> anyhow::Result<Option<String>> {
        let row = sqlx::query!("SELECT ean FROM precios WHERE url = ?1;", url)
            .fetch_optional(&self.pool)
            .await?;
        Ok(row.map(|record| record.ean))
    }

    /// Returns every URL in `producto_urls` that contains `domain`.
    ///
    /// The match is a SQL `LIKE` against `%domain%`, so any `%` or `_` inside
    /// `domain` acts as a wildcard.
    pub async fn get_urls_by_domain(&self, domain: &str) -> anyhow::Result<Vec<String>> {
        let pattern = format!("%{}%", domain);
        let rows = sqlx::query!("SELECT url FROM producto_urls WHERE url LIKE ?1;", pattern)
            .fetch_all(&self.pool)
            .await?;
        Ok(rows.into_iter().map(|record| record.url).collect())
    }

    /// Upserts the given product URLs inside a single transaction.
    ///
    /// A new URL gets `first_seen` and `last_seen` set to the current time in
    /// milliseconds; a URL that already exists only has `last_seen` refreshed.
    ///
    /// # Errors
    /// Fails if the current timestamp does not fit in an `i64` or if any
    /// statement (or the commit) fails; the function returns before `commit`
    /// is reached in that case.
    pub async fn save_producto_urls(&self, urls: Vec<String>) -> anyhow::Result<()> {
        // One timestamp is shared by the whole batch.
        let timestamp_ms = i64::try_from(now_ms())?;
        let mut transaction = self.pool.begin().await?;
        for url in urls {
            // The raw string's continuation lines stay at column 0 so the query
            // text is unchanged.
            sqlx::query!(
                r#"INSERT INTO producto_urls(url, first_seen, last_seen)
VALUES (?1, ?2, ?2)
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
                url,
                timestamp_ms
            )
            .execute(&mut *transaction)
            .await?;
        }
        transaction.commit().await?;
        Ok(())
    }

    /// Stores the given best-selling records inside a single transaction.
    ///
    /// Each record becomes one `db_best_selling` row holding the fetch time in
    /// milliseconds, the category's id and the EAN list serialized as JSON.
    ///
    /// # Errors
    /// Propagates any error from beginning the transaction, executing an
    /// insert or committing.
    pub async fn save_best_selling(&self, records: Vec<BestSellingRecord>) -> anyhow::Result<()> {
        let mut transaction = self.pool.begin().await?;
        for record in records {
            let eans_json = serde_json::Value::from(record.eans).to_string();
            let category = record.category.id();
            let fetched_at = record.fetched_at.timestamp_millis();
            // The raw string's continuation line stays at column 0 so the query
            // text is unchanged.
            sqlx::query!(
                r#"INSERT INTO db_best_selling(fetched_at, category, eans_json)
VALUES (?1, ?2, ?3);"#,
                fetched_at,
                category,
                eans_json
            )
            .execute(&mut *transaction)
            .await?;
        }
        transaction.commit().await?;
        Ok(())
    }
}
/// Returns the number of milliseconds elapsed since the Unix epoch, as
/// reported by the system clock.
///
/// # Panics
/// Panics if the system clock reads earlier than the Unix epoch.
fn now_ms() -> u128 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.expect("Time went backwards").as_millis()
}

View file

@ -1,8 +1,7 @@
use again::RetryPolicy; use again::RetryPolicy;
use best_selling::BestSellingRecord;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use cron::Schedule; use cron::Schedule;
use deadpool_sqlite::Pool; use db::Db;
use futures::{future, stream, Future, StreamExt}; use futures::{future, stream, Future, StreamExt};
use nanoid::nanoid; use nanoid::nanoid;
use reqwest::{header::HeaderMap, StatusCode, Url}; use reqwest::{header::HeaderMap, StatusCode, Url};
@ -73,7 +72,7 @@ async fn scrap_url_cli(url: String) -> anyhow::Result<()> {
} }
mod best_selling; mod best_selling;
async fn scrap_best_selling_cli() -> anyhow::Result<()> { async fn scrap_best_selling_cli() -> anyhow::Result<()> {
let db = connect_db(); let db = Db::connect().await?;
let res = best_selling::get_all_best_selling(&db).await; let res = best_selling::get_all_best_selling(&db).await;
println!("Result: {:#?}", res); println!("Result: {:#?}", res);
@ -89,14 +88,14 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let pool = connect_db(); let db = Db::connect().await?;
let counters = fetch_list(&pool, links).await; let counters = fetch_list(&db, links).await;
println!("Finished: {:?}", counters); println!("Finished: {:?}", counters);
Ok(()) Ok(())
} }
async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters { async fn fetch_list(db: &Db, links: Vec<String>) -> Counters {
let n_coroutines = env::var("N_COROUTINES") let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(24), |s| s.parse::<usize>()) .map_or(Ok(24), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número"); .expect("N_COROUTINES no es un número");
@ -105,9 +104,9 @@ async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters {
stream::iter(links) stream::iter(links)
.map(|url| { .map(|url| {
let pool = pool.clone(); let db = db.clone();
let client = client.clone(); let client = client.clone();
tokio::spawn(fetch_and_save(client, url, pool)) tokio::spawn(fetch_and_save(client, url, db))
}) })
.buffer_unordered(n_coroutines) .buffer_unordered(n_coroutines)
.fold(Counters::default(), move |x, y| { .fold(Counters::default(), move |x, y| {
@ -121,11 +120,7 @@ async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters {
.await .await
} }
fn connect_db() -> Pool { mod db;
let db_path = env::var("DB_PATH").unwrap_or("../sqlite.db".to_string());
let cfg = deadpool_sqlite::Config::new(db_path);
cfg.create_pool(deadpool_sqlite::Runtime::Tokio1).unwrap()
}
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct Counters { struct Counters {
@ -134,26 +129,13 @@ struct Counters {
skipped: u64, skipped: u64,
} }
async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Counters { async fn fetch_and_save(client: reqwest::Client, url: String, db: Db) -> Counters {
let res = fetch_and_parse(&client, url.clone()).await; let res = fetch_and_parse(&client, url.clone()).await;
let mut counters = Counters::default(); let mut counters = Counters::default();
match res { match res {
Ok(res) => { Ok(res) => {
counters.success += 1; counters.success += 1;
pool.get().await.unwrap().interact(move |conn| conn.execute( db.insert_precio(res).await.unwrap();
"INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
rusqlite::params![
res.ean,
res.fetched_at,
res.precio_centavos,
res.in_stock,
res.url,
None::<String>,
res.parser_version,
res.name,
res.image_url,
]
)).await.unwrap().unwrap();
} }
Err(err) => { Err(err) => {
match err.downcast_ref::<reqwest::Error>() { match err.downcast_ref::<reqwest::Error>() {
@ -301,7 +283,7 @@ struct AutoTelegram {
#[derive(Clone)] #[derive(Clone)]
struct Auto { struct Auto {
pool: Pool, db: Db,
telegram: Option<AutoTelegram>, telegram: Option<AutoTelegram>,
} }
impl Auto { impl Auto {
@ -316,24 +298,7 @@ impl Auto {
)) ))
.await; .await;
} }
let links: Vec<String> = { let links: Vec<String> = self.db.get_urls_by_domain(supermercado.host()).await?;
let search = format!("%{}%", supermercado.host());
self.pool
.get()
.await?
.interact(move |conn| -> anyhow::Result<Vec<String>> {
Ok(conn
.prepare(
r#"SELECT url FROM producto_urls
WHERE url LIKE ?1;"#,
)?
.query_map(rusqlite::params![search], |r| r.get::<_, String>(0))?
.map(|r| r.unwrap())
.collect())
})
.await
.unwrap()?
};
// { // {
// let debug_path = PathBuf::from("debug/"); // let debug_path = PathBuf::from("debug/");
// tokio::fs::create_dir_all(&debug_path).await.unwrap(); // tokio::fs::create_dir_all(&debug_path).await.unwrap();
@ -345,7 +310,7 @@ impl Auto {
// } // }
{ {
let t0 = now_sec(); let t0 = now_sec();
let counters = fetch_list(&self.pool, links).await; let counters = fetch_list(&self.db, links).await;
self.inform(&format!( self.inform(&format!(
"Downloaded {:?}: {:?} (took {})", "Downloaded {:?}: {:?} (took {})",
&supermercado, &supermercado,
@ -368,56 +333,7 @@ impl Auto {
async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> { async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> {
let urls = get_urls(supermercado).await?; let urls = get_urls(supermercado).await?;
self.pool self.db.save_producto_urls(urls).await?;
.get()
.await?
.interact(|conn| -> Result<(), anyhow::Error> {
let tx = conn.transaction()?;
{
let mut stmt = tx.prepare(
r#"INSERT INTO producto_urls(url, first_seen, last_seen)
VALUES (?1, ?2, ?2)
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
)?;
let now: u64 = now_ms().try_into()?;
for url in urls {
stmt.execute(rusqlite::params![url, now])?;
}
}
tx.commit()?;
Ok(())
})
.await
.unwrap()?;
Ok(())
}
async fn save_best_selling(&self, best_selling: Vec<BestSellingRecord>) -> anyhow::Result<()> {
self.pool
.get()
.await?
.interact(move |conn| -> Result<(), anyhow::Error> {
let tx = conn.transaction()?;
{
let mut stmt = tx.prepare(
r#"INSERT INTO db_best_selling(fetched_at, category, eans_json)
VALUES (?1, ?2, ?3);"#,
)?;
for record in best_selling {
let eans_json = serde_json::Value::from(record.eans).to_string();
let fetched_at = record.fetched_at.timestamp_millis();
stmt.execute(rusqlite::params![
fetched_at,
record.category.id(),
eans_json
])?;
}
}
tx.commit()?;
Ok(())
})
.await
.unwrap()?;
Ok(()) Ok(())
} }
@ -438,7 +354,8 @@ impl Auto {
} }
async fn auto_cli() -> anyhow::Result<()> { async fn auto_cli() -> anyhow::Result<()> {
let db = connect_db(); let auto = {
let db = Db::connect().await?;
let telegram = { let telegram = {
match ( match (
env::var("TELEGRAM_BOT_TOKEN"), env::var("TELEGRAM_BOT_TOKEN"),
@ -451,7 +368,8 @@ async fn auto_cli() -> anyhow::Result<()> {
} }
} }
}; };
let auto = Auto { pool: db, telegram }; Auto { db, telegram }
};
auto.inform("[auto] Empezando scrap").await; auto.inform("[auto] Empezando scrap").await;
let handles: Vec<_> = Supermercado::value_variants() let handles: Vec<_> = Supermercado::value_variants()
.iter() .iter()
@ -462,10 +380,10 @@ async fn auto_cli() -> anyhow::Result<()> {
let best_selling = auto let best_selling = auto
.inform_time( .inform_time(
"Downloaded best selling", "Downloaded best selling",
best_selling::get_all_best_selling(&auto.pool), best_selling::get_all_best_selling(&auto.db),
) )
.await?; .await?;
auto.save_best_selling(best_selling).await?; auto.db.save_best_selling(best_selling).await?;
Ok(()) Ok(())
} }
@ -494,8 +412,8 @@ mod sites;
struct PrecioPoint { struct PrecioPoint {
ean: String, ean: String,
// unix // unix
fetched_at: u64, fetched_at: i64,
precio_centavos: Option<u64>, precio_centavos: Option<i64>,
in_stock: Option<bool>, in_stock: Option<bool>,
url: String, url: String,
parser_version: u16, parser_version: u16,
@ -503,13 +421,9 @@ struct PrecioPoint {
image_url: Option<String>, image_url: Option<String>,
} }
fn now_sec() -> u64 { fn now_sec() -> i64 {
since_the_epoch().as_secs() since_the_epoch().as_secs().try_into().unwrap()
} }
fn now_ms() -> u128 {
since_the_epoch().as_millis()
}
fn since_the_epoch() -> Duration { fn since_the_epoch() -> Duration {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)

View file

@ -11,9 +11,9 @@ pub fn get_meta_content<'a>(dom: &'a VDom<'a>, prop: &str) -> Option<Cow<'a, str
.map(|s| s.as_utf8_str()) .map(|s| s.as_utf8_str())
} }
pub fn price_from_meta(dom: &tl::VDom<'_>) -> Result<Option<u64>, anyhow::Error> { pub fn price_from_meta(dom: &tl::VDom<'_>) -> Result<Option<i64>, anyhow::Error> {
let precio_centavos = get_meta_content(dom, "product:price:amount") let precio_centavos = get_meta_content(dom, "product:price:amount")
.map(|s| s.parse::<f64>().map(|f| (f * 100.0) as u64)) .map(|s| s.parse::<f64>().map(|f| (f * 100.0) as i64))
.transpose()?; .transpose()?;
Ok(precio_centavos) Ok(precio_centavos)
} }

View file

@ -37,7 +37,7 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
}) })
.transpose() .transpose()
.context("Parseando precio")? .context("Parseando precio")?
.map(|f| (f * 100.0) as u64); .map(|f| (f * 100.0) as i64);
let in_stock = Some( let in_stock = Some(
dom.query_selector(".product_not_available") dom.query_selector(".product_not_available")