usar generator

This commit is contained in:
Cat /dev/Nulo 2024-09-07 00:35:17 -03:00
parent c8c603aade
commit 1644d8b207

View file

@ -1,4 +1,5 @@
import { readFile } from "fs/promises"; import * as fs from "fs/promises";
import { createWriteStream } from "fs";
import Papa from "papaparse"; import Papa from "papaparse";
import { basename, join, dirname } from "path"; import { basename, join, dirname } from "path";
import postgres from "postgres"; import postgres from "postgres";
@ -65,19 +66,25 @@ await sql`
productos_precio_unitario_promo1 NUMERIC(10, 2), productos_precio_unitario_promo1 NUMERIC(10, 2),
productos_leyenda_promo1 TEXT, productos_leyenda_promo1 TEXT,
productos_precio_unitario_promo2 NUMERIC(10, 2), productos_precio_unitario_promo2 NUMERIC(10, 2),
productos_leyenda_promo2 TEXT, productos_leyenda_promo2 TEXT
FOREIGN KEY (id_dataset, id_comercio, id_bandera, id_sucursal) REFERENCES sucursales(id_dataset, id_comercio, id_bandera, id_sucursal),
PRIMARY KEY (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto)
); );
`; `;
await sql`
CREATE INDEX IF NOT EXISTS idx_precios_composite ON precios (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto);
`;
await sql`
CREATE INDEX IF NOT EXISTS idx_sucursales_composite ON sucursales (id_dataset, id_comercio, id_bandera, id_sucursal);
`;
async function importSucursales( async function importSucursales(
sql: postgres.Sql, sql: postgres.Sql,
datasetId: number, datasetId: number,
dir: string dir: string
) { ) {
const sucursales: Papa.ParseResult<any> = Papa.parse( const sucursales: Papa.ParseResult<any> = Papa.parse(
await readFile(join(dir, "sucursales.csv"), "utf-8"), await fs.readFile(join(dir, "sucursales.csv"), "utf-8"),
{ {
header: true, header: true,
} }
@ -122,10 +129,9 @@ async function importDataset(dir: string) {
const res = const res =
await sql`insert into datasets (name, date) values (${basename(dir)}, ${date}) returning id`; await sql`insert into datasets (name, date) values (${basename(dir)}, ${date}) returning id`;
datasetId = res[0].id; datasetId = res[0].id;
const datas: any[] = [];
const comercios: Papa.ParseResult<{ comercio_cuit: string }> = Papa.parse( const comercios: Papa.ParseResult<{ comercio_cuit: string }> = Papa.parse(
await readFile(join(dir, "comercio.csv"), "utf-8"), await fs.readFile(join(dir, "comercio.csv"), "utf-8"),
{ header: true } { header: true }
); );
const comercioCuit = comercios.data[0].comercio_cuit; const comercioCuit = comercios.data[0].comercio_cuit;
@ -133,7 +139,7 @@ async function importDataset(dir: string) {
await importSucursales(sql, datasetId, dir); await importSucursales(sql, datasetId, dir);
let file = await readFile(join(dir, "productos.csv"), "utf-8"); let file = await fs.readFile(join(dir, "productos.csv"), "utf-8");
// WALL OF SHAME: estos proveedores no saben producir CSVs correctos // WALL OF SHAME: estos proveedores no saben producir CSVs correctos
if (comercioCuit == "30612929455") { if (comercioCuit == "30612929455") {
// Libertad S.A. // Libertad S.A.
@ -153,68 +159,71 @@ async function importDataset(dir: string) {
); );
return; return;
} }
console.time("parse"); console.time("parse");
return await new Promise((resolve, reject) => {
Papa.parse(file, { const writable =
await sql`copy precios (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto, productos_ean, productos_descripcion, productos_cantidad_presentacion, productos_unidad_medida_presentacion, productos_marca, productos_precio_lista, productos_precio_referencia, productos_cantidad_referencia, productos_unidad_medida_referencia, productos_precio_unitario_promo1, productos_leyenda_promo1, productos_precio_unitario_promo2, productos_leyenda_promo2) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
let rowCount = 0;
async function* processRows() {
const parsedData = Papa.parse(file, {
header: true, header: true,
step: function (result: any) { skipEmptyLines: true,
const { data } = result; });
for (const data of parsedData.data as any[]) {
if ( if (
data.id_comercio && data.id_comercio &&
data.id_bandera && data.id_bandera &&
data.id_sucursal && data.id_sucursal &&
data.id_producto data.id_producto
) ) {
datas.push(data);
},
complete: async function () {
try {
console.timeEnd("parse");
console.time("map");
const objs = datas.map((data) => {
delete data.id_dun_14; delete data.id_dun_14;
return { const row = {
id_dataset: datasetId, id_dataset: datasetId,
...data, ...data,
productos_descripcion: data.productos_descripcion.replaceAll( productos_descripcion: data.productos_descripcion
"\t", .replaceAll("\t", " ")
" " .trim(),
), productos_marca: data.productos_marca.trim(),
}; };
}); const values =
if (!objs.length) { [
console.error(`No hay datos para el dataset ${dir}`); row.id_dataset,
return; row.id_comercio,
} row.id_bandera,
const keys = Object.keys(objs[0]); row.id_sucursal,
console.timeEnd("map"); row.id_producto,
console.time("copy"); row.productos_ean,
const writable = row.productos_descripcion,
await sql`copy precios (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable(); row.productos_cantidad_presentacion,
row.productos_unidad_medida_presentacion,
row.productos_marca,
row.productos_precio_lista,
row.productos_precio_referencia,
row.productos_cantidad_referencia,
row.productos_unidad_medida_referencia,
row.productos_precio_unitario_promo1,
row.productos_leyenda_promo1,
row.productos_precio_unitario_promo2,
row.productos_leyenda_promo2,
].join("\t") + "\n";
await pipeline( rowCount++;
Readable.from( yield values;
(async function* () {
for (const data of objs) {
yield keys.map((key) => data[key]).join("\t") + "\n";
} }
})() }
), }
writable
); const generator = processRows();
console.timeEnd("copy"); await pipeline(Readable.from(generator), writable);
console.info(`saved ${objs.length} rows`);
} catch (e) { console.timeEnd("parse");
reject(e); console.info(`saved ${rowCount} rows`);
return;
} finally {
Bun.gc(true); Bun.gc(true);
resolve(void 0);
}
},
skipEmptyLines: true,
});
});
}); });
} catch (e) { } catch (e) {
if ((e as any).code == "23505") { if ((e as any).code == "23505") {