Compare commits

..

No commits in common. "076abab943c1211d05492b456732bc4c42994b0e" and "4c7d11840b1c58019515ebb809d8a0e12e8807b3" have entirely different histories.

4 changed files with 68 additions and 99 deletions

Binary file not shown.

View file

@ -9,7 +9,6 @@ export const zResource = z.object({
url: z.string(), url: z.string(),
modified: z.coerce.date().optional(), modified: z.coerce.date().optional(),
description: z.string(), description: z.string(),
name: z.string(),
}); });
export type Resource = z.infer<typeof zResource>; export type Resource = z.infer<typeof zResource>;
export const zDatasetInfo = z.object({ export const zDatasetInfo = z.object({

View file

@ -1,13 +1,10 @@
import * as fs from "fs/promises"; import { readFile } from "fs/promises";
import Papa from "papaparse"; import Papa from "papaparse";
import { basename, join, dirname } from "path"; import { basename, join, dirname } from "path";
import postgres from "postgres"; import postgres from "postgres";
import { Readable } from "stream"; import { Readable } from "stream";
import { pipeline } from "node:stream/promises"; import { pipeline } from "node:stream/promises";
import { Glob } from "bun"; import { Glob } from "bun";
import PQueue from "p-queue";
// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios)
const sql = postgres({ const sql = postgres({
database: "sepa-precios", database: "sepa-precios",
@ -68,38 +65,21 @@ await sql`
productos_precio_unitario_promo1 NUMERIC(10, 2), productos_precio_unitario_promo1 NUMERIC(10, 2),
productos_leyenda_promo1 TEXT, productos_leyenda_promo1 TEXT,
productos_precio_unitario_promo2 NUMERIC(10, 2), productos_precio_unitario_promo2 NUMERIC(10, 2),
productos_leyenda_promo2 TEXT productos_leyenda_promo2 TEXT,
FOREIGN KEY (id_dataset, id_comercio, id_bandera, id_sucursal) REFERENCES sucursales(id_dataset, id_comercio, id_bandera, id_sucursal)
); );
`; `;
await sql`
CREATE INDEX IF NOT EXISTS idx_precios_composite ON precios (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto);
`;
await sql`
CREATE INDEX IF NOT EXISTS idx_sucursales_composite ON sucursales (id_dataset, id_comercio, id_bandera, id_sucursal);
`;
async function readFile(path: string) {
// XXX: DORINKA SRL a veces envía archivos con UTF-16.
const buffer = await fs.readFile(path, { encoding: null });
if (buffer[0] === 0xff && buffer[1] === 0xfe) {
return buffer.toString("utf16le");
} else {
return buffer.toString("utf8");
}
}
async function importSucursales( async function importSucursales(
sql: postgres.Sql, sql: postgres.Sql,
datasetId: number, datasetId: number,
dir: string dir: string,
) { ) {
const sucursales: Papa.ParseResult<any> = Papa.parse( const sucursales: Papa.ParseResult<any> = Papa.parse(
await readFile(join(dir, "sucursales.csv")), await readFile(join(dir, "sucursales.csv"), "utf-8"),
{ {
header: true, header: true,
} },
); );
const objs = sucursales.data const objs = sucursales.data
@ -118,7 +98,7 @@ async function importSucursales(
}); });
const keys = Object.keys(objs[0]); const keys = Object.keys(objs[0]);
const lines = Readable.from( const lines = Readable.from(
objs.map((data) => keys.map((key) => (data as any)[key]).join("\t") + "\n") objs.map((data) => keys.map((key) => (data as any)[key]).join("\t") + "\n"),
); );
const writable = const writable =
await sql`copy sucursales (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable(); await sql`copy sucursales (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
@ -141,17 +121,18 @@ async function importDataset(dir: string) {
const res = const res =
await sql`insert into datasets (name, date) values (${basename(dir)}, ${date}) returning id`; await sql`insert into datasets (name, date) values (${basename(dir)}, ${date}) returning id`;
datasetId = res[0].id; datasetId = res[0].id;
const datas: any[] = [];
const comercios: Papa.ParseResult<{ comercio_cuit: string }> = Papa.parse( const comercios: Papa.ParseResult<{ comercio_cuit: string }> = Papa.parse(
await readFile(join(dir, "comercio.csv")), await readFile(join(dir, "comercio.csv"), "utf-8"),
{ header: true } { header: true },
); );
const comercioCuit = comercios.data[0].comercio_cuit; const comercioCuit = comercios.data[0].comercio_cuit;
console.log(`dataset ${datasetId}, comercio ${comercioCuit}`); console.log(`dataset ${datasetId}, comercio ${comercioCuit}`);
await importSucursales(sql, datasetId, dir); await importSucursales(sql, datasetId, dir);
let file = await readFile(join(dir, "productos.csv")); let file = await readFile(join(dir, "productos.csv"), "utf-8");
// WALL OF SHAME: estos proveedores no saben producir CSVs correctos // WALL OF SHAME: estos proveedores no saben producir CSVs correctos
if (comercioCuit == "30612929455") { if (comercioCuit == "30612929455") {
// Libertad S.A. // Libertad S.A.
@ -167,75 +148,67 @@ async function importDataset(dir: string) {
// productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva. // productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva.
// pero no quiero mentir, asi que por ahora no lo importo // pero no quiero mentir, asi que por ahora no lo importo
console.error( console.error(
`No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b` `No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b`,
); );
return; return;
} }
console.time("parse"); console.time("parse");
return await new Promise((resolve, reject) => {
const writable = Papa.parse(file, {
await sql`copy precios (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto, productos_ean, productos_descripcion, productos_cantidad_presentacion, productos_unidad_medida_presentacion, productos_marca, productos_precio_lista, productos_precio_referencia, productos_cantidad_referencia, productos_unidad_medida_referencia, productos_precio_unitario_promo1, productos_leyenda_promo1, productos_precio_unitario_promo2, productos_leyenda_promo2) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
let rowCount = 0;
async function* processRows() {
const parsedData = Papa.parse(file, {
header: true, header: true,
skipEmptyLines: true, step: function (result: any) {
}); const { data } = result;
for (const data of parsedData.data as any[]) {
if ( if (
data.id_comercio && data.id_comercio &&
data.id_bandera && data.id_bandera &&
data.id_sucursal && data.id_sucursal &&
data.id_producto data.id_producto
) { )
datas.push(data);
},
complete: async function () {
try {
console.timeEnd("parse");
console.time("map");
const objs = datas.map((data) => {
delete data.id_dun_14; delete data.id_dun_14;
const row = { return {
id_dataset: datasetId, id_dataset: datasetId,
...data, ...data,
productos_descripcion: data.productos_descripcion productos_descripcion: data.productos_descripcion.replaceAll(
.replaceAll("\t", " ") "\t",
.trim(), " ",
productos_marca: data.productos_marca.trim(), ),
}; };
const values = });
[ if (!objs.length) {
row.id_dataset, console.error(`No hay datos para el dataset ${dir}`);
row.id_comercio, return;
row.id_bandera,
row.id_sucursal,
row.id_producto,
row.productos_ean,
row.productos_descripcion,
row.productos_cantidad_presentacion,
row.productos_unidad_medida_presentacion,
row.productos_marca,
row.productos_precio_lista,
row.productos_precio_referencia,
row.productos_cantidad_referencia,
row.productos_unidad_medida_referencia,
row.productos_precio_unitario_promo1,
row.productos_leyenda_promo1,
row.productos_precio_unitario_promo2,
row.productos_leyenda_promo2,
].join("\t") + "\n";
rowCount++;
yield values;
} }
} const keys = Object.keys(objs[0]);
} const lines = Readable.from(
objs.map(
const generator = processRows(); (data) => keys.map((key) => data[key]).join("\t") + "\n",
await pipeline(Readable.from(generator), writable); ),
);
console.timeEnd("parse"); console.timeEnd("map");
console.info(`saved ${rowCount} rows`); console.time("copy");
const writable =
await sql`copy precios (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
await pipeline(lines, writable);
console.timeEnd("copy");
console.info(`saved ${objs.length} rows`);
} catch (e) {
reject(e);
return;
} finally {
Bun.gc(true); Bun.gc(true);
resolve(void 0);
}
},
skipEmptyLines: true,
});
});
}); });
} catch (e) { } catch (e) {
if ((e as any).code == "23505") { if ((e as any).code == "23505") {
@ -246,16 +219,13 @@ async function importDataset(dir: string) {
} }
} }
const pQueue = new PQueue({ concurrency: 4 });
try { try {
const glob = new Glob("**/productos.csv"); const glob = new Glob("**/productos.csv");
for await (const file of glob.scan(process.argv[2])) { for await (const file of glob.scan(process.argv[2])) {
const dir = join(process.argv[2], dirname(file)); const dir = join(process.argv[2], dirname(file));
console.log(dir); console.log(dir);
pQueue.add(() => importDataset(dir)); await importDataset(dir);
} }
} finally { } finally {
await pQueue.onIdle();
await sql.end(); await sql.end();
} }

View file

@ -10,9 +10,9 @@
"typescript": "^5.0.0" "typescript": "^5.0.0"
}, },
"dependencies": { "dependencies": {
"ckan": "workspace:*",
"p-queue": "^8.0.1", "p-queue": "^8.0.1",
"papaparse": "^5.4.1", "papaparse": "^5.4.1",
"postgres": "^3.4.4" "postgres": "^3.4.4",
"ckan": "workspace:*"
} }
} }