diff --git a/sepa/importer.js b/sepa/importer.js index a395499..efa01f8 100644 --- a/sepa/importer.js +++ b/sepa/importer.js @@ -14,11 +14,14 @@ import { DuckDBInstance, } from "@duckdb/node-api"; import Papa from "papaparse"; +import { writeFile } from "fs/promises"; // TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios) -const instance = await DuckDBInstance.create("importer.db"); +const instance = await DuckDBInstance.create("importer.db", { + threads: "1", +}); -const queue = new PQueue({ concurrency: 5 }); +const queue = new PQueue({ concurrency: 1 }); let hasTars = false; const files = await fg("**/*.tar.zst", { cwd: process.argv[2] }); @@ -137,40 +140,15 @@ async function importBanderas(connection, datasetId, dir) { */ async function importPrecios(connection, datasetId, dir) { const { comercioCuit } = await getComercioMetadata(dir); - const productosCsvPath = join(dir, "productos.csv"); - - /** @type {CsvParserStream} */ - let csvStream; - - const appender = await connection.createAppender("main", "precios"); - - if (comercioCuit == "30612929455") { - // Libertad S.A. - const file = (await readFile(productosCsvPath)).replaceAll( - "|RAPTOR 6X16X45", - "/RAPTOR 6X16X45" - ); - csvStream = parseString(file, { - headers: true, - delimiter: "|", - ignoreEmpty: true, - trim: true, - }); - } else if (comercioCuit == "30578411174") { - // Alberdi S.A. - const file = (await readFile(productosCsvPath)).replaceAll(";", "|"); - csvStream = parseString(file, { - headers: true, - delimiter: "|", - ignoreEmpty: true, - trim: true, - }); - } else { - csvStream = (await createReadStream(productosCsvPath)).pipe( - parse({ headers: true, delimiter: "|", ignoreEmpty: true, trim: true }) - ); - } - if (["30707429468", "30589621499", "30663005843"].includes(comercioCuit)) { + if ( + [ + "30707429468", + "30589621499", + "30663005843", + // Alberdi S.A. -- escriben id_producto en formato 7,790127e+012 + "30578411174", + ].includes(comercioCuit) + ) { // TODO: si tienen los valores, pero con otros nombres, por ejemplo // productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva. // pero no quiero mentir, asi que por ahora no lo importo @@ -179,56 +157,45 @@ async function importPrecios(connection, datasetId, dir) { ); } - csvStream - .on("data", (data) => { - if ( - !data.id_comercio || - !data.id_bandera || - !data.id_sucursal || - !data.id_producto - ) - return; + if (comercioCuit == "30543659734") { + throw new Error("Megatone envia archivos vacios que dicen 'error'. lol."); + } - if (data.id_producto.includes("e+")) { - console.error(`[${dir}]`, "id_producto corrupto", data.id_producto); - return; - } - if (data.precio_unitario_bulto_por_unidad_venta_con_iva) { - console.error( - `[${dir}]`, - "tiene precio_unitario_bulto_por_unidad_venta_con_iva", - { - data, - } - ); - return; - } - delete data.id_dun_14; - appender.appendInteger(datasetId); - appender.appendInteger(parseInt(data.id_comercio)); - appender.appendInteger(parseInt(data.id_bandera)); - appender.appendInteger(parseInt(data.id_sucursal)); - appender.appendBigInt(BigInt(data.id_producto)); - appender.appendInteger(parseInt(data.productos_ean)); - appender.appendVarchar(data.productos_descripcion); - appender.appendInteger(parseFloat(data.productos_cantidad_presentacion)); - appender.appendVarchar(data.productos_unidad_medida_presentacion); - appender.appendVarchar(data.productos_marca); - appender.appendInteger(parseFloat(data.productos_precio_lista)); - appender.appendInteger(parseFloat(data.productos_precio_referencia)); - appender.appendInteger(parseFloat(data.productos_cantidad_referencia)); - appender.appendVarchar(data.productos_unidad_medida_referencia); - appender.appendInteger(parseFloat(data.productos_precio_unitario_promo1)); - appender.appendVarchar(data.productos_leyenda_promo1); - appender.appendInteger(parseFloat(data.productos_precio_unitario_promo2)); - appender.appendVarchar(data.productos_leyenda_promo2); - appender.endRow(); - }) - .on("error", (err) => { - console.error(err); - }); - await new Promise((resolve) => csvStream.on("end", resolve)); - await appender.close(); + const sourceCsvPath = join(dir, "productos.csv"); + + const temp = await fsp.mkdtemp("/tmp/sepa-precios-importer-csv-cleaner-"); + try { + const fixedCsvPath = join(temp, "productos.csv"); + + // /** @type {CsvParserStream} */ + // let csvStream; + + // const appender = await connection.createAppender("main", "precios"); + + if (comercioCuit == "30612929455") { + // Libertad S.A. + const file = (await readFile(sourceCsvPath)) + .replaceAll("|RAPTOR 6X16X45", "/RAPTOR 6X16X45") + .replace(/\r?\n *\r?\n[uúUÚÃ]/giu, ""); + await writeFile(fixedCsvPath, file); + } else if (comercioCuit == "30578411174") { + // Alberdi S.A. + const file = (await readFile(sourceCsvPath)).replaceAll(";", "|"); + await writeFile(fixedCsvPath, file); + // TODO: remove ultima actualizacion + } else { + let file = await readFile(sourceCsvPath); + file = file.replace(/\r?\n( )?\0? *\r?\n"?[uúUÚ]/giu, ""); + file = file.replaceAll(/[ \t]*\n/g, "\n"); + await writeFile(fixedCsvPath, file); + } + + const sql = `insert into precios select ${datasetId} as id_dataset, * from read_csv('${fixedCsvPath}', delim='|', header=true, nullstr='')`; + console.debug("sql", sql); + await connection.run(sql); + await fsp.rm(temp, { recursive: true }); + } finally { + } } /** @@ -274,6 +241,7 @@ async function importDataset(dir) { console.error("errored, aborting transaction", e); await connection.run("abort"); } finally { + await connection.run("CHECKPOINT"); try { Bun.gc(true); } catch {}