faster harder stronger

Nulo 2024-12-05 15:50:56 +00:00
parent e7bb9b6c29
commit e6c5f25108
4 changed files with 102 additions and 23 deletions

Binary file not shown.

sepa/importer-bench.js (new file, 30 additions)

@@ -0,0 +1,30 @@
// @ts-check
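// End-to-end benchmark for the importer: wipes importer.db, re-runs duckdb.sql against a fresh database, then runs main() over the samples/ directory.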
import { run, bench, boxplot } from "mitata";
import { execFile } from "node:child_process";
import { promisify } from "node:util";
import { main } from "./importer.js";
import { rm } from "node:fs/promises";
import { readFileSync } from "node:fs";
const execFileAsync = promisify(execFile);
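// despite the name, this just runs one full import over the local samples/ directory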
async function fibonacci() {
await main("samples");
}
bench("main", async function* () {
await rm("importer.db", { force: true });
await rm("importer.db.wal", { force: true });
await execFileAsync("duckdb", [
"importer.db",
readFileSync("duckdb.sql", "utf8"),
]);
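// mitata times only the function yielded here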
yield () => fibonacci();
});
// await run();
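// with mitata's run() left disabled, the lines below just repeat the reset and do a single import directly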
await rm("importer.db", { force: true });
await rm("importer.db.wal", { force: true });
await execFileAsync("duckdb", [
"importer.db",
readFileSync("duckdb.sql", "utf8"),
]);
await fibonacci();
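The boxplot import above is currently unused and run() is commented out. As a rough sketch (not part of this commit, and relying on the same imports as the file above), re-enabling the mitata run with a boxplot group could look like:

// sketch only: declare the bench inside a boxplot() group, then execute the suite
boxplot(() => {
  bench("main", async function* () {
    await rm("importer.db", { force: true });
    await rm("importer.db.wal", { force: true });
    await execFileAsync("duckdb", ["importer.db", readFileSync("duckdb.sql", "utf8")]);
    yield () => fibonacci();
  });
});
await run();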

sepa/importer.js

@@ -3,7 +3,7 @@
 import * as fsp from "fs/promises";
 import * as fs from "fs";
 import { CsvParserStream, parse, parseString } from "fast-csv";
-import { basename, join, dirname } from "path";
+import { basename, join, dirname, resolve } from "path";
 import { $ } from "zx";
 import PQueue from "p-queue";
 import { Database } from "duckdb-async";
@@ -15,25 +15,53 @@ import {
 } from "@duckdb/node-api";
 import Papa from "papaparse";
 import { writeFile } from "fs/promises";
+import { fileURLToPath } from "url";
+
+const pathToThisFile = resolve(fileURLToPath(import.meta.url));
+const pathPassedToNode = resolve(process.argv[1]);
+const isThisFileBeingRunViaCLI = pathToThisFile.includes(pathPassedToNode);
 
 // TODO: check what happens when there are several datasets for the same day (as there usually are when they update the dataset with new stores)
-const instance = await DuckDBInstance.create("importer.db", {
-  // threads: "1",
-});
-const queue = new PQueue({ concurrency: 4 });
-
-let hasTars = false;
-const files = await fg("**/*.tar.zst", { cwd: process.argv[2] });
-for (const file of files) {
-  hasTars = true;
-  const tar = join(process.argv[2], file);
-  queue.add(() => importDatasetTar(tar));
-}
-await queue.onIdle();
-if (!hasTars) {
-  await importDump(process.argv[2]);
+const console = {
+  // @ts-ignore
+  log: (..._args) => {},
+  // @ts-ignore
+  error: (..._args) => globalThis.console.error(..._args),
+  // @ts-ignore
+  debug: (..._args) => {},
+};
+
+if (isThisFileBeingRunViaCLI) {
+  await main(process.argv[2]);
+}
+
+/**
+ * @param {string} cwd
+ */
+export async function main(cwd) {
+  const instance = await DuckDBInstance.create("importer.db", {
+    // threads: "6",
+  });
+  if (cwd.endsWith(".tar.zst")) {
+    await importDatasetTar(instance, cwd);
+    return;
+  }
+  const queue = new PQueue({ concurrency: 4 });
+  let hasTars = false;
+  const files = await fg("**/*.tar.zst", { cwd });
+  console.log(`found ${files.length} tars`);
+  for (const file of files) {
+    hasTars = true;
+    const tar = join(cwd, file);
+    queue.add(() => importDatasetTar(instance, tar));
+  }
+  await queue.onIdle();
+  if (!hasTars) {
+    await importDump(instance, cwd);
+  }
 }
 
 /**
@@ -194,9 +222,25 @@ async function importPrecios(connection, datasetId, dir) {
     await writeFile(fixedCsvPath, file);
     // TODO: remove ultima actualizacion
   } else {
-    let file = await readFile(sourceCsvPath);
+    // let file = await readFile(sourceCsvPath);
+    let file = await fsp.readFile(sourceCsvPath, "utf8");
+    if (
+      ["30590360763", "30687310434", "30685849751", "30525705931"].includes(
+        comercioCuit
+      )
+    ) {
+      let separator = file.lastIndexOf("\n\n");
+      if (separator === -1) separator = file.lastIndexOf("\r\n\r\n");
+      if (separator === -1) separator = file.lastIndexOf("\n\r\n");
+      if (separator === -1) separator = file.lastIndexOf("\n \n"); // TODO: actually make this work
+      if (separator === -1) separator = file.lastIndexOf("\n\0\n");
+      if (separator === -1) separator = file.lastIndexOf("\n \n");
+      file = file.slice(0, separator);
+    } else {
       file = file.replace(/\r?\n( )?\0? *\r?\n"?[uúUÚ]/giu, "");
       file = file.replaceAll(/[ \t]*\n/g, "\n");
+    }
     await writeFile(fixedCsvPath, file);
   }
@@ -209,9 +253,10 @@ async function importPrecios(connection, datasetId, dir) {
 }
 
 /**
+ * @param {DuckDBInstance} instance
  * @param {string} dir
  */
-async function importDataset(dir) {
+async function importDataset(instance, dir) {
   console.log(dir);
   const date = basename(dir).match(/(\d{4}-\d{2}-\d{2})/)?.[1];
   const id_comercio = basename(dir).match(/comercio-sepa-(\d+)/)?.[1];
@@ -259,27 +304,30 @@ async function importDataset(dir) {
 }
 
 /**
+ * @param {DuckDBInstance} instance
  * @param {string} tarPath
  */
-async function importDatasetTar(tarPath) {
+async function importDatasetTar(instance, tarPath) {
   console.log(`importing tar ${tarPath}`);
   const dir = await fsp.mkdtemp("/tmp/sepa-precios-importer-");
   try {
     await $`tar -x -C ${dir} -f ${tarPath}`;
-    await importDump(dir);
+    await importDump(instance, dir);
   } finally {
     await fsp.rm(dir, { recursive: true });
   }
 }
 
 /**
+ * @param {DuckDBInstance} instance
  * @param {string} dumpDir
  */
-async function importDump(dumpDir) {
+async function importDump(instance, dumpDir) {
   const files = await fg("**/productos.csv", { cwd: dumpDir });
-  for (const file of files) {
+  const shuffledFiles = [...files].sort(() => Math.random() - 0.5);
+  for (const file of shuffledFiles) {
     const dir = join(dumpDir, dirname(file));
-    await importDataset(dir);
+    await importDataset(instance, dir);
   }
 }

sepa/package.json

@@ -25,6 +25,7 @@
     "fast-csv": "^5.0.2",
     "fast-glob": "^3.3.2",
     "jschardet": "^3.1.3",
+    "mitata": "^1.0.20",
     "p-queue": "^8.0.1",
     "papaparse": "^5.4.1",
     "postgres": "^3.4.4",