WIP: postgres

Cat /dev/Nulo 2024-07-13 12:43:24 -03:00
parent 1e38f4ddea
commit 0843a28f48
11 changed files with 131 additions and 43 deletions

@@ -1 +1,2 @@
-DATABASE_URL=sqlite://../sqlite.db
+# DATABASE_URL=sqlite://../sqlite.db
+DATABASE_URL=postgres://preciazo:rezgVdLJik8HRX9heS9dDt5nRXyy9n@localhost:54325/preciazo
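The new connection string points at the Postgres container defined in rust/docker-compose.yml further down (published only on 127.0.0.1:54325), while the old SQLite URL is kept commented out for easy rollback; the new rust/src/db.rs loads this file at startup via dotenvy.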

rust/Cargo.lock generated
@@ -1391,6 +1391,7 @@ dependencies = [
  "chrono",
  "clap",
  "cron",
+ "dotenvy",
  "futures",
  "html-escape",
  "itertools",

rust/Cargo.toml
@@ -12,7 +12,7 @@ base64 = "0.21.7"
 chrono = "0.4"
 clap = { version = "4.4.15", features = ["derive"] }
 cron = "0.12.0"
-sqlx = { version = "0.7", features = [ "runtime-tokio", "sqlite", "chrono" ] }
+sqlx = { version = "0.7", features = [ "runtime-tokio", "sqlite", "chrono", "postgres" ] }
 futures = "0.3.30"
 html-escape = "0.2.13"
 itertools = "0.12.0"
@@ -36,6 +36,7 @@ tokio = { version = "1.35", features = ["full"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 axum = "0.7.5"
+dotenvy = "0.15.7"

 [[bin]]
 name = "api"

rust/build.rs Normal file
@@ -0,0 +1,5 @@
+// generated by `sqlx migrate build-script`
+fn main() {
+    // trigger recompilation when a new migration is added
+    println!("cargo:rerun-if-changed=migrations");
+}
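For context (not part of the diff): `sqlx::migrate!` embeds the files under migrations/ into the binary at compile time, so this generated build script tells Cargo to rebuild whenever that directory changes; without the `cargo:rerun-if-changed=migrations` hint, a freshly added migration would not be picked up until some unrelated recompile.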

rust/docker-compose.yml Normal file
@@ -0,0 +1,11 @@
+services:
+  postgres:
+    image: docker.io/postgres:16
+    restart: always
+    shm_size: 128mb
+    ports:
+      - 127.0.0.1:54325:5432
+    environment:
+      POSTGRES_USER: preciazo
+      POSTGRES_PASSWORD: rezgVdLJik8HRX9heS9dDt5nRXyy9n
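A note on the compose file: publishing `127.0.0.1:54325:5432` keeps Postgres off external interfaces and matches the host and port in the new DATABASE_URL, and the user and password mirror the connection string as well, so a plain `docker compose up -d` should be enough to bring up a working local database.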

@@ -0,0 +1,21 @@
+CREATE TABLE
+    precios (
+        id serial PRIMARY KEY NOT NULL,
+        ean text NOT NULL,
+        fetched_at integer NOT NULL,
+        precio_centavos integer,
+        in_stock integer,
+        url text NOT NULL,
+        warc_record_id text,
+        parser_version integer,
+        name text,
+        image_url text
+    );
+
+CREATE TABLE
+    producto_urls (
+        id serial PRIMARY KEY NOT NULL,
+        url text NOT NULL,
+        first_seen integer NOT NULL,
+        last_seen integer NOT NULL
+    );
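Both tables store timestamps as plain `integer` columns (32-bit in Postgres), which sqlx maps to `i32` on the Rust side. A minimal read-side sketch against the `precios` table above, using sqlx's runtime API so it compiles without a live DATABASE_URL; the helper name and the column selection are illustrative, not part of the commit:

use sqlx::PgPool;

// Illustrative row type covering a subset of the precios columns.
#[derive(sqlx::FromRow)]
struct Precio {
    ean: String,
    fetched_at: i32,              // integer column -> i32
    precio_centavos: Option<i32>, // nullable column -> Option
}

// Hypothetical helper: latest price point recorded for one EAN.
async fn latest_precio(pool: &PgPool, ean: &str) -> anyhow::Result<Option<Precio>> {
    let row = sqlx::query_as::<_, Precio>(
        "SELECT ean, fetched_at, precio_centavos FROM precios \
         WHERE ean = $1 ORDER BY fetched_at DESC LIMIT 1",
    )
    .bind(ean)
    .fetch_optional(pool)
    .await?;
    Ok(row)
}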

@@ -0,0 +1,7 @@
+CREATE TABLE
+    db_best_selling (
+        id serial PRIMARY KEY NOT NULL,
+        fetched_at integer NOT NULL,
+        category text NOT NULL,
+        eans_json text NOT NULL
+    );

rust/src/db.rs Normal file
@@ -0,0 +1,33 @@
+use std::env;
+
+use sqlx::{postgres::PgPoolOptions, PgPool};
+
+pub async fn connect_db() -> anyhow::Result<PgPool> {
+    dotenvy::dotenv()?;
+    let pool = PgPoolOptions::new()
+        .max_connections(5)
+        .connect(&env::var("DATABASE_URL")?)
+        .await?;
+
+    sqlx::migrate!("./migrations/").run(&pool).await?;
+
+    Ok(pool)
+    // let pool = SqlitePoolOptions::new()
+    //     .max_connections(1)
+    //     .connect_with(
+    //         SqliteConnectOptions::from_str(&format!(
+    //             "sqlite://{}",
+    //             env::var("DB_PATH").unwrap_or("../sqlite.db".to_string())
+    //         ))
+    //         .unwrap()
+    //         .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
+    //         .pragma("journal_size_limit", "67108864")
+    //         .pragma("mmap_size", "134217728")
+    //         .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
+    //         .busy_timeout(Duration::from_secs(15))
+    //         .pragma("cache_size", "2000")
+    //         .optimize_on_close(true, None),
+    //     )
+    //     .await
+    //     .expect("can't connect to database");
+}
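A minimal usage sketch for the new helper, assuming a tokio runtime and the .env file from above (the COUNT query is illustrative):

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Loads .env, connects, and runs pending migrations as a side effect.
    let pool = preciazo::db::connect_db().await?;

    // Tuples implement FromRow, so a one-off scalar query needs no struct;
    // COUNT(*) comes back as a Postgres bigint, i.e. i64.
    let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM precios")
        .fetch_one(&pool)
        .await?;
    println!("precios rows: {count}");
    Ok(())
}

One caveat worth noting: `dotenvy::dotenv()?` makes a missing .env file a hard error, so a deployment that sets DATABASE_URL through the environment instead would need `dotenvy::dotenv().ok()`.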

rust/src/lib.rs
@@ -1 +1,2 @@
+pub mod db;
 pub mod supermercado;

@@ -4,31 +4,36 @@ use std::{
     time::{Duration, SystemTime, UNIX_EPOCH},
 };

-use sqlx::{sqlite::SqliteConnectOptions, SqlitePool};
+use preciazo::db::connect_db;
+use sqlx::{sqlite::SqliteConnectOptions, PgPool, SqlitePool};
 use tracing::info;

 use crate::{best_selling::BestSellingRecord, PrecioPoint};

 #[derive(Clone)]
 pub struct Db {
-    read_pool: SqlitePool,
-    write_pool: SqlitePool,
+    pool: PgPool,
+    // read_pool: SqlitePool,
+    // write_pool: SqlitePool,
 }

 impl Db {
     pub async fn connect() -> anyhow::Result<Self> {
-        let db_path = env::var("DB_PATH").unwrap_or("../sqlite.db".to_string());
-        info!("Opening DB at {}", db_path);
-        let read_pool = connect_to_db(&db_path, 32).await?;
-        let write_pool = connect_to_db(&db_path, 1).await?;
-
         Ok(Self {
-            read_pool,
-            write_pool,
+            pool: connect_db().await?,
         })
+        // let db_path = env::var("DB_PATH").unwrap_or("../sqlite.db".to_string());
+        // info!("Opening DB at {}", db_path);
+        // let read_pool = connect_to_db(&db_path, 32).await?;
+        // let write_pool = connect_to_db(&db_path, 1).await?;
+        // Ok(Self {
+        //     read_pool,
+        //     write_pool,
+        // })
     }

     pub async fn insert_precio(&self, point: PrecioPoint) -> anyhow::Result<()> {
-        sqlx::query!("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
+        sqlx::query!("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);",
             point.ean,
             point.fetched_at,
             point.precio_centavos,
@@ -38,7 +43,7 @@ impl Db {
             point.parser_version,
             point.name,
             point.image_url,
-        ).execute(&self.write_pool).await?;
+        ).execute(&self.pool).await?;

         Ok(())
     }
@@ -105,26 +110,26 @@ impl Db {
     }
 }

-async fn connect_to_db(
-    db_path: &str,
-    max_connections: u32,
-) -> Result<sqlx::Pool<sqlx::Sqlite>, anyhow::Error> {
-    Ok(sqlx::pool::PoolOptions::new()
-        .max_connections(max_connections)
-        .connect_with(
-            SqliteConnectOptions::from_str(&format!("sqlite://{}", db_path))?
-                // https://fractaledmind.github.io/2023/09/07/enhancing-rails-sqlite-fine-tuning/
-                .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
-                .pragma("journal_size_limit", "67108864")
-                .pragma("mmap_size", "134217728")
-                .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
-                .busy_timeout(Duration::from_secs(15))
-                .pragma("cache_size", "2000")
-                .pragma("temp_store", "memory")
-                .optimize_on_close(true, None),
-        )
-        .await?)
-}
+// async fn connect_to_db(
+//     db_path: &str,
+//     max_connections: u32,
+// ) -> Result<sqlx::Pool<sqlx::Sqlite>, anyhow::Error> {
+//     Ok(sqlx::pool::PoolOptions::new()
+//         .max_connections(max_connections)
+//         .connect_with(
+//             SqliteConnectOptions::from_str(&format!("sqlite://{}", db_path))?
+//                 // https://fractaledmind.github.io/2023/09/07/enhancing-rails-sqlite-fine-tuning/
+//                 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
+//                 .pragma("journal_size_limit", "67108864")
+//                 .pragma("mmap_size", "134217728")
+//                 .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
+//                 .busy_timeout(Duration::from_secs(15))
+//                 .pragma("cache_size", "2000")
+//                 .pragma("temp_store", "memory")
+//                 .optimize_on_close(true, None),
+//         )
+//         .await?)
+// }

 fn now_ms() -> u128 {
     now().as_millis()
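The placeholder change in insert_precio is the load-bearing detail of this hunk: sqlx does not translate parameter syntax between backends, so SQLite's ?1..?9 must become Postgres's $1..$9 by hand, and `sqlx::query!` re-checks the statement against DATABASE_URL at compile time. The same insert written against the runtime API (a sketch with a trimmed column list, not the commit's code) would look like:

use sqlx::PgPool;

// Sketch: same $N placeholder style, runtime-checked instead of compile-time.
async fn insert_precio_runtime(
    pool: &PgPool,
    ean: &str,
    fetched_at: i32,
    precio_centavos: Option<i32>,
    url: &str,
) -> anyhow::Result<()> {
    sqlx::query(
        "INSERT INTO precios(ean, fetched_at, precio_centavos, url) \
         VALUES ($1, $2, $3, $4)",
    )
    .bind(ean)
    .bind(fetched_at)
    .bind(precio_centavos)
    .bind(url)
    .execute(pool)
    .await?;
    Ok(())
}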

@@ -60,16 +60,18 @@ struct AutoArgs {
 async fn main() {
     tracing_subscriber::fmt::init();

-    match Args::parse() {
-        Args::FetchList(a) => fetch_list_cli(a.list_path).await,
-        Args::ParseFile(a) => parse_file_cli(a.file_path).await,
-        Args::GetUrlList(a) => get_url_list_cli(a.supermercado).await,
-        Args::ScrapUrl(a) => scrap_url_cli(a.url).await,
-        Args::ScrapBestSelling => scrap_best_selling_cli().await,
-        Args::Auto(a) => auto_cli(a).await,
-        Args::Cron(_) => cron_cli().await,
-    }
-    .unwrap()
+    preciazo::db::connect_db().await.unwrap();
+
+    // match Args::parse() {
+    //     Args::FetchList(a) => fetch_list_cli(a.list_path).await,
+    //     Args::ParseFile(a) => parse_file_cli(a.file_path).await,
+    //     Args::GetUrlList(a) => get_url_list_cli(a.supermercado).await,
+    //     Args::ScrapUrl(a) => scrap_url_cli(a.url).await,
+    //     Args::ScrapBestSelling => scrap_best_selling_cli().await,
+    //     Args::Auto(a) => auto_cli(a).await,
+    //     Args::Cron(_) => cron_cli().await,
+    // }
+    // .unwrap()
 }

 async fn scrap_url_cli(url: String) -> anyhow::Result<()> {