usar parser de duckdb

This commit is contained in:
Cat /dev/Nulo 2024-11-24 17:38:44 -03:00
parent a24cb20403
commit b1eda4a264

View file

@ -14,11 +14,14 @@ import {
DuckDBInstance,
} from "@duckdb/node-api";
import Papa from "papaparse";
import { writeFile } from "fs/promises";
// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios)
const instance = await DuckDBInstance.create("importer.db");
const instance = await DuckDBInstance.create("importer.db", {
threads: "1",
});
const queue = new PQueue({ concurrency: 5 });
const queue = new PQueue({ concurrency: 1 });
let hasTars = false;
const files = await fg("**/*.tar.zst", { cwd: process.argv[2] });
@ -137,40 +140,15 @@ async function importBanderas(connection, datasetId, dir) {
*/
async function importPrecios(connection, datasetId, dir) {
const { comercioCuit } = await getComercioMetadata(dir);
const productosCsvPath = join(dir, "productos.csv");
/** @type {CsvParserStream<any,any>} */
let csvStream;
const appender = await connection.createAppender("main", "precios");
if (comercioCuit == "30612929455") {
// Libertad S.A.
const file = (await readFile(productosCsvPath)).replaceAll(
"|RAPTOR 6X16X45",
"/RAPTOR 6X16X45"
);
csvStream = parseString(file, {
headers: true,
delimiter: "|",
ignoreEmpty: true,
trim: true,
});
} else if (comercioCuit == "30578411174") {
// Alberdi S.A.
const file = (await readFile(productosCsvPath)).replaceAll(";", "|");
csvStream = parseString(file, {
headers: true,
delimiter: "|",
ignoreEmpty: true,
trim: true,
});
} else {
csvStream = (await createReadStream(productosCsvPath)).pipe(
parse({ headers: true, delimiter: "|", ignoreEmpty: true, trim: true })
);
}
if (["30707429468", "30589621499", "30663005843"].includes(comercioCuit)) {
if (
[
"30707429468",
"30589621499",
"30663005843",
// Alberdi S.A. -- escriben id_producto en formato 7,790127e+012
"30578411174",
].includes(comercioCuit)
) {
// TODO: si tienen los valores, pero con otros nombres, por ejemplo
// productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva.
// pero no quiero mentir, asi que por ahora no lo importo
@ -179,56 +157,45 @@ async function importPrecios(connection, datasetId, dir) {
);
}
csvStream
.on("data", (data) => {
if (
!data.id_comercio ||
!data.id_bandera ||
!data.id_sucursal ||
!data.id_producto
)
return;
if (comercioCuit == "30543659734") {
throw new Error("Megatone envia archivos vacios que dicen 'error'. lol.");
}
if (data.id_producto.includes("e+")) {
console.error(`[${dir}]`, "id_producto corrupto", data.id_producto);
return;
}
if (data.precio_unitario_bulto_por_unidad_venta_con_iva) {
console.error(
`[${dir}]`,
"tiene precio_unitario_bulto_por_unidad_venta_con_iva",
{
data,
}
);
return;
}
delete data.id_dun_14;
appender.appendInteger(datasetId);
appender.appendInteger(parseInt(data.id_comercio));
appender.appendInteger(parseInt(data.id_bandera));
appender.appendInteger(parseInt(data.id_sucursal));
appender.appendBigInt(BigInt(data.id_producto));
appender.appendInteger(parseInt(data.productos_ean));
appender.appendVarchar(data.productos_descripcion);
appender.appendInteger(parseFloat(data.productos_cantidad_presentacion));
appender.appendVarchar(data.productos_unidad_medida_presentacion);
appender.appendVarchar(data.productos_marca);
appender.appendInteger(parseFloat(data.productos_precio_lista));
appender.appendInteger(parseFloat(data.productos_precio_referencia));
appender.appendInteger(parseFloat(data.productos_cantidad_referencia));
appender.appendVarchar(data.productos_unidad_medida_referencia);
appender.appendInteger(parseFloat(data.productos_precio_unitario_promo1));
appender.appendVarchar(data.productos_leyenda_promo1);
appender.appendInteger(parseFloat(data.productos_precio_unitario_promo2));
appender.appendVarchar(data.productos_leyenda_promo2);
appender.endRow();
})
.on("error", (err) => {
console.error(err);
});
await new Promise((resolve) => csvStream.on("end", resolve));
await appender.close();
const sourceCsvPath = join(dir, "productos.csv");
const temp = await fsp.mkdtemp("/tmp/sepa-precios-importer-csv-cleaner-");
try {
const fixedCsvPath = join(temp, "productos.csv");
// /** @type {CsvParserStream<any,any>} */
// let csvStream;
// const appender = await connection.createAppender("main", "precios");
if (comercioCuit == "30612929455") {
// Libertad S.A.
const file = (await readFile(sourceCsvPath))
.replaceAll("|RAPTOR 6X16X45", "/RAPTOR 6X16X45")
.replace(/\r?\n *\r?\n[uúUÚÃ]/giu, "");
await writeFile(fixedCsvPath, file);
} else if (comercioCuit == "30578411174") {
// Alberdi S.A.
const file = (await readFile(sourceCsvPath)).replaceAll(";", "|");
await writeFile(fixedCsvPath, file);
// TODO: remove ultima actualizacion
} else {
let file = await readFile(sourceCsvPath);
file = file.replace(/\r?\n(&#032;)?\0? *\r?\n"?[uúUÚ]/giu, "");
file = file.replaceAll(/[ \t]*\n/g, "\n");
await writeFile(fixedCsvPath, file);
}
const sql = `insert into precios select ${datasetId} as id_dataset, * from read_csv('${fixedCsvPath}', delim='|', header=true, nullstr='')`;
console.debug("sql", sql);
await connection.run(sql);
await fsp.rm(temp, { recursive: true });
} finally {
}
}
/**
@ -274,6 +241,7 @@ async function importDataset(dir) {
console.error("errored, aborting transaction", e);
await connection.run("abort");
} finally {
await connection.run("CHECKPOINT");
try {
Bun.gc(true);
} catch {}