WIP changes

This commit is contained in:
Nulo 2025-01-11 14:13:36 +00:00
parent 94c4b35740
commit 2f44b329e7
4 changed files with 149 additions and 61 deletions

View file

@ -1,5 +1,6 @@
{
"spellright.language": ["es_AR"],
"spellright.documentTypes": ["markdown", "latex", "plaintext"],
"editor.formatOnSave": true
"editor.formatOnSave": true,
"rust-analyzer.trace.server": "verbose"
}

View file

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "adler2"
@ -525,6 +525,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
[[package]]
name = "filetime"
version = "0.2.25"
@ -654,6 +660,7 @@ dependencies = [
"regex",
"serde",
"tar",
"tempfile",
"zstd",
]
@ -1279,6 +1286,19 @@ dependencies = [
"xattr",
]
[[package]]
name = "tempfile"
version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
dependencies = [
"cfg-if",
"fastrand",
"once_cell",
"rustix",
"windows-sys 0.59.0",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"

View file

@ -11,4 +11,5 @@ rayon = "1.10.0"
regex = "1.11.1"
serde = { version = "1.0.215", features = ["derive"] }
tar = "0.4.43"
tempfile = "3.14.0"
zstd = "0.13.2"

View file

@ -1,7 +1,9 @@
use anyhow::anyhow;
use rayon::prelude::*;
use std::{
env::args,
io::{self, BufRead},
fs,
io::{self, BufRead, Write},
path::{Path, PathBuf},
};
use tar::Archive;
@ -44,7 +46,10 @@ struct Bandera {
fn read_csv_trimmed(path: PathBuf) -> anyhow::Result<String> {
Ok(io::BufReader::new(std::fs::File::open(path)?)
.lines()
.map(|line| line.map(|l| l.trim().to_string()))
.map(|line| {
line.map(|l| l.trim().to_string())
.map_err(anyhow::Error::from)
})
.take_while(|line| {
if let Ok(l) = line {
if l.starts_with('&') || l.starts_with(' ') || l.starts_with('\0') || l.is_empty() {
@ -52,6 +57,9 @@ fn read_csv_trimmed(path: PathBuf) -> anyhow::Result<String> {
.replace('\0', "")
.replace(" ", "")
.is_empty()
} else if l.starts_with("Última") {
// algunos datasets erroneamente les falta el newline
false
} else {
true
}
@ -59,7 +67,16 @@ fn read_csv_trimmed(path: PathBuf) -> anyhow::Result<String> {
true
}
})
.collect::<Result<Vec<_>, io::Error>>()?
.map(|line| {
line.and_then(|l| {
if l.starts_with("|") {
Err(anyhow!("Alberdi S.A.: newlines incorrectos"))
} else {
Ok(l)
}
})
})
.collect::<Result<Vec<String>, _>>()?
.join("\n"))
}
@ -72,8 +89,8 @@ struct Sucursal {
sucursales_tipo: String,
sucursales_calle: String,
sucursales_numero: String,
sucursales_latitud: f64,
sucursales_longitud: f64,
sucursales_latitud: Option<f64>,
sucursales_longitud: Option<f64>,
sucursales_observaciones: Option<String>,
sucursales_barrio: Option<String>,
sucursales_codigo_postal: String,
@ -85,10 +102,12 @@ struct Sucursal {
sucursales_jueves_horario_atencion: String,
sucursales_viernes_horario_atencion: String,
sucursales_sabado_horario_atencion: String,
/// bruh. a veces lo escriben asi
#[serde(alias = "sucursales_domingohorario_atencion")]
sucursales_domingo_horario_atencion: String,
}
fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> {
fn import_dataset(conn: &Connection, dir_path: &Path) -> anyhow::Result<()> {
conn.execute("BEGIN", duckdb::params![])?;
let dataset_id = {
@ -150,9 +169,17 @@ fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> {
let mut file = csv::ReaderBuilder::new()
.delimiter(b'|')
.from_reader(csv.as_bytes());
let sucursales = file
.records()
.map(|r| r.unwrap().deserialize::<Sucursal>(None).unwrap());
let sucursales: Vec<Sucursal> = file
.deserialize()
.map(|result: csv::Result<Sucursal>| {
result.map_err(|e| {
println!("Error: {:?}", e);
println!("CSV content: {}", csv);
e
})
})
.collect::<Result<_, _>>()?;
for sucursal in sucursales {
app.append_row(duckdb::params![
dataset_id,
@ -183,49 +210,58 @@ fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> {
{
let file = read_csv_trimmed(dir_path.join("productos.csv"))?;
let mut app = conn.appender("precios")?;
let mut temp = tempfile::NamedTempFile::new()?;
// get path to file
temp.write_all(file.as_bytes())?;
// let mut app = conn.appender("precios")?;
let start = std::time::Instant::now();
let mut rdr = csv::ReaderBuilder::new()
.delimiter(b'|')
.from_reader(file.as_bytes());
for result in rdr.records() {
match result {
Ok(record) => {
// println!("{:?}", record);
let producto: Producto = record.deserialize(None).unwrap();
// println!("{:?}", producto);
app.append_row(duckdb::params![
dataset_id,
producto.id_comercio,
producto.id_bandera,
producto.id_sucursal,
producto.id_producto,
producto.productos_ean,
producto.productos_descripcion,
producto.productos_cantidad_presentacion,
producto.productos_unidad_medida_presentacion,
producto.productos_marca,
producto.productos_precio_lista,
producto.productos_precio_referencia,
producto.productos_cantidad_referencia,
producto.productos_unidad_medida_referencia,
producto.productos_precio_unitario_promo1,
producto.productos_leyenda_promo1,
producto.productos_precio_unitario_promo2,
producto.productos_leyenda_promo2,
])?;
}
Err(e) => {
println!("Error: {:?}", e);
println!(
"lines: {:?}",
&file[e.position().unwrap().byte() as usize..]
);
panic!("Error parsing csv: {:#?}", e);
}
}
}
app.flush()?;
conn.execute(
"
insert into precios select ? as id_dataset, * from read_csv(?, delim='|', header=true, nullstr='')",
duckdb::params![dataset_id, &temp.path().to_string_lossy()],
)?;
// let mut rdr = csv::ReaderBuilder::new()
// .delimiter(b'|')
// .from_reader(file.as_bytes());
// for result in rdr.records() {
// match result {
// Ok(record) => {
// // println!("{:?}", record);
// let producto: Producto = record.deserialize(None).unwrap();
// // println!("{:?}", producto);
// // app.append_row(duckdb::params![
// // dataset_id,
// // producto.id_comercio,
// // producto.id_bandera,
// // producto.id_sucursal,
// // producto.id_producto,
// // producto.productos_ean,
// // producto.productos_descripcion,
// // producto.productos_cantidad_presentacion,
// // producto.productos_unidad_medida_presentacion,
// // producto.productos_marca,
// // producto.productos_precio_lista,
// // producto.productos_precio_referencia,
// // producto.productos_cantidad_referencia,
// // producto.productos_unidad_medida_referencia,
// // producto.productos_precio_unitario_promo1,
// // producto.productos_leyenda_promo1,
// // producto.productos_precio_unitario_promo2,
// // producto.productos_leyenda_promo2,
// // ])?;
// }
// Err(e) => {
// println!("Error: {:?}", e);
// println!(
// "lines: {:?}",
// &file[e.position().unwrap().byte() as usize..]
// );
// panic!("Error parsing csv: {:#?}", e);
// }
// }
// }
// app.flush()?;
println!("Time taken flushed: {:?}", start.elapsed());
}
conn.execute("COMMIT", duckdb::params![])?;
@ -233,6 +269,34 @@ fn import_dataset(conn: &Connection, dir_path: PathBuf) -> anyhow::Result<()> {
Ok(())
}
/// a dataset dump is a dump of a single day with multiple datasets (one per comercio)
fn import_dataset_dump(conn: &Connection, dir_path: &Path) -> anyhow::Result<()> {
for entry in fs::read_dir(dir_path)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
let res = import_dataset(conn, &path);
match res {
Ok(_) => {}
Err(e) => {
println!("Error importing dataset: {:?}", e);
conn.execute("ROLLBACK", duckdb::params![])?;
}
}
}
}
Ok(())
}
fn is_dataset_dump(dir_path: &Path) -> anyhow::Result<bool> {
Ok(fs::read_dir(dir_path)?.any(|entry| {
entry
.as_ref()
.map(|e| e.file_name() == "dataset-info.json")
.unwrap_or(false)
}))
}
fn main() {
let conn = Connection::open("importer-rs.db").unwrap();
@ -254,13 +318,15 @@ fn main() {
// import_dataset(&conn.try_clone().unwrap(), parent.to_path_buf()).unwrap();
// });
import_dataset(
&conn.try_clone().unwrap(),
args()
.nth(1)
.unwrap_or("/sepa_1_comercio-sepa-10_2024-11-23_09-05-11/".to_owned())
.into(),
)
.unwrap();
let path: PathBuf = args()
.nth(1)
.unwrap_or("/sepa_1_comercio-sepa-10_2024-11-23_09-05-11/".to_owned())
.into();
if is_dataset_dump(&path).unwrap() {
import_dataset_dump(&conn, &path).unwrap();
} else {
import_dataset(&conn.try_clone().unwrap(), &path).unwrap();
}
println!("Hello, world!");
}