concurrency

This commit is contained in:
Cat /dev/Nulo 2024-09-07 12:37:11 -03:00
parent 41e7c54ab3
commit 076abab943
2 changed files with 7 additions and 2 deletions

Binary file not shown.

View file

@ -1,11 +1,13 @@
import * as fs from "fs/promises"; import * as fs from "fs/promises";
import { createWriteStream } from "fs";
import Papa from "papaparse"; import Papa from "papaparse";
import { basename, join, dirname } from "path"; import { basename, join, dirname } from "path";
import postgres from "postgres"; import postgres from "postgres";
import { Readable } from "stream"; import { Readable } from "stream";
import { pipeline } from "node:stream/promises"; import { pipeline } from "node:stream/promises";
import { Glob } from "bun"; import { Glob } from "bun";
import PQueue from "p-queue";
// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios)
const sql = postgres({ const sql = postgres({
database: "sepa-precios", database: "sepa-precios",
@ -244,13 +246,16 @@ async function importDataset(dir: string) {
} }
} }
const pQueue = new PQueue({ concurrency: 4 });
try { try {
const glob = new Glob("**/productos.csv"); const glob = new Glob("**/productos.csv");
for await (const file of glob.scan(process.argv[2])) { for await (const file of glob.scan(process.argv[2])) {
const dir = join(process.argv[2], dirname(file)); const dir = join(process.argv[2], dirname(file));
console.log(dir); console.log(dir);
await importDataset(dir); pQueue.add(() => importDataset(dir));
} }
} finally { } finally {
await pQueue.onIdle();
await sql.end(); await sql.end();
} }