diff --git a/sepa/bun.lockb b/sepa/bun.lockb index c279026..7493719 100755 Binary files a/sepa/bun.lockb and b/sepa/bun.lockb differ diff --git a/sepa/importer.js b/sepa/importer.js index 667fd54..a395499 100644 --- a/sepa/importer.js +++ b/sepa/importer.js @@ -1,45 +1,56 @@ // @ts-check -import * as fs from "fs/promises"; -import Papa from "papaparse"; +import * as fsp from "fs/promises"; +import * as fs from "fs"; +import { CsvParserStream, parse, parseString } from "fast-csv"; import { basename, join, dirname } from "path"; import { $ } from "zx"; import PQueue from "p-queue"; import { Database } from "duckdb-async"; import fg from "fast-glob"; -// import { waddler } from "waddler"; +import { + DuckDBAppender, + DuckDBConnection, + DuckDBInstance, +} from "@duckdb/node-api"; +import Papa from "papaparse"; // TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios) +const instance = await DuckDBInstance.create("importer.db"); -/** - * @param {string} path - * @returns {Promise} - */ -async function readFile(path) { - // XXX: DORINKA SRL a veces envía archivos con UTF-16. - const buffer = await fs.readFile(path, { encoding: null }); - if (buffer[0] === 0xff && buffer[1] === 0xfe) { - return buffer.toString("utf16le"); - } else { - return buffer.toString("utf8"); - } +const queue = new PQueue({ concurrency: 5 }); + +let hasTars = false; +const files = await fg("**/*.tar.zst", { cwd: process.argv[2] }); +for (const file of files) { + hasTars = true; + const tar = join(process.argv[2], file); + queue.add(() => importDatasetTar(tar)); +} +await queue.onIdle(); + +if (!hasTars) { + await importDump(process.argv[2]); } /** - * @param {Database} db + * @param {DuckDBConnection} connection * @param {number} datasetId * @param {string} dir */ -async function importSucursales(db, datasetId, dir) { - const sucursales = Papa.parse(await readFile(join(dir, "sucursales.csv")), { - header: true, - }); +async function importSucursales(connection, datasetId, dir) { + const stream = await createReadStream(join(dir, "sucursales.csv")); + const appender = await connection.createAppender("main", "sucursales"); - const objs = sucursales.data - .filter((data) => data.id_comercio && data.id_bandera && data.id_sucursal) - .map((data) => { - // Megatone - if ("sucursales_domingohorario_atencion" in data) { + stream + .pipe( + parse({ headers: true, delimiter: "|", ignoreEmpty: true, trim: true }) + ) + .on("data", (data) => { + if (!data.id_comercio || !data.id_bandera || !data.id_sucursal) { + return; + } + if (data.sucursales_domingohorario_atencion) { data.sucursales_domingo_horario_atencion = data.sucursales_domingohorario_atencion; delete data.sucursales_domingohorario_atencion; @@ -47,84 +58,177 @@ async function importSucursales(db, datasetId, dir) { data.sucursales_nombre = data.sucursales_nombre .replaceAll("\t", " ") .trim(); - return { - id_dataset: datasetId, - ...data, - }; + + if (!("sucursales_longitud" in data)) { + console.debug({ data }); + throw new Error( + "Alberdi S.A. strikes again! las sucursales están rotas." + ); + } + appender.appendInteger(datasetId); + appender.appendInteger(parseInt(data.id_comercio)); + appender.appendInteger(parseInt(data.id_bandera)); + appender.appendInteger(parseInt(data.id_sucursal)); + appender.appendVarchar(data.sucursales_nombre); + appender.appendVarchar(data.sucursales_tipo); + appender.appendVarchar(data.sucursales_calle); + appender.appendVarchar(data.sucursales_numero); + appender.appendInteger(parseFloat(data.sucursales_latitud)); + appender.appendInteger(parseFloat(data.sucursales_longitud)); + appender.appendVarchar(data.sucursales_observaciones); + appender.appendVarchar(data.sucursales_barrio); + appender.appendVarchar(data.sucursales_codigo_postal); + appender.appendVarchar(data.sucursales_localidad); + appender.appendVarchar(data.sucursales_provincia); + appender.appendVarchar(data.sucursales_lunes_horario_atencion); + appender.appendVarchar(data.sucursales_martes_horario_atencion); + appender.appendVarchar(data.sucursales_miercoles_horario_atencion); + appender.appendVarchar(data.sucursales_jueves_horario_atencion); + appender.appendVarchar(data.sucursales_viernes_horario_atencion); + appender.appendVarchar(data.sucursales_sabado_horario_atencion); + appender.appendVarchar(data.sucursales_domingo_horario_atencion); + appender.endRow(); + }) + .on("error", (err) => { + console.error(err); }); - const keys = Object.keys(objs[0]); - if (!keys.includes("sucursales_longitud")) { - throw new Error("Alberdi S.A. strikes again! las sucursales están rotas."); - } - const lines = objs.map( - (data) => keys.map((key) => data[key]).join("\t") + "\n" - ); - - const tsv = `${keys.join("\t")}\n${lines.join("")}`; - await importTsv(db, "sucursales", tsv); + await new Promise((resolve) => stream.on("end", resolve)); + await appender.close(); } /** - * @param {Database} db - * @param {string} table - * @param {string} tsv - */ -async function importTsv(db, table, tsv) { - const dir = await fs.mkdtemp("/tmp/sepa-precios-importer-"); - try { - const tempFile = join(dir, "temp.tsv"); - await fs.writeFile(tempFile, tsv); - console.log( - `COPY ${table} FROM '${tempFile}' WITH (HEADER, DELIMITER '\t', NULL '', QUOTE '')` - ); - await db.exec( - `COPY ${table} FROM '${tempFile}' WITH (HEADER, DELIMITER '\t', NULL '', QUOTE '')` - ); - await fs.rm(dir, { recursive: true }); - } finally { - } -} - -/** - * @param {Database} db + * @param {DuckDBConnection} connection * @param {number} datasetId * @param {string} dir */ -async function importBanderas(db, datasetId, dir) { - const banderas = Papa.parse(await readFile(join(dir, "comercio.csv")), { - header: true, - }); - const objs = banderas.data.map((data) => ({ - id_dataset: datasetId, - ...data, - })); - const keys = [ - "id_dataset", - "id_comercio", - "id_bandera", - "comercio_cuit", - "comercio_razon_social", - "comercio_bandera_nombre", - "comercio_bandera_url", - "comercio_ultima_actualizacion", - "comercio_version_sepa", - ]; - const lines = objs - .filter((data) => data.id_comercio && data.id_bandera) - .map((data) => - keys - .map((key) => { - const value = data[key]; - if (typeof value !== "string") { - return value; - } - return value.replaceAll("\t", " ").trim(); - }) - .join("\t") - ); - const tsv = `${keys.join("\t")}\n${lines.join("\n")}`; +async function importBanderas(connection, datasetId, dir) { + const stream = await createReadStream(join(dir, "comercio.csv")); + const appender = await connection.createAppender("main", "banderas"); - await importTsv(db, "banderas", tsv); + stream + .pipe( + parse({ headers: true, delimiter: "|", ignoreEmpty: true, trim: true }) + ) + .on("data", (data) => { + if (!data.id_comercio || !data.id_bandera) return; + + appender.appendInteger(datasetId); + appender.appendInteger(parseInt(data.id_comercio)); + appender.appendInteger(parseInt(data.id_bandera)); + appender.appendVarchar(data.comercio_cuit); + appender.appendVarchar(data.comercio_razon_social); + appender.appendVarchar(data.comercio_bandera_nombre); + appender.appendVarchar(data.comercio_bandera_url); + appender.appendVarchar(data.comercio_ultima_actualizacion); + appender.appendVarchar(data.comercio_version_sepa); + appender.endRow(); + }) + .on("error", (err) => { + console.error(err); + }); + await new Promise((resolve) => stream.on("end", resolve)); + await appender.close(); +} + +/** + * @param {DuckDBConnection} connection + * @param {number} datasetId + * @param {string} dir + */ +async function importPrecios(connection, datasetId, dir) { + const { comercioCuit } = await getComercioMetadata(dir); + const productosCsvPath = join(dir, "productos.csv"); + + /** @type {CsvParserStream} */ + let csvStream; + + const appender = await connection.createAppender("main", "precios"); + + if (comercioCuit == "30612929455") { + // Libertad S.A. + const file = (await readFile(productosCsvPath)).replaceAll( + "|RAPTOR 6X16X45", + "/RAPTOR 6X16X45" + ); + csvStream = parseString(file, { + headers: true, + delimiter: "|", + ignoreEmpty: true, + trim: true, + }); + } else if (comercioCuit == "30578411174") { + // Alberdi S.A. + const file = (await readFile(productosCsvPath)).replaceAll(";", "|"); + csvStream = parseString(file, { + headers: true, + delimiter: "|", + ignoreEmpty: true, + trim: true, + }); + } else { + csvStream = (await createReadStream(productosCsvPath)).pipe( + parse({ headers: true, delimiter: "|", ignoreEmpty: true, trim: true }) + ); + } + if (["30707429468", "30589621499", "30663005843"].includes(comercioCuit)) { + // TODO: si tienen los valores, pero con otros nombres, por ejemplo + // productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva. + // pero no quiero mentir, asi que por ahora no lo importo + throw new Error( + `No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b` + ); + } + + csvStream + .on("data", (data) => { + if ( + !data.id_comercio || + !data.id_bandera || + !data.id_sucursal || + !data.id_producto + ) + return; + + if (data.id_producto.includes("e+")) { + console.error(`[${dir}]`, "id_producto corrupto", data.id_producto); + return; + } + if (data.precio_unitario_bulto_por_unidad_venta_con_iva) { + console.error( + `[${dir}]`, + "tiene precio_unitario_bulto_por_unidad_venta_con_iva", + { + data, + } + ); + return; + } + delete data.id_dun_14; + appender.appendInteger(datasetId); + appender.appendInteger(parseInt(data.id_comercio)); + appender.appendInteger(parseInt(data.id_bandera)); + appender.appendInteger(parseInt(data.id_sucursal)); + appender.appendBigInt(BigInt(data.id_producto)); + appender.appendInteger(parseInt(data.productos_ean)); + appender.appendVarchar(data.productos_descripcion); + appender.appendInteger(parseFloat(data.productos_cantidad_presentacion)); + appender.appendVarchar(data.productos_unidad_medida_presentacion); + appender.appendVarchar(data.productos_marca); + appender.appendInteger(parseFloat(data.productos_precio_lista)); + appender.appendInteger(parseFloat(data.productos_precio_referencia)); + appender.appendInteger(parseFloat(data.productos_cantidad_referencia)); + appender.appendVarchar(data.productos_unidad_medida_referencia); + appender.appendInteger(parseFloat(data.productos_precio_unitario_promo1)); + appender.appendVarchar(data.productos_leyenda_promo1); + appender.appendInteger(parseFloat(data.productos_precio_unitario_promo2)); + appender.appendVarchar(data.productos_leyenda_promo2); + appender.endRow(); + }) + .on("error", (err) => { + console.error(err); + }); + await new Promise((resolve) => csvStream.on("end", resolve)); + await appender.close(); } /** @@ -136,16 +240,18 @@ async function importDataset(dir) { const id_comercio = basename(dir).match(/comercio-sepa-(\d+)/)?.[1]; // TODO: parsear "Ultima actualizacion" al final del CSV y insertarlo en la tabla datasets - const db = await Database.create("importer.db"); + const connection = await instance.connect(); try { - await db.exec("begin transaction"); - let datasetId; - const res = await db.all( + await connection.run("begin transaction"); + const res = await connection.run( `insert into datasets (id, name, date, id_comercio) values (nextval('seq_datasets'), '${basename(dir)}', '${date}', ${id_comercio}) returning id` ); + const rows = await res.getRows(); + if (!rows[0][0]) throw new Error("No se pudo insertar el dataset"); + console.log("inserted dataset"); - datasetId = res[0].id; + const datasetId = parseInt(rows[0][0].toString()); const comercios = Papa.parse(await readFile(join(dir, "comercio.csv")), { header: true, @@ -153,93 +259,20 @@ async function importDataset(dir) { const comercioCuit = comercios.data[0].comercio_cuit; console.log(`dataset ${datasetId}, comercio ${comercioCuit}`); - await importBanderas(db, datasetId, dir); - await importSucursales(db, datasetId, dir); + await importBanderas(connection, datasetId, dir); + await importSucursales(connection, datasetId, dir); + await importPrecios(connection, datasetId, dir); - let file = await readFile(join(dir, "productos.csv")); - // WALL OF SHAME: estos proveedores no saben producir CSVs correctos - if (comercioCuit == "30612929455") { - // Libertad S.A. - file = file.replaceAll("|RAPTOR 6X16X45", "/RAPTOR 6X16X45"); - } else if (comercioCuit == "30578411174") { - // Alberdi S.A. - file = file.replaceAll(";", "|"); - } - if (["30707429468", "30589621499"].includes(comercioCuit)) { - // TODO: si tienen los valores, pero con otros nombres, por ejemplo - // productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva. - // pero no quiero mentir, asi que por ahora no lo importo - throw new Error( - `No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b` - ); - } - - console.time("parse"); - - const parsedData = Papa.parse(file, { - header: true, - skipEmptyLines: true, - }); - const objs = parsedData.data - .filter( - (data) => - data.id_comercio && - data.id_bandera && - data.id_sucursal && - data.id_producto - ) - .map((data) => { - delete data.id_dun_14; - return { - id_dataset: datasetId, - ...data, - productos_descripcion: data.productos_descripcion - .replaceAll("\t", " ") - .trim(), - productos_marca: data.productos_marca.trim(), - }; - }); - - const keys = [ - "id_dataset", - "id_comercio", - "id_bandera", - "id_sucursal", - "id_producto", - "productos_ean", - "productos_descripcion", - "productos_cantidad_presentacion", - "productos_unidad_medida_presentacion", - "productos_marca", - "productos_precio_lista", - "productos_precio_referencia", - "productos_cantidad_referencia", - "productos_unidad_medida_referencia", - "productos_precio_unitario_promo1", - "productos_leyenda_promo1", - "productos_precio_unitario_promo2", - "productos_leyenda_promo2", - ]; - - const lines = objs.map( - (data) => keys.map((key) => data[key]).join("\t") + "\n" - ); - - const tsv = `${keys.join("\t")}\n${lines.join("")}`; - await importTsv(db, "precios", tsv); - - console.timeEnd("parse"); - await db.exec("commit"); - console.info(`saved ${objs.length} rows`); + await connection.run("commit"); } catch (e) { // @ts-ignore - if (e.errorType == "Constraint") { + if (e.message.includes("Constraint Error: Duplicate key")) { console.log(`dataset ${basename(dir)} already exists`); - await db.exec("abort"); + await connection.run("abort"); return; } console.error("errored, aborting transaction", e); - await db.exec("abort"); + await connection.run("abort"); } finally { try { Bun.gc(true); @@ -252,12 +285,12 @@ async function importDataset(dir) { */ async function importDatasetTar(tarPath) { console.log(`importing tar ${tarPath}`); - const dir = await fs.mkdtemp("/tmp/sepa-precios-importer-"); + const dir = await fsp.mkdtemp("/tmp/sepa-precios-importer-"); try { await $`tar -x -C ${dir} -f ${tarPath}`; await importDump(dir); } finally { - await fs.rm(dir, { recursive: true }); + await fsp.rm(dir, { recursive: true }); } } @@ -265,23 +298,55 @@ async function importDatasetTar(tarPath) { * @param {string} dumpDir */ async function importDump(dumpDir) { - const pQueue = new PQueue({ concurrency: 1 }); const files = await fg("**/productos.csv", { cwd: dumpDir }); for (const file of files) { const dir = join(dumpDir, dirname(file)); - pQueue.add(() => importDataset(dir)); + await importDataset(dir); } - await pQueue.onIdle(); } -let hasTars = false; -const files = await fg("**/*.tar.zst", { cwd: process.argv[2] }); -for (const file of files) { - hasTars = true; - const tar = join(process.argv[2], file); - await importDatasetTar(tar); +/** + * @param {string} dir + */ +async function getComercioMetadata(dir) { + const comercios = Papa.parse(await readFile(join(dir, "comercio.csv")), { + header: true, + }); + const comercioCuit = comercios.data[0].comercio_cuit; + return { comercioCuit }; } -if (!hasTars) { - await importDump(process.argv[2]); +// ----------- +// tenemos que detectar si el archivo es UTF-16 o UTF-8 +// porque DORINKA SRL a veces envía archivos con UTF-16. +// ------------ + +/** + * @param {string} path + * @returns {Promise} + */ +async function readFile(path) { + const buffer = await fsp.readFile(path, { encoding: null }); + if (buffer[0] === 0xff && buffer[1] === 0xfe) { + return buffer.toString("utf16le"); + } else { + return buffer.toString("utf8"); + } +} + +/** + * @param {string} path + * @returns {Promise} + */ +async function createReadStream(path) { + const chunks = []; + for await (let chunk of fs.createReadStream(path, { start: 0, end: 1 })) { + chunks.push(chunk); + } + const header = Buffer.concat(chunks); + if (header[0] === 0xff && header[1] === 0xfe) { + return fs.createReadStream(path, { encoding: "utf16le" }); + } else { + return fs.createReadStream(path, { encoding: "utf8" }); + } } diff --git a/sepa/package.json b/sepa/package.json index c0f48b4..2b7e995 100644 --- a/sepa/package.json +++ b/sepa/package.json @@ -17,10 +17,12 @@ "dependencies": { "@aws-sdk/client-s3": "^3.651.0", "@aws-sdk/lib-storage": "^3.651.0", + "@duckdb/node-api": "^1.1.3-alpha.4", "date-fns": "^3.6.0", "drizzle-orm": "^0.33.0", "duckdb": "^1.1.3", "duckdb-async": "^1.1.3", + "fast-csv": "^5.0.2", "fast-glob": "^3.3.2", "jschardet": "^3.1.3", "p-queue": "^8.0.1",