cambios varios

This commit is contained in:
Cat /dev/Nulo 2023-11-27 22:43:58 -03:00
parent e1e851f797
commit 94b909abfa
3 changed files with 117 additions and 53 deletions

View file

@ -8,7 +8,7 @@ import { pipeline } from "node:stream/promises";
// www.enargas.gov.ar, transparencia.enargas.gov.ar, www.energia.gob.ar, www.economia.gob.ar, datos.yvera.gob.ar // www.enargas.gov.ar, transparencia.enargas.gov.ar, www.energia.gob.ar, www.economia.gob.ar, datos.yvera.gob.ar
const dispatcher = new Agent({ const dispatcher = new Agent({
pipelining: 10, pipelining: 50,
maxRedirections: 20, maxRedirections: 20,
}); });
@ -28,6 +28,7 @@ if (!outputPath) {
process.exit(1); process.exit(1);
} }
await mkdir(outputPath, { recursive: true }); await mkdir(outputPath, { recursive: true });
const errorFile = await open(join(outputPath, "errors.jsonl"), "w");
// Leer JSON de stdin // Leer JSON de stdin
const json = await process.stdin.toArray(); const json = await process.stdin.toArray();
@ -44,72 +45,93 @@ const jobs = parsed.dataset.flatMap((dataset) =>
); );
const totalJobs = jobs.length; const totalJobs = jobs.length;
let nFinished = 0; let nFinished = 0;
let nErrors = 0;
// por las dudas verificar que no hayan archivos duplicados // por las dudas verificar que no hayan archivos duplicados
const duplicated = hasDuplicates( chequearIdsDuplicados();
jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`)
); /** @type {Map< string, DownloadJob[] >} */
if (duplicated) { let jobsPerHost = new Map();
console.error( for (const job of jobs) {
"ADVERTENCIA: ¡encontré duplicados! es posible que se pisen archivos entre si" jobsPerHost.set(job.url.host, [
); ...(jobsPerHost.get(job.url.host) || []),
job,
]);
} }
const greens = Array(128) const greens = [...jobsPerHost.entries()].flatMap(([host, jobs]) => {
const nThreads = 128;
return Array(nThreads)
.fill(0) .fill(0)
.map(() => .map(() =>
(async () => { (async () => {
let job; let job;
while ((job = jobs.pop())) { while ((job = jobs.pop())) {
const { dataset, dist } = job;
request: do {
try { try {
await downloadDist(dataset, dist); await downloadDistWithRetries(job);
} catch (error) { } catch (error) {
if (error instanceof StatusCodeError) { await errorFile.write(
// algunos servidores usan 403 como coso para decir "calmate" JSON.stringify({ url: job.url.toString(), ...encodeError(error) })
if (
error.code === 403 &&
dist.downloadURL.includes("minsegar-my.sharepoint.com")
) {
console.debug(
`debug: reintentando ${dist.downloadURL} porque tiró 403`
); );
await wait(15000); nErrors++;
continue request;
}
error = error.toString();
}
console.error(
`error: Failed to download URL ${dist.downloadURL} (${dataset.identifier}/${dist.identifier}):`,
error
);
if (!(error instanceof StatusCodeError)) continue request;
} finally { } finally {
nFinished++; nFinished++;
} }
} while (0);
} }
})() })()
); );
});
process.stderr.write(`greens: ${greens.length}\n`);
const interval = setInterval(() => { const interval = setInterval(() => {
console.info(`info: ${nFinished}/${totalJobs} done`); process.stderr.write(`info: ${nFinished}/${totalJobs} done\n`);
}, 15000); }, 30000);
await Promise.all(greens); await Promise.all(greens);
clearInterval(interval); clearInterval(interval);
if (nErrors > 0) console.error(`Finished with ${nErrors} errors`);
/** /**
* @argument {Dataset} dataset * @argument {DownloadJob} job
* @argument {Distribution} dist * @argument {number} tries
*/ */
async function downloadDist(dataset, dist) { async function downloadDistWithRetries(job, tries = 0) {
const { url } = job;
try {
await downloadDist(job);
} catch (error) {
// algunos servidores usan 403 como coso para decir "calmate"
// intentar hasta 15 veces con 15 segundos de por medio
if (
error instanceof StatusCodeError &&
error.code === 403 &&
url.host === "minsegar-my.sharepoint.com" &&
tries < 15
) {
await wait(15000);
return await downloadDistWithRetries(job, tries + 1);
}
// si no fue un error de http, reintentar hasta 5 veces con 5 segundos de por medio
else if (
!(error instanceof StatusCodeError) &&
!errorIsInfiniteRedirect(error) &&
tries < 5
) {
await wait(5000);
return await downloadDistWithRetries(job, tries + 1);
} else throw error;
}
}
/**
* @argument {DownloadJob} job
*/
async function downloadDist({ dist, dataset }) {
const url = new URL(dist.downloadURL); const url = new URL(dist.downloadURL);
const res = await fetch(url.toString(), { const res = await fetch(url.toString(), {
dispatcher, dispatcher,
}); });
if (res.status >= 400) { if (!res.ok) {
throw new StatusCodeError(res.status); throw new StatusCodeError(res.status);
} }
@ -129,11 +151,16 @@ async function downloadDist(dataset, dist) {
await pipeline(res.body, outputFile.createWriteStream()); await pipeline(res.body, outputFile.createWriteStream());
} }
/** @typedef {object} Dataset /** @typedef DownloadJob
* @prop {Dataset} dataset
* @prop {Distribution} dist
* @prop {URL} url
*/
/** @typedef Dataset
* @prop {string} identifier * @prop {string} identifier
* @prop {Distribution[]} distribution * @prop {Distribution[]} distribution
*/ */
/** @typedef {object} Distribution /** @typedef Distribution
* @prop {string} identifier * @prop {string} identifier
* @prop {string} fileName * @prop {string} fileName
* @prop {string} downloadURL * @prop {string} downloadURL
@ -147,18 +174,37 @@ function sanitizeSuffix(path) {
return normalize(path).replace(/^(\.\.(\/|\\|$))+/, ""); return normalize(path).replace(/^(\.\.(\/|\\|$))+/, "");
} }
function chequearIdsDuplicados() {
const duplicated = hasDuplicates(
jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`)
);
if (duplicated) {
console.error(
"ADVERTENCIA: ¡encontré duplicados! es posible que se pisen archivos entre si"
);
}
}
// https://stackoverflow.com/a/7376645 // https://stackoverflow.com/a/7376645
/** /** @argument {any[]} array */
* @argument {any[]} array
*/
function hasDuplicates(array) { function hasDuplicates(array) {
return new Set(array).size !== array.length; return new Set(array).size !== array.length;
} }
/** /** @argument {number} ms */
* @argument {number} ms
*/
function wait(ms) { function wait(ms) {
if (ms < 0) return Promise.resolve(); if (ms < 0) return Promise.resolve();
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }
function encodeError(error) {
if (error instanceof StatusCodeError)
return { kind: "http_error", status_code: error.code };
else if (errorIsInfiniteRedirect(error)) return { kind: "infinite_redirect" };
else {
console.error(error, error.cause.message);
return { kind: "generic_error", error };
}
}
function errorIsInfiniteRedirect(error) {
return error?.cause?.message === "redirect count exceeded";
}

View file

@ -13,5 +13,8 @@
"dependencies": { "dependencies": {
"node_extra_ca_certs_mozilla_bundle": "^1.0.5", "node_extra_ca_certs_mozilla_bundle": "^1.0.5",
"undici": "^5.28.0" "undici": "^5.28.0"
},
"devDependencies": {
"@types/node": "^20.10.0"
} }
} }

