diff --git a/sepa/bun.lockb b/sepa/bun.lockb
index 7493719..82b22b8 100755
Binary files a/sepa/bun.lockb and b/sepa/bun.lockb differ
diff --git a/sepa/importer-bench.js b/sepa/importer-bench.js
new file mode 100644
index 0000000..18b0535
--- /dev/null
+++ b/sepa/importer-bench.js
@@ -0,0 +1,30 @@
+// @ts-check
+import { run, bench, boxplot } from "mitata";
+import { execFile } from "node:child_process";
+import { promisify } from "node:util";
+import { main } from "./importer.js";
+import { rm } from "node:fs/promises";
+import { readFileSync } from "node:fs";
+const execFileAsync = promisify(execFile);
+
+async function fibonacci() {
+  await main("samples");
+}
+bench("main", async function* () {
+  await rm("importer.db", { force: true });
+  await rm("importer.db.wal", { force: true });
+  await execFileAsync("duckdb", [
+    "importer.db",
+    readFileSync("duckdb.sql", "utf8"),
+  ]);
+  yield () => fibonacci();
+});
+
+// await run();
+await rm("importer.db", { force: true });
+await rm("importer.db.wal", { force: true });
+await execFileAsync("duckdb", [
+  "importer.db",
+  readFileSync("duckdb.sql", "utf8"),
+]);
+await fibonacci();
diff --git a/sepa/importer.js b/sepa/importer.js
index 924f1f4..4df9bc7 100644
--- a/sepa/importer.js
+++ b/sepa/importer.js
@@ -3,7 +3,7 @@
 import * as fsp from "fs/promises";
 import * as fs from "fs";
 import { CsvParserStream, parse, parseString } from "fast-csv";
-import { basename, join, dirname } from "path";
+import { basename, join, dirname, resolve } from "path";
 import { $ } from "zx";
 import PQueue from "p-queue";
 import { Database } from "duckdb-async";
@@ -15,25 +15,53 @@ import {
 } from "@duckdb/node-api";
 import Papa from "papaparse";
 import { writeFile } from "fs/promises";
+import { fileURLToPath } from "url";
+
+const pathToThisFile = resolve(fileURLToPath(import.meta.url));
+const pathPassedToNode = resolve(process.argv[1]);
+const isThisFileBeingRunViaCLI = pathToThisFile.includes(pathPassedToNode);
 
 // TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios)
 
-const instance = await DuckDBInstance.create("importer.db", {
-  // threads: "1",
-});
-const queue = new PQueue({ concurrency: 4 });
+const console = {
+  // @ts-ignore
+  log: (..._args) => {},
+  // @ts-ignore
+  error: (..._args) => globalThis.console.error(..._args),
+  // @ts-ignore
+  debug: (..._args) => {},
+};
 
-let hasTars = false;
-const files = await fg("**/*.tar.zst", { cwd: process.argv[2] });
-for (const file of files) {
-  hasTars = true;
-  const tar = join(process.argv[2], file);
-  queue.add(() => importDatasetTar(tar));
+if (isThisFileBeingRunViaCLI) {
+  await main(process.argv[2]);
 }
-await queue.onIdle();
 
-if (!hasTars) {
-  await importDump(process.argv[2]);
+/**
+ * @param {string} cwd
+ */
+export async function main(cwd) {
+  const instance = await DuckDBInstance.create("importer.db", {
+    // threads: "6",
+  });
+  if (cwd.endsWith(".tar.zst")) {
+    await importDatasetTar(instance, cwd);
+    return;
+  }
+  const queue = new PQueue({ concurrency: 4 });
+
+  let hasTars = false;
+  const files = await fg("**/*.tar.zst", { cwd });
+  console.log(`found ${files.length} tars`);
+  for (const file of files) {
+    hasTars = true;
+    const tar = join(cwd, file);
+    queue.add(() => importDatasetTar(instance, tar));
+  }
+  await queue.onIdle();
+
+  if (!hasTars) {
+    await importDump(instance, cwd);
+  }
 }
 
 /**
@@ -194,9 +222,25 @@ async function importPrecios(connection, datasetId, dir) {
     await writeFile(fixedCsvPath, file);
     // TODO: remove ultima actualizacion
   } else {
-    let file = await readFile(sourceCsvPath);
-    file = file.replace(/\r?\n( )?\0? *\r?\n"?[uúUÚ]/giu, "");
-    file = file.replaceAll(/[ \t]*\n/g, "\n");
+    // let file = await readFile(sourceCsvPath);
+    let file = await fsp.readFile(sourceCsvPath, "utf8");
+
+    if (
+      ["30590360763", "30687310434", "30685849751", "30525705931"].includes(
+        comercioCuit
+      )
+    ) {
+      let separator = file.lastIndexOf("\n\n");
+      if (separator === -1) separator = file.lastIndexOf("\r\n\r\n");
+      if (separator === -1) separator = file.lastIndexOf("\n\r\n");
+      if (separator === -1) separator = file.lastIndexOf("\n \n"); // TODO: actually make this work
+      if (separator === -1) separator = file.lastIndexOf("\n\0\n");
+      if (separator === -1) separator = file.lastIndexOf("\n \n");
+      file = file.slice(0, separator);
+    } else {
+      file = file.replace(/\r?\n( )?\0? *\r?\n"?[uúUÚ]/giu, "");
+      file = file.replaceAll(/[ \t]*\n/g, "\n");
+    }
     await writeFile(fixedCsvPath, file);
   }
 
@@ -209,9 +253,10 @@ async function importPrecios(connection, datasetId, dir) {
 }
 
 /**
+ * @param {DuckDBInstance} instance
  * @param {string} dir
  */
-async function importDataset(dir) {
+async function importDataset(instance, dir) {
   console.log(dir);
   const date = basename(dir).match(/(\d{4}-\d{2}-\d{2})/)?.[1];
   const id_comercio = basename(dir).match(/comercio-sepa-(\d+)/)?.[1];
@@ -259,27 +304,30 @@ async function importDataset(dir) {
 }
 
 /**
+ * @param {DuckDBInstance} instance
  * @param {string} tarPath
  */
-async function importDatasetTar(tarPath) {
+async function importDatasetTar(instance, tarPath) {
   console.log(`importing tar ${tarPath}`);
   const dir = await fsp.mkdtemp("/tmp/sepa-precios-importer-");
   try {
     await $`tar -x -C ${dir} -f ${tarPath}`;
-    await importDump(dir);
+    await importDump(instance, dir);
   } finally {
     await fsp.rm(dir, { recursive: true });
   }
 }
 
 /**
+ * @param {DuckDBInstance} instance
  * @param {string} dumpDir
  */
-async function importDump(dumpDir) {
+async function importDump(instance, dumpDir) {
   const files = await fg("**/productos.csv", { cwd: dumpDir });
-  for (const file of files) {
+  const shuffledFiles = [...files].sort(() => Math.random() - 0.5);
+  for (const file of shuffledFiles) {
     const dir = join(dumpDir, dirname(file));
-    await importDataset(dir);
+    await importDataset(instance, dir);
   }
 }
diff --git a/sepa/package.json b/sepa/package.json
index 2b7e995..b095299 100644
--- a/sepa/package.json
+++ b/sepa/package.json
@@ -25,6 +25,7 @@
     "fast-csv": "^5.0.2",
     "fast-glob": "^3.3.2",
     "jschardet": "^3.1.3",
+    "mitata": "^1.0.20",
     "p-queue": "^8.0.1",
     "papaparse": "^5.4.1",
     "postgres": "^3.4.4",