diff --git a/sepa/bun.lockb b/sepa/bun.lockb index 13f51e8..c279026 100755 Binary files a/sepa/bun.lockb and b/sepa/bun.lockb differ diff --git a/sepa/duckdb.sql b/sepa/duckdb.sql new file mode 100644 index 0000000..c61e7e9 --- /dev/null +++ b/sepa/duckdb.sql @@ -0,0 +1,81 @@ +CREATE TABLE datasets ( + id INTEGER PRIMARY KEY, + name TEXT UNIQUE, + date DATE, + id_comercio INTEGER +); +CREATE SEQUENCE seq_datasets START 1; + + +CREATE TABLE precios ( + id_dataset INTEGER, + id_comercio INTEGER, + id_bandera INTEGER, + id_sucursal INTEGER, + id_producto BIGINT, + productos_ean INTEGER, + productos_descripcion TEXT, + productos_cantidad_presentacion DECIMAL(10,2), + productos_unidad_medida_presentacion TEXT, + productos_marca TEXT, + productos_precio_lista DECIMAL(10,2), + productos_precio_referencia DECIMAL(10,2), + productos_cantidad_referencia DECIMAL(10,2), + productos_unidad_medida_referencia TEXT, + productos_precio_unitario_promo1 DECIMAL(10,2), + productos_leyenda_promo1 TEXT, + productos_precio_unitario_promo2 DECIMAL(10,2), + productos_leyenda_promo2 TEXT, + FOREIGN KEY (id_dataset) REFERENCES datasets(id) +); + +CREATE TABLE productos_descripcion_index ( + id_producto BIGINT, + productos_descripcion TEXT UNIQUE, + productos_marca TEXT +); + +CREATE TABLE sucursales ( + id_dataset INTEGER, + id_comercio INTEGER, + id_bandera INTEGER, + id_sucursal INTEGER, + sucursales_nombre TEXT, + sucursales_tipo TEXT, + sucursales_calle TEXT, + sucursales_numero TEXT, + sucursales_latitud DECIMAL, + sucursales_longitud DECIMAL, + sucursales_observaciones TEXT, + sucursales_barrio TEXT, + sucursales_codigo_postal TEXT, + sucursales_localidad TEXT, + sucursales_provincia TEXT, + sucursales_lunes_horario_atencion TEXT, + sucursales_martes_horario_atencion TEXT, + sucursales_miercoles_horario_atencion TEXT, + sucursales_jueves_horario_atencion TEXT, + sucursales_viernes_horario_atencion TEXT, + sucursales_sabado_horario_atencion TEXT, + sucursales_domingo_horario_atencion TEXT, + FOREIGN KEY (id_dataset) REFERENCES datasets(id), + UNIQUE (id_dataset, id_comercio, id_bandera, id_sucursal) +); + +CREATE TABLE banderas ( + id_dataset INTEGER, + id_comercio INTEGER NOT NULL, + id_bandera INTEGER NOT NULL, + comercio_cuit TEXT NOT NULL, + comercio_razon_social TEXT, + comercio_bandera_nombre TEXT, + comercio_bandera_url TEXT, + comercio_ultima_actualizacion DATE, + comercio_version_sepa TEXT, + FOREIGN KEY (id_dataset) REFERENCES datasets(id) +); + +-- Create indexes +CREATE INDEX idx_precios_id_producto ON precios(id_producto); +CREATE INDEX idx_precios_id_producto_id_dataset ON precios(id_producto, id_dataset); +CREATE INDEX idx_precios_id_dataset_id_comercio_id_sucursal ON precios(id_dataset, id_comercio, id_sucursal); \ No newline at end of file diff --git a/sepa/importer.js b/sepa/importer.js new file mode 100644 index 0000000..667fd54 --- /dev/null +++ b/sepa/importer.js @@ -0,0 +1,287 @@ +// @ts-check + +import * as fs from "fs/promises"; +import Papa from "papaparse"; +import { basename, join, dirname } from "path"; +import { $ } from "zx"; +import PQueue from "p-queue"; +import { Database } from "duckdb-async"; +import fg from "fast-glob"; +// import { waddler } from "waddler"; + +// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios) + +/** + * @param {string} path + * @returns {Promise} + */ +async function readFile(path) { + // XXX: DORINKA SRL a veces envía archivos con UTF-16. + const buffer = await fs.readFile(path, { encoding: null }); + if (buffer[0] === 0xff && buffer[1] === 0xfe) { + return buffer.toString("utf16le"); + } else { + return buffer.toString("utf8"); + } +} + +/** + * @param {Database} db + * @param {number} datasetId + * @param {string} dir + */ +async function importSucursales(db, datasetId, dir) { + const sucursales = Papa.parse(await readFile(join(dir, "sucursales.csv")), { + header: true, + }); + + const objs = sucursales.data + .filter((data) => data.id_comercio && data.id_bandera && data.id_sucursal) + .map((data) => { + // Megatone + if ("sucursales_domingohorario_atencion" in data) { + data.sucursales_domingo_horario_atencion = + data.sucursales_domingohorario_atencion; + delete data.sucursales_domingohorario_atencion; + } + data.sucursales_nombre = data.sucursales_nombre + .replaceAll("\t", " ") + .trim(); + return { + id_dataset: datasetId, + ...data, + }; + }); + const keys = Object.keys(objs[0]); + if (!keys.includes("sucursales_longitud")) { + throw new Error("Alberdi S.A. strikes again! las sucursales están rotas."); + } + const lines = objs.map( + (data) => keys.map((key) => data[key]).join("\t") + "\n" + ); + + const tsv = `${keys.join("\t")}\n${lines.join("")}`; + await importTsv(db, "sucursales", tsv); +} + +/** + * @param {Database} db + * @param {string} table + * @param {string} tsv + */ +async function importTsv(db, table, tsv) { + const dir = await fs.mkdtemp("/tmp/sepa-precios-importer-"); + try { + const tempFile = join(dir, "temp.tsv"); + await fs.writeFile(tempFile, tsv); + console.log( + `COPY ${table} FROM '${tempFile}' WITH (HEADER, DELIMITER '\t', NULL '', QUOTE '')` + ); + await db.exec( + `COPY ${table} FROM '${tempFile}' WITH (HEADER, DELIMITER '\t', NULL '', QUOTE '')` + ); + await fs.rm(dir, { recursive: true }); + } finally { + } +} + +/** + * @param {Database} db + * @param {number} datasetId + * @param {string} dir + */ +async function importBanderas(db, datasetId, dir) { + const banderas = Papa.parse(await readFile(join(dir, "comercio.csv")), { + header: true, + }); + const objs = banderas.data.map((data) => ({ + id_dataset: datasetId, + ...data, + })); + const keys = [ + "id_dataset", + "id_comercio", + "id_bandera", + "comercio_cuit", + "comercio_razon_social", + "comercio_bandera_nombre", + "comercio_bandera_url", + "comercio_ultima_actualizacion", + "comercio_version_sepa", + ]; + const lines = objs + .filter((data) => data.id_comercio && data.id_bandera) + .map((data) => + keys + .map((key) => { + const value = data[key]; + if (typeof value !== "string") { + return value; + } + return value.replaceAll("\t", " ").trim(); + }) + .join("\t") + ); + const tsv = `${keys.join("\t")}\n${lines.join("\n")}`; + + await importTsv(db, "banderas", tsv); +} + +/** + * @param {string} dir + */ +async function importDataset(dir) { + console.log(dir); + const date = basename(dir).match(/(\d{4}-\d{2}-\d{2})/)?.[1]; + const id_comercio = basename(dir).match(/comercio-sepa-(\d+)/)?.[1]; + // TODO: parsear "Ultima actualizacion" al final del CSV y insertarlo en la tabla datasets + + const db = await Database.create("importer.db"); + + try { + await db.exec("begin transaction"); + let datasetId; + const res = await db.all( + `insert into datasets (id, name, date, id_comercio) values (nextval('seq_datasets'), '${basename(dir)}', '${date}', ${id_comercio}) returning id` + ); + console.log("inserted dataset"); + datasetId = res[0].id; + + const comercios = Papa.parse(await readFile(join(dir, "comercio.csv")), { + header: true, + }); + const comercioCuit = comercios.data[0].comercio_cuit; + console.log(`dataset ${datasetId}, comercio ${comercioCuit}`); + + await importBanderas(db, datasetId, dir); + await importSucursales(db, datasetId, dir); + + let file = await readFile(join(dir, "productos.csv")); + // WALL OF SHAME: estos proveedores no saben producir CSVs correctos + if (comercioCuit == "30612929455") { + // Libertad S.A. + file = file.replaceAll("|RAPTOR 6X16X45", "/RAPTOR 6X16X45"); + } else if (comercioCuit == "30578411174") { + // Alberdi S.A. + file = file.replaceAll(";", "|"); + } + if (["30707429468", "30589621499"].includes(comercioCuit)) { + // TODO: si tienen los valores, pero con otros nombres, por ejemplo + // productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva. + // pero no quiero mentir, asi que por ahora no lo importo + throw new Error( + `No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b` + ); + } + + console.time("parse"); + + const parsedData = Papa.parse(file, { + header: true, + skipEmptyLines: true, + }); + const objs = parsedData.data + .filter( + (data) => + data.id_comercio && + data.id_bandera && + data.id_sucursal && + data.id_producto + ) + .map((data) => { + delete data.id_dun_14; + return { + id_dataset: datasetId, + ...data, + productos_descripcion: data.productos_descripcion + .replaceAll("\t", " ") + .trim(), + productos_marca: data.productos_marca.trim(), + }; + }); + + const keys = [ + "id_dataset", + "id_comercio", + "id_bandera", + "id_sucursal", + "id_producto", + "productos_ean", + "productos_descripcion", + "productos_cantidad_presentacion", + "productos_unidad_medida_presentacion", + "productos_marca", + "productos_precio_lista", + "productos_precio_referencia", + "productos_cantidad_referencia", + "productos_unidad_medida_referencia", + "productos_precio_unitario_promo1", + "productos_leyenda_promo1", + "productos_precio_unitario_promo2", + "productos_leyenda_promo2", + ]; + + const lines = objs.map( + (data) => keys.map((key) => data[key]).join("\t") + "\n" + ); + + const tsv = `${keys.join("\t")}\n${lines.join("")}`; + await importTsv(db, "precios", tsv); + + console.timeEnd("parse"); + await db.exec("commit"); + console.info(`saved ${objs.length} rows`); + } catch (e) { + // @ts-ignore + if (e.errorType == "Constraint") { + console.log(`dataset ${basename(dir)} already exists`); + await db.exec("abort"); + return; + } + console.error("errored, aborting transaction", e); + await db.exec("abort"); + } finally { + try { + Bun.gc(true); + } catch {} + } +} + +/** + * @param {string} tarPath + */ +async function importDatasetTar(tarPath) { + console.log(`importing tar ${tarPath}`); + const dir = await fs.mkdtemp("/tmp/sepa-precios-importer-"); + try { + await $`tar -x -C ${dir} -f ${tarPath}`; + await importDump(dir); + } finally { + await fs.rm(dir, { recursive: true }); + } +} + +/** + * @param {string} dumpDir + */ +async function importDump(dumpDir) { + const pQueue = new PQueue({ concurrency: 1 }); + const files = await fg("**/productos.csv", { cwd: dumpDir }); + for (const file of files) { + const dir = join(dumpDir, dirname(file)); + pQueue.add(() => importDataset(dir)); + } + await pQueue.onIdle(); +} + +let hasTars = false; +const files = await fg("**/*.tar.zst", { cwd: process.argv[2] }); +for (const file of files) { + hasTars = true; + const tar = join(process.argv[2], file); + await importDatasetTar(tar); +} + +if (!hasTars) { + await importDump(process.argv[2]); +} diff --git a/sepa/importer.ts b/sepa/importer.ts deleted file mode 100644 index 27f5574..0000000 --- a/sepa/importer.ts +++ /dev/null @@ -1,253 +0,0 @@ -import * as fs from "fs/promises"; -import Papa from "papaparse"; -import { basename, join, dirname } from "path"; -import postgres from "postgres"; -import { Readable } from "stream"; -import { pipeline } from "node:stream/promises"; -import { $, Glob } from "bun"; -import PQueue from "p-queue"; - -// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios) - -const sql = postgres({}); - -async function readFile(path: string) { - // XXX: DORINKA SRL a veces envía archivos con UTF-16. - const buffer = await fs.readFile(path, { encoding: null }); - if (buffer[0] === 0xff && buffer[1] === 0xfe) { - return buffer.toString("utf16le"); - } else { - return buffer.toString("utf8"); - } -} - -async function importSucursales( - sql: postgres.Sql, - datasetId: number, - dir: string -) { - const sucursales: Papa.ParseResult = Papa.parse( - await readFile(join(dir, "sucursales.csv")), - { - header: true, - } - ); - - const objs = sucursales.data - .filter((data) => data.id_comercio && data.id_bandera && data.id_sucursal) - .map((data) => { - // Megatone - if ("sucursales_domingohorario_atencion" in data) { - data.sucursales_domingo_horario_atencion = - data.sucursales_domingohorario_atencion; - delete data.sucursales_domingohorario_atencion; - } - return { - id_dataset: datasetId, - ...data, - }; - }); - const keys = Object.keys(objs[0]); - const lines = Readable.from( - objs.map((data) => keys.map((key) => (data as any)[key]).join("\t") + "\n") - ); - const writable = - await sql`copy sucursales (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable(); - await pipeline(lines, writable); -} - -async function importBanderas( - sql: postgres.Sql, - datasetId: number, - dir: string -) { - const banderas: Papa.ParseResult = Papa.parse( - await readFile(join(dir, "comercio.csv")), - { header: true } - ); - const objs = banderas.data.map((data) => ({ - id_dataset: datasetId, - ...data, - })); - const keys = [ - "id_dataset", - "id_comercio", - "id_bandera", - "comercio_cuit", - "comercio_razon_social", - "comercio_bandera_nombre", - "comercio_bandera_url", - "comercio_ultima_actualizacion", - "comercio_version_sepa", - ]; - const lines = Readable.from( - objs - .filter((data) => data.id_comercio && data.id_bandera) - .map( - (data) => - keys - .map((key) => { - const value = (data as any)[key]; - if (typeof value !== "string") { - return value; - } - return value.replaceAll("\t", " ").trim(); - }) - .join("\t") + "\n" - ) - ); - const writable = - await sql`copy banderas (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable(); - await pipeline(lines, writable); -} - -async function importDataset(dir: string) { - console.log(dir); - const date = basename(dir).match(/(\d{4}-\d{2}-\d{2})/)![1]; - const id_comercio = basename(dir).match(/comercio-sepa-(\d+)/)![1]; - // TODO: parsear "Ultima actualizacion" al final del CSV y insertarlo en la tabla datasets - - try { - await sql.begin(async (sql) => { - let datasetId: number; - const res = - await sql`insert into datasets (name, date, id_comercio) values (${basename(dir)}, ${date}, ${id_comercio}) returning id`; - datasetId = res[0].id; - - const comercios: Papa.ParseResult<{ comercio_cuit: string }> = Papa.parse( - await readFile(join(dir, "comercio.csv")), - { header: true } - ); - const comercioCuit = comercios.data[0].comercio_cuit; - console.log(`dataset ${datasetId}, comercio ${comercioCuit}`); - - await importBanderas(sql, datasetId, dir); - await importSucursales(sql, datasetId, dir); - - let file = await readFile(join(dir, "productos.csv")); - // WALL OF SHAME: estos proveedores no saben producir CSVs correctos - if (comercioCuit == "30612929455") { - // Libertad S.A. - file = file.replaceAll("|RAPTOR 6X16X45", "/RAPTOR 6X16X45"); - } else if (comercioCuit == "30578411174") { - // Alberdi S.A. - file = file.replaceAll(";", "|"); - } - if (["30707429468", "30589621499"].includes(comercioCuit)) { - // TODO: si tienen los valores, pero con otros nombres, por ejemplo - // productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva. - // pero no quiero mentir, asi que por ahora no lo importo - throw new Error( - `No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b` - ); - } - - console.time("parse"); - - const writable = - await sql`copy precios (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto, productos_ean, productos_descripcion, productos_cantidad_presentacion, productos_unidad_medida_presentacion, productos_marca, productos_precio_lista, productos_precio_referencia, productos_cantidad_referencia, productos_unidad_medida_referencia, productos_precio_unitario_promo1, productos_leyenda_promo1, productos_precio_unitario_promo2, productos_leyenda_promo2) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable(); - - let rowCount = 0; - - async function* processRows() { - const parsedData = Papa.parse(file, { - header: true, - skipEmptyLines: true, - }); - - for (const data of parsedData.data as any[]) { - if ( - data.id_comercio && - data.id_bandera && - data.id_sucursal && - data.id_producto - ) { - delete data.id_dun_14; - const row = { - id_dataset: datasetId, - ...data, - productos_descripcion: data.productos_descripcion - .replaceAll("\t", " ") - .trim(), - productos_marca: data.productos_marca.trim(), - }; - const values = - [ - row.id_dataset, - row.id_comercio, - row.id_bandera, - row.id_sucursal, - row.id_producto, - row.productos_ean, - row.productos_descripcion, - row.productos_cantidad_presentacion, - row.productos_unidad_medida_presentacion, - row.productos_marca, - row.productos_precio_lista, - row.productos_precio_referencia, - row.productos_cantidad_referencia, - row.productos_unidad_medida_referencia, - row.productos_precio_unitario_promo1, - row.productos_leyenda_promo1, - row.productos_precio_unitario_promo2, - row.productos_leyenda_promo2, - ].join("\t") + "\n"; - - rowCount++; - yield values; - } - } - } - - const generator = processRows(); - await pipeline(Readable.from(generator), writable); - - console.timeEnd("parse"); - console.info(`saved ${rowCount} rows`); - - Bun.gc(true); - }); - } catch (e) { - if ((e as any).code == "23505") { - console.log(`dataset ${basename(dir)} already exists`); - return; - } - throw e; - } -} - -async function importDatasetTar(tarPath: string) { - console.log(`importing tar ${tarPath}`); - const dir = await fs.mkdtemp("/tmp/sepa-precios-importer-"); - try { - await $`tar -x -C ${dir} -f ${tarPath}`; - await importDump(dir); - } finally { - await fs.rm(dir, { recursive: true }); - } -} -async function importDump(dumpDir: string) { - const pQueue = new PQueue({ concurrency: 2 }); - const glob = new Glob("**/productos.csv"); - for await (const file of glob.scan(dumpDir)) { - const dir = join(dumpDir, dirname(file)); - pQueue.add(() => importDataset(dir)); - } - await pQueue.onIdle(); -} - -try { - const tarGlob = new Glob("**/*.tar.zst"); - let hasTars = false; - for await (const file of tarGlob.scan(process.argv[2])) { - hasTars = true; - const tar = join(process.argv[2], file); - await importDatasetTar(tar); - } - - if (!hasTars) { - await importDump(process.argv[2]); - } -} finally { - await sql.end(); -} diff --git a/sepa/package.json b/sepa/package.json index ddeecd8..c0f48b4 100644 --- a/sepa/package.json +++ b/sepa/package.json @@ -3,10 +3,13 @@ "private": true, "type": "module", "devDependencies": { + "0x": "^5.8.0", + "@mmarchini/observe": "^3.0.0", "@types/bun": "^1.1.11", - "bun-types": "^1.1.30", "@types/papaparse": "^5.3.14", - "drizzle-kit": "^0.24.2" + "bun-types": "^1.1.30", + "drizzle-kit": "^0.24.2", + "tsx": "^4.19.2" }, "peerDependencies": { "typescript": "^5.0.0" @@ -16,11 +19,16 @@ "@aws-sdk/lib-storage": "^3.651.0", "date-fns": "^3.6.0", "drizzle-orm": "^0.33.0", + "duckdb": "^1.1.3", + "duckdb-async": "^1.1.3", + "fast-glob": "^3.3.2", "jschardet": "^3.1.3", "p-queue": "^8.0.1", "papaparse": "^5.4.1", "postgres": "^3.4.4", - "zod": "^3.23.8" + "waddler": "^0.0.3", + "zod": "^3.23.8", + "zx": "^8.2.2" }, "workspaces": [ "sitio2"