diff --git a/.vscode/settings.json b/.vscode/settings.json index a471a94..7d3e4b0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,6 @@ { "spellright.language": ["es_AR"], "spellright.documentTypes": ["markdown", "latex", "plaintext"], - "editor.formatOnSave": true + "editor.formatOnSave": true, + "rust-analyzer.trace.server": "verbose" } diff --git a/sepa/importer-rs/Cargo.lock b/sepa/importer-rs/Cargo.lock index 1f3e8f6..c1d109c 100644 --- a/sepa/importer-rs/Cargo.lock +++ b/sepa/importer-rs/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "adler2" @@ -525,6 +525,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastrand" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" + [[package]] name = "filetime" version = "0.2.25" @@ -654,6 +660,7 @@ dependencies = [ "regex", "serde", "tar", + "tempfile", "zstd", ] @@ -1279,6 +1286,19 @@ dependencies = [ "xattr", ] +[[package]] +name = "tempfile" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +dependencies = [ + "cfg-if", + "fastrand", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + [[package]] name = "tiny-keccak" version = "2.0.2" diff --git a/sepa/importer-rs/Cargo.toml b/sepa/importer-rs/Cargo.toml index 7c56b18..47b3492 100644 --- a/sepa/importer-rs/Cargo.toml +++ b/sepa/importer-rs/Cargo.toml @@ -11,4 +11,5 @@ rayon = "1.10.0" regex = "1.11.1" serde = { version = "1.0.215", features = ["derive"] } tar = "0.4.43" +tempfile = "3.14.0" zstd = "0.13.2" diff --git a/sepa/importer-rs/src/main.rs b/sepa/importer-rs/src/main.rs index 2449f57..85409b3 100644 --- a/sepa/importer-rs/src/main.rs +++ b/sepa/importer-rs/src/main.rs @@ -1,7 +1,9 @@ +use anyhow::anyhow; use rayon::prelude::*; use std::{ env::args, - io::{self, BufRead}, + fs, + io::{self, BufRead, Write}, path::{Path, PathBuf}, }; use tar::Archive; @@ -44,7 +46,10 @@ struct Bandera { fn read_csv_trimmed(path: PathBuf) -> anyhow::Result { Ok(io::BufReader::new(std::fs::File::open(path)?) .lines() - .map(|line| line.map(|l| l.trim().to_string())) + .map(|line| { + line.map(|l| l.trim().to_string()) + .map_err(anyhow::Error::from) + }) .take_while(|line| { if let Ok(l) = line { if l.starts_with('&') || l.starts_with(' ') || l.starts_with('\0') || l.is_empty() { @@ -52,6 +57,9 @@ fn read_csv_trimmed(path: PathBuf) -> anyhow::Result { .replace('\0', "") .replace(" ", "") .is_empty() + } else if l.starts_with("Ăšltima") { + // algunos datasets erroneamente les falta el newline + false } else { true } @@ -59,7 +67,16 @@ fn read_csv_trimmed(path: PathBuf) -> anyhow::Result { true } }) - .collect::, io::Error>>()? + .map(|line| { + line.and_then(|l| { + if l.starts_with("|") { + Err(anyhow!("Alberdi S.A.: newlines incorrectos")) + } else { + Ok(l) + } + }) + }) + .collect::, _>>()? .join("\n")) } @@ -72,8 +89,8 @@ struct Sucursal { sucursales_tipo: String, sucursales_calle: String, sucursales_numero: String, - sucursales_latitud: f64, - sucursales_longitud: f64, + sucursales_latitud: Option, + sucursales_longitud: Option, sucursales_observaciones: Option, sucursales_barrio: Option, sucursales_codigo_postal: String, @@ -85,10 +102,12 @@ struct Sucursal { sucursales_jueves_horario_atencion: String, sucursales_viernes_horario_atencion: String, sucursales_sabado_horario_atencion: String, + /// bruh. a veces lo escriben asi + #[serde(alias = "sucursales_domingohorario_atencion")] sucursales_domingo_horario_atencion: String, } -fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> { +fn import_dataset(conn: &Connection, dir_path: &Path) -> anyhow::Result<()> { conn.execute("BEGIN", duckdb::params![])?; let dataset_id = { @@ -150,9 +169,17 @@ fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> { let mut file = csv::ReaderBuilder::new() .delimiter(b'|') .from_reader(csv.as_bytes()); - let sucursales = file - .records() - .map(|r| r.unwrap().deserialize::(None).unwrap()); + let sucursales: Vec = file + .deserialize() + .map(|result: csv::Result| { + result.map_err(|e| { + println!("Error: {:?}", e); + println!("CSV content: {}", csv); + e + }) + }) + .collect::>()?; + for sucursal in sucursales { app.append_row(duckdb::params![ dataset_id, @@ -183,49 +210,58 @@ fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> { { let file = read_csv_trimmed(dir_path.join("productos.csv"))?; - let mut app = conn.appender("precios")?; + let mut temp = tempfile::NamedTempFile::new()?; + // get path to file + + temp.write_all(file.as_bytes())?; + // let mut app = conn.appender("precios")?; let start = std::time::Instant::now(); - let mut rdr = csv::ReaderBuilder::new() - .delimiter(b'|') - .from_reader(file.as_bytes()); - for result in rdr.records() { - match result { - Ok(record) => { - // println!("{:?}", record); - let producto: Producto = record.deserialize(None).unwrap(); - // println!("{:?}", producto); - app.append_row(duckdb::params![ - dataset_id, - producto.id_comercio, - producto.id_bandera, - producto.id_sucursal, - producto.id_producto, - producto.productos_ean, - producto.productos_descripcion, - producto.productos_cantidad_presentacion, - producto.productos_unidad_medida_presentacion, - producto.productos_marca, - producto.productos_precio_lista, - producto.productos_precio_referencia, - producto.productos_cantidad_referencia, - producto.productos_unidad_medida_referencia, - producto.productos_precio_unitario_promo1, - producto.productos_leyenda_promo1, - producto.productos_precio_unitario_promo2, - producto.productos_leyenda_promo2, - ])?; - } - Err(e) => { - println!("Error: {:?}", e); - println!( - "lines: {:?}", - &file[e.position().unwrap().byte() as usize..] - ); - panic!("Error parsing csv: {:#?}", e); - } - } - } - app.flush()?; + conn.execute( + " +insert into precios select ? as id_dataset, * from read_csv(?, delim='|', header=true, nullstr='')", + duckdb::params![dataset_id, &temp.path().to_string_lossy()], + )?; + // let mut rdr = csv::ReaderBuilder::new() + // .delimiter(b'|') + // .from_reader(file.as_bytes()); + // for result in rdr.records() { + // match result { + // Ok(record) => { + // // println!("{:?}", record); + // let producto: Producto = record.deserialize(None).unwrap(); + // // println!("{:?}", producto); + // // app.append_row(duckdb::params![ + // // dataset_id, + // // producto.id_comercio, + // // producto.id_bandera, + // // producto.id_sucursal, + // // producto.id_producto, + // // producto.productos_ean, + // // producto.productos_descripcion, + // // producto.productos_cantidad_presentacion, + // // producto.productos_unidad_medida_presentacion, + // // producto.productos_marca, + // // producto.productos_precio_lista, + // // producto.productos_precio_referencia, + // // producto.productos_cantidad_referencia, + // // producto.productos_unidad_medida_referencia, + // // producto.productos_precio_unitario_promo1, + // // producto.productos_leyenda_promo1, + // // producto.productos_precio_unitario_promo2, + // // producto.productos_leyenda_promo2, + // // ])?; + // } + // Err(e) => { + // println!("Error: {:?}", e); + // println!( + // "lines: {:?}", + // &file[e.position().unwrap().byte() as usize..] + // ); + // panic!("Error parsing csv: {:#?}", e); + // } + // } + // } + // app.flush()?; println!("Time taken flushed: {:?}", start.elapsed()); } conn.execute("COMMIT", duckdb::params![])?; @@ -233,6 +269,34 @@ fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> { Ok(()) } +/// a dataset dump is a dump of a single day with multiple datasets (one per comercio) +fn import_dataset_dump(conn: &Connection, dir_path: &Path) -> anyhow::Result<()> { + for entry in fs::read_dir(dir_path)? { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + let res = import_dataset(conn, &path); + match res { + Ok(_) => {} + Err(e) => { + println!("Error importing dataset: {:?}", e); + conn.execute("ROLLBACK", duckdb::params![])?; + } + } + } + } + Ok(()) +} + +fn is_dataset_dump(dir_path: &Path) -> anyhow::Result { + Ok(fs::read_dir(dir_path)?.any(|entry| { + entry + .as_ref() + .map(|e| e.file_name() == "dataset-info.json") + .unwrap_or(false) + })) +} + fn main() { let conn = Connection::open("importer-rs.db").unwrap(); @@ -254,13 +318,15 @@ fn main() { // import_dataset(&conn.try_clone().unwrap(), parent.to_path_buf()).unwrap(); // }); - import_dataset( - &conn.try_clone().unwrap(), - args() - .nth(1) - .unwrap_or("/sepa_1_comercio-sepa-10_2024-11-23_09-05-11/".to_owned()) - .into(), - ) - .unwrap(); + let path: PathBuf = args() + .nth(1) + .unwrap_or("/sepa_1_comercio-sepa-10_2024-11-23_09-05-11/".to_owned()) + .into(); + + if is_dataset_dump(&path).unwrap() { + import_dataset_dump(&conn, &path).unwrap(); + } else { + import_dataset(&conn.try_clone().unwrap(), &path).unwrap(); + } println!("Hello, world!"); }