moar concurrency

This commit is contained in:
Cat /dev/Nulo 2024-12-01 23:09:44 -03:00
parent f50f9df10b
commit e7bb9b6c29

View file

@ -18,10 +18,10 @@ import { writeFile } from "fs/promises";
// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios) // TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios)
const instance = await DuckDBInstance.create("importer.db", { const instance = await DuckDBInstance.create("importer.db", {
threads: "1", // threads: "1",
}); });
const queue = new PQueue({ concurrency: 1 }); const queue = new PQueue({ concurrency: 4 });
let hasTars = false; let hasTars = false;
const files = await fg("**/*.tar.zst", { cwd: process.argv[2] }); const files = await fg("**/*.tar.zst", { cwd: process.argv[2] });
@ -219,8 +219,8 @@ async function importDataset(dir) {
const connection = await instance.connect(); const connection = await instance.connect();
try {
await connection.run("begin transaction"); await connection.run("begin transaction");
try {
const res = await connection.run( const res = await connection.run(
`insert into datasets (id, name, date, id_comercio) values (nextval('seq_datasets'), '${basename(dir)}', '${date}', ${id_comercio}) returning id` `insert into datasets (id, name, date, id_comercio) values (nextval('seq_datasets'), '${basename(dir)}', '${date}', ${id_comercio}) returning id`
); );
@ -251,7 +251,7 @@ async function importDataset(dir) {
console.error("errored, aborting transaction", e); console.error("errored, aborting transaction", e);
await connection.run("abort"); await connection.run("abort");
} finally { } finally {
await connection.run("CHECKPOINT"); // await connection.run("CHECKPOINT");
try { try {
Bun.gc(true); Bun.gc(true);
} catch {} } catch {}