importar tars

This commit is contained in:
Cat /dev/Nulo 2024-09-12 19:39:28 -03:00
parent 7be97e61d7
commit 1cba65a910

View file

@ -4,7 +4,7 @@ import { basename, join, dirname } from "path";
import postgres from "postgres"; import postgres from "postgres";
import { Readable } from "stream"; import { Readable } from "stream";
import { pipeline } from "node:stream/promises"; import { pipeline } from "node:stream/promises";
import { Glob } from "bun"; import { $, Glob } from "bun";
import PQueue from "p-queue"; import PQueue from "p-queue";
// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios) // TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios)
@ -244,15 +244,38 @@ async function importDataset(dir: string) {
} }
} }
const pQueue = new PQueue({ concurrency: 2 }); async function importDatasetTar(tarPath: string) {
console.log(`importing tar ${tarPath}`);
const dir = await fs.mkdtemp("/tmp/sepa-precios-importer-");
try { try {
await $`tar -x -C ${dir} -f ${tarPath}`;
await importDump(dir);
} finally {
await fs.rm(dir, { recursive: true });
}
}
async function importDump(dumpDir: string) {
const pQueue = new PQueue({ concurrency: 2 });
const glob = new Glob("**/productos.csv"); const glob = new Glob("**/productos.csv");
for await (const file of glob.scan(process.argv[2])) { for await (const file of glob.scan(dumpDir)) {
const dir = join(process.argv[2], dirname(file)); const dir = join(dumpDir, dirname(file));
pQueue.add(() => importDataset(dir)); pQueue.add(() => importDataset(dir));
} }
} finally {
await pQueue.onIdle(); await pQueue.onIdle();
}
try {
const tarGlob = new Glob("**/*.tar.zst");
let hasTars = false;
for await (const file of tarGlob.scan(process.argv[2])) {
hasTars = true;
const tar = join(process.argv[2], file);
await importDatasetTar(tar);
}
if (!hasTars) {
await importDump(process.argv[2]);
}
} finally {
await sql.end(); await sql.end();
} }