View file

@ -12,6 +12,11 @@ dependencies:
specifier: ^5.28.0 specifier: ^5.28.0
version: 5.28.0 version: 5.28.0
devDependencies:
'@types/node':
specifier: ^20.10.0
version: 20.10.0
packages: packages:
/@fastify/busboy@2.1.0: /@fastify/busboy@2.1.0:
@ -19,6 +24,12 @@ packages:
engines: {node: '>=14'} engines: {node: '>=14'}
dev: false dev: false
/@types/node@20.10.0:
resolution: {integrity: sha512-D0WfRmU9TQ8I9PFx9Yc+EBHw+vSpIub4IDvQivcp26PtPrdMGAq5SDcpXEo/epqa/DXotVpekHiLNTg3iaKXBQ==}
dependencies:
undici-types: 5.26.5
dev: true
/asynckit@0.4.0: /asynckit@0.4.0:
resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==}
dev: false dev: false
@ -153,6 +164,10 @@ packages:
is-utf8: 0.2.1 is-utf8: 0.2.1
dev: false dev: false
/undici-types@5.26.5:
resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==}
dev: true
/undici@5.28.0: /undici@5.28.0:
resolution: {integrity: sha512-gM12DkXhlAc5+/TPe60iy9P6ETgVfqTuRJ6aQ4w8RYu0MqKuXhaq3/b86GfzDQnNA3NUO6aUNdvevrKH59D0Nw==} resolution: {integrity: sha512-gM12DkXhlAc5+/TPe60iy9P6ETgVfqTuRJ6aQ4w8RYu0MqKuXhaq3/b86GfzDQnNA3NUO6aUNdvevrKH59D0Nw==}
engines: {node: '>=14.0'} engines: {node: '>=14.0'}