diff --git a/download_json.js b/download_json.js
index d914866..492734e 100644
--- a/download_json.js
+++ b/download_json.js
@@ -8,7 +8,7 @@ import { pipeline } from "node:stream/promises";
 // www.enargas.gov.ar, transparencia.enargas.gov.ar, www.energia.gob.ar, www.economia.gob.ar, datos.yvera.gob.ar
 const dispatcher = new Agent({
-  pipelining: 10,
+  pipelining: 50,
   maxRedirections: 20,
 });
@@ -28,6 +28,7 @@ if (!outputPath) {
   process.exit(1);
 }
 await mkdir(outputPath, { recursive: true });
+const errorFile = await open(join(outputPath, "errors.jsonl"), "w");
 
 // Read JSON from stdin
 const json = await process.stdin.toArray();
@@ -44,72 +45,93 @@ const jobs = parsed.dataset.flatMap((dataset) =>
 );
 const totalJobs = jobs.length;
 let nFinished = 0;
+let nErrors = 0;
 
 // just in case, check that there are no duplicate files
-const duplicated = hasDuplicates(
-  jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`)
-);
-if (duplicated) {
-  console.error(
-    "ADVERTENCIA: ¡encontré duplicados! es posible que se pisen archivos entre si"
-  );
+chequearIdsDuplicados();
+
+/** @type {Map< string, DownloadJob[] >} */
+let jobsPerHost = new Map();
+for (const job of jobs) {
+  jobsPerHost.set(job.url.host, [
+    ...(jobsPerHost.get(job.url.host) || []),
+    job,
+  ]);
 }
 
-const greens = Array(128)
-  .fill(0)
-  .map(() =>
-    (async () => {
-      let job;
-      while ((job = jobs.pop())) {
-        const { dataset, dist } = job;
-        request: do {
+const greens = [...jobsPerHost.entries()].flatMap(([host, jobs]) => {
+  const nThreads = 128;
+  return Array(nThreads)
+    .fill(0)
+    .map(() =>
+      (async () => {
+        let job;
+        while ((job = jobs.pop())) {
           try {
-            await downloadDist(dataset, dist);
+            await downloadDistWithRetries(job);
           } catch (error) {
-            if (error instanceof StatusCodeError) {
-              // some servers use 403 as a way of saying "calm down"
-              if (
-                error.code === 403 &&
-                dist.downloadURL.includes("minsegar-my.sharepoint.com")
-              ) {
-                console.debug(
-                  `debug: reintentando ${dist.downloadURL} porque tiró 403`
-                );
-                await wait(15000);
-                continue request;
-              }
-              error = error.toString();
-            }
-            console.error(
-              `error: Failed to download URL ${dist.downloadURL} (${dataset.identifier}/${dist.identifier}):`,
-              error
+            await errorFile.write(
+              JSON.stringify({ url: job.url.toString(), ...encodeError(error) }) + "\n"
             );
-            if (!(error instanceof StatusCodeError)) continue request;
+            nErrors++;
           } finally {
             nFinished++;
           }
-        } while (0);
-      }
-    })()
-  );
+        }
+      })()
+    );
+});
+process.stderr.write(`greens: ${greens.length}\n`);
 
 const interval = setInterval(() => {
-  console.info(`info: ${nFinished}/${totalJobs} done`);
-}, 15000);
+  process.stderr.write(`info: ${nFinished}/${totalJobs} done\n`);
+}, 30000);
 await Promise.all(greens);
 clearInterval(interval);
+if (nErrors > 0) console.error(`Finished with ${nErrors} errors`);
 
 /**
- * @argument {Dataset} dataset
- * @argument {Distribution} dist
+ * @argument {DownloadJob} job
+ * @argument {number} tries
  */
-async function downloadDist(dataset, dist) {
+async function downloadDistWithRetries(job, tries = 0) {
+  const { url } = job;
+  try {
+    await downloadDist(job);
+  } catch (error) {
+    // some servers use 403 as a way of saying "calm down";
+    // retry up to 15 times with 15 seconds in between
+    if (
+      error instanceof StatusCodeError &&
+      error.code === 403 &&
+      url.host === "minsegar-my.sharepoint.com" &&
+      tries < 15
+    ) {
+      await wait(15000);
+      return await downloadDistWithRetries(job, tries + 1);
+    }
+    // if it wasn't an HTTP error, retry up to 5 times with 5 seconds in between
+    else if (
+      !(error instanceof StatusCodeError) &&
+      !errorIsInfiniteRedirect(error) &&
+      tries < 5
+    ) {
+      await wait(5000);
+      return await downloadDistWithRetries(job, tries + 1);
+    } else throw error;
+  }
+}
+
+/**
+ * @argument {DownloadJob} job
+ */
+async function downloadDist({ dist, dataset }) {
   const url = new URL(dist.downloadURL);
 
   const res = await fetch(url.toString(), {
     dispatcher,
   });
-  if (res.status >= 400) {
+  if (!res.ok) {
     throw new StatusCodeError(res.status);
   }
@@ -129,11 +151,16 @@ async function downloadDist(dataset, dist) {
   await pipeline(res.body, outputFile.createWriteStream());
 }
 
-/** @typedef {object} Dataset
+/** @typedef DownloadJob
+ * @prop {Dataset} dataset
+ * @prop {Distribution} dist
+ * @prop {URL} url
+ */
+/** @typedef Dataset
  * @prop {string} identifier
  * @prop {Distribution[]} distribution
  */
-/** @typedef {object} Distribution
+/** @typedef Distribution
  * @prop {string} identifier
  * @prop {string} fileName
  * @prop {string} downloadURL
@@ -147,18 +174,37 @@ function sanitizeSuffix(path) {
   return normalize(path).replace(/^(\.\.(\/|\\|$))+/, "");
 }
 
+function chequearIdsDuplicados() {
+  const duplicated = hasDuplicates(
+    jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`)
+  );
+  if (duplicated) {
+    console.error(
+      "ADVERTENCIA: ¡encontré duplicados! es posible que se pisen archivos entre si"
+    );
+  }
+}
 // https://stackoverflow.com/a/7376645
-/**
- * @argument {any[]} array
- */
+/** @argument {any[]} array */
function hasDuplicates(array) {
   return new Set(array).size !== array.length;
 }
 
-/**
- * @argument {number} ms
- */
+/** @argument {number} ms */
 function wait(ms) {
   if (ms < 0) return Promise.resolve();
   return new Promise((resolve) => setTimeout(resolve, ms));
 }
+
+function encodeError(error) {
+  if (error instanceof StatusCodeError)
+    return { kind: "http_error", status_code: error.code };
+  else if (errorIsInfiniteRedirect(error)) return { kind: "infinite_redirect" };
+  else {
+    console.error(error, error.cause?.message);
+    return { kind: "generic_error", error };
+  }
+}
+function errorIsInfiniteRedirect(error) {
+  return error?.cause?.message === "redirect count exceeded";
+}
diff --git a/package.json b/package.json
index 0c42442..f75ee4a 100644
--- a/package.json
+++ b/package.json
@@ -13,5 +13,8 @@
   "dependencies": {
     "node_extra_ca_certs_mozilla_bundle": "^1.0.5",
     "undici": "^5.28.0"
+  },
+  "devDependencies": {
+    "@types/node": "^20.10.0"
   }
 }
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 5543c10..c697c46 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -12,6 +12,11 @@ dependencies:
     specifier: ^5.28.0
     version: 5.28.0
 
+devDependencies:
+  '@types/node':
+    specifier: ^20.10.0
+    version: 20.10.0
+
 packages:
 
   /@fastify/busboy@2.1.0:
@@ -19,6 +24,12 @@ packages:
     engines: {node: '>=14'}
     dev: false
 
+  /@types/node@20.10.0:
+    resolution: {integrity: sha512-D0WfRmU9TQ8I9PFx9Yc+EBHw+vSpIub4IDvQivcp26PtPrdMGAq5SDcpXEo/epqa/DXotVpekHiLNTg3iaKXBQ==}
+    dependencies:
+      undici-types: 5.26.5
+    dev: true
+
   /asynckit@0.4.0:
     resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==}
     dev: false
@@ -153,6 +164,10 @@ packages:
       is-utf8: 0.2.1
     dev: false
 
+  /undici-types@5.26.5:
+    resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==}
+    dev: true
+
   /undici@5.28.0:
     resolution: {integrity: sha512-gM12DkXhlAc5+/TPe60iy9P6ETgVfqTuRJ6aQ4w8RYu0MqKuXhaq3/b86GfzDQnNA3NUO6aUNdvevrKH59D0Nw==}
     engines: {node: '>=14.0'}
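
Note: the main change in download_json.js replaces the single global pool of 128 workers with one pool per host, so a slow or rate-limiting server no longer starves downloads from every other host. Below is a minimal, self-contained sketch of that pattern, assuming a placeholder job shape ({ url }), a placeholder downloadOne function and an illustrative WORKERS_PER_HOST value; none of these names come from the diff itself.

// Illustrative sketch only: WORKERS_PER_HOST, runPerHost and downloadOne are
// placeholder names, not code from the diff.
const WORKERS_PER_HOST = 4;

async function runPerHost(jobs, downloadOne) {
  // Group jobs by hostname so each server gets its own queue.
  const jobsPerHost = new Map();
  for (const job of jobs) {
    const queue = jobsPerHost.get(job.url.host) ?? [];
    queue.push(job);
    jobsPerHost.set(job.url.host, queue);
  }

  // Start a fixed-size pool of workers per host; each worker keeps popping
  // jobs off its host's queue, so a slow host only stalls its own workers.
  const workers = [...jobsPerHost.values()].flatMap((queue) =>
    Array.from({ length: WORKERS_PER_HOST }, async () => {
      let job;
      while ((job = queue.pop())) {
        await downloadOne(job);
      }
    })
  );
  await Promise.all(workers);
}

Popping from a plain per-host array keeps the bookkeeping trivial: workers whose queue is exhausted simply resolve, and Promise.all settles once every queue has drained.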
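The diff also starts recording each failed download as one JSON object per line in errors.jsonl (via encodeError). A rough sketch of how such a file could be summarized afterwards, assuming the { url, kind, status_code } record shape written above; the script itself and the hard-coded file path are illustrative, not part of the change, and it would need to run as an ES module like the rest of the project.

// Illustrative only: tally error kinds from an errors.jsonl produced by the change above.
import { readFileSync } from "node:fs";

const records = readFileSync("errors.jsonl", "utf8")
  .split("\n")
  .filter(Boolean)
  .map((line) => JSON.parse(line));

const counts = {};
for (const { kind, status_code } of records) {
  // Group HTTP errors by status code (e.g. "http_error 403"), others by kind alone.
  const key = kind === "http_error" ? `${kind} ${status_code}` : kind;
  counts[key] = (counts[key] ?? 0) + 1;
}
console.table(counts);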