centralize queue

Cat /dev/Nulo 2023-12-19 15:48:19 -03:00
parent 2b1b72b91e
commit 4ba9fab559
5 changed files with 133 additions and 108 deletions
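
The gist of the change: instead of each data.json target running its own pMap pool with local counters, every download job now goes through one module-level fastq queue, and retries are re-pushed onto that same queue. For reference, a minimal sketch of the fastq promise API the new code relies on (made-up tasks, not code from this commit):

import fastq from "fastq";

// the worker handles one task at a time; at most 32 run concurrently
const queue = fastq.promise(null, async (task) => {
  console.log("processing", task);
}, 32);

for (const task of ["a", "b", "c"]) queue.push(task);
// drained() resolves once the queue is empty and all in-flight tasks are done
await queue.drained();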

View file

@@ -6,11 +6,16 @@ import { zData } from "common/schema.js";
import {
StatusCodeError,
TooManyRedirectsError,
customRequestWithRetries,
customRequest,
} from "./network.js";
import { createWriteStream } from "node:fs";
import pMap from "p-map";
import { sanitizeSuffix, shuffleArray, hasDuplicates } from "./utils.js";
import { WriteStream, createWriteStream } from "node:fs";
import {
sanitizeSuffix,
shuffleArray,
hasDuplicates,
waitUntil,
} from "./utils.js";
import fastq from "fastq";
let urls = process.argv.slice(2);
if (urls.length < 1) {
@@ -31,6 +36,17 @@ for (const target of targets)
console.error(`${target.type}+${target.url} FAILED WITH`, error)
);
let nTotal = 0;
let nFinished = 0;
let nErrors = 0;
const queue = fastq.promise(null, downloadDistWithRetries, 32);
const interval = setInterval(() => {
process.stderr.write(`info: ${nFinished}/${nTotal} done\n`);
}, 10000);
await queue.drained();
clearInterval(interval);
/**
* @param {Target} target
*/
@@ -46,8 +62,6 @@ async function downloadFromData(target) {
flags: "w",
});
try {
let nFinished = 0;
let nErrors = 0;
/** @type {DownloadJob[]} */
const jobs = parsed.dataset.flatMap((dataset) =>
dataset.distribution
@@ -75,41 +89,18 @@ async function downloadFromData(target) {
url: patchUrl(new URL(dist.downloadURL)),
outputPath,
attempts: 0,
errorFile,
}))
);
const totalJobs = jobs.length;
nTotal += jobs.length;
// just in case, verify that there are no duplicate files
chequearIdsDuplicados(jobs, outputPath);
shuffleArray(jobs);
const promise = pMap(
jobs,
async (job) => {
try {
return await downloadDistWithRetries(job);
} catch (error) {
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
nErrors++;
} finally {
nFinished++;
}
},
// in practice this is limited by the balancedpool
{ concurrency: 32 }
);
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
const interval = setInterval(() => {
process.stderr.write(
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
);
}, 30000);
await promise;
clearInterval(interval);
if (nErrors > 0)
console.error(`${outputPath}: Finished with ${nErrors} errors`);
for (const job of jobs) queue.push(job);
await queue.drained();
} finally {
errorFile.close();
}
@@ -123,7 +114,7 @@ async function getDataJsonForTarget(target) {
if (target.type === "ckan") {
return JSON.stringify(await generateDataJsonFromCkan(target.url));
} else if (target.type === "datajson") {
const jsonRes = await customRequestWithRetries(new URL(target.url));
const jsonRes = await customRequest(new URL(target.url));
return await jsonRes.body.text();
} else throw new Error("?????????????");
}
@@ -137,36 +128,6 @@ export function generateOutputPath(jsonUrlString) {
return outputPath;
}
/**
* @argument {DownloadJob} job
*/
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
const res = await customRequestWithRetries(url);
const fileDirPath = join(
outputPath,
sanitizeSuffix(dataset.identifier),
sanitizeSuffix(dist.identifier)
);
await mkdir(fileDirPath, { recursive: true });
const filePath = join(
fileDirPath,
sanitizeSuffix(dist.fileName || dist.identifier)
);
if (!res.body) throw new Error("no body");
await writeFile(filePath, res.body);
}
/** @typedef DownloadJob
* @prop {import("common/schema.js").Dataset} dataset
* @prop {import("common/schema.js").Distribution} dist
* @prop {URL} url
* @prop {string} outputPath
* @prop {number} attempts
* @prop {Date=} waitUntil
*/
/**
* @param {DownloadJob[]} jobs
* @param {string} id
@@ -216,3 +177,88 @@ function patchUrl(url) {
}
return url;
}
/** @typedef DownloadJob
* @prop {import("common/schema.js").Dataset} dataset
* @prop {import("common/schema.js").Distribution} dist
* @prop {URL} url
* @prop {string} outputPath
* @prop {number} attempts
* @prop {Date=} waitUntil
* @prop {WriteStream} errorFile
*/
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
/**
* @argument {DownloadJob} job
*/
async function downloadDistWithRetries(job) {
try {
if (job.waitUntil) await waitUntil(job.waitUntil);
await downloadDist(job);
nFinished++;
} catch (error) {
// some servers use a 403 as a way of saying "slow down"
// retry up to 15 times with ~15 seconds in between
if (
error instanceof StatusCodeError &&
((error.code === 403 && job.url.host === "minsegar-my.sharepoint.com") ||
(error.code === 503 && job.url.host === "cdn.buenosaires.gob.ar")) &&
job.attempts < 15
) {
if (REPORT_RETRIES)
console.debug(
`retrying(status)[${job.attempts}] ${job.url.toString()}`
);
queue.push({
...job,
waitUntil: new Date(
Date.now() + 1000 * (job.attempts + 1) ** 2 + Math.random() * 10000
),
attempts: job.attempts + 1,
});
}
// if it wasn't an HTTP error, retry up to 7 times with a growing wait in between
else if (
!(error instanceof StatusCodeError) &&
!(error instanceof TooManyRedirectsError) &&
job.attempts < 7
) {
if (REPORT_RETRIES)
console.debug(`retrying[${job.attempts}] ${job.url.toString()}`);
queue.push({
...job,
waitUntil: new Date(
Date.now() + 1000 * (job.attempts + 1) ** 2 + Math.random() * 10000
),
attempts: job.attempts + 1,
});
} else {
job.errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
nErrors++;
nFinished++;
}
}
}
/**
* @argument {DownloadJob} job
*/
async function downloadDist({ url, outputPath, dataset, dist }) {
const res = await customRequest(url);
const fileDirPath = join(
outputPath,
sanitizeSuffix(dataset.identifier),
sanitizeSuffix(dist.identifier)
);
await mkdir(fileDirPath, { recursive: true });
const filePath = join(
fileDirPath,
sanitizeSuffix(dist.fileName || dist.identifier)
);
if (!res.body) throw new Error("no body");
await writeFile(filePath, res.body);
}
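
Both retry branches above compute the same deadline: quadratic backoff plus up to 10 seconds of jitter. A small illustrative loop (not part of the commit) shows the schedule that formula produces:

// wait = 1000 * (attempts + 1) ** 2 ms, plus 0-10 s of random jitter
for (let attempts = 0; attempts < 5; attempts++) {
  const baseMs = 1000 * (attempts + 1) ** 2;
  console.log(`attempt ${attempts}: base wait ${baseMs / 1000}s + up to 10s jitter`);
}
// attempt 0: 1s, attempt 1: 4s, attempt 2: 9s, attempt 3: 16s, attempt 4: 25s

Since a failed job is re-pushed with a waitUntil deadline rather than slept through on the spot, it goes to the back of the shared queue and the remaining delay is only awaited once a worker picks it up again.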

View file

@@ -18,49 +18,6 @@ export class StatusCodeError extends Error {
}
export class TooManyRedirectsError extends Error {}
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
/**
* @argument {URL} url
* @argument {number} attempts
* @returns {Promise<Dispatcher.ResponseData>}
*/
export async function customRequestWithRetries(url, attempts = 0) {
try {
return await customRequest(url);
} catch (error) {
// some servers use a 403 as a way of saying "slow down"
// retry up to 15 times with ~15 seconds in between
if (
error instanceof StatusCodeError &&
((error.code === 403 && url.host === "minsegar-my.sharepoint.com") ||
(error.code === 503 && url.host === "cdn.buenosaires.gob.ar")) &&
attempts < 15
) {
if (REPORT_RETRIES)
console.debug(`retrying(status)[${attempts}] ${url.toString()}`);
await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
return await customRequestWithRetries(url, attempts + 1);
}
// if it wasn't an HTTP error, retry up to 7 times with ~10 seconds in between
else if (
!(error instanceof StatusCodeError) &&
!(error instanceof TooManyRedirectsError) &&
attempts < 7
) {
if (REPORT_RETRIES)
console.debug(`retrying[${attempts}] ${url.toString()}`);
await wait(5000 + Math.random() * 10000);
return await customRequestWithRetries(url, attempts + 1);
} else throw error;
}
}
/** @argument {number} ms */
function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* generates the headers for a request depending on the url
* @param {URL} url

View file

@@ -12,6 +12,7 @@
"license": "ISC",
"dependencies": {
"common": "workspace:",
"fastq": "^1.16.0",
"p-limit": "^5.0.0",
"p-map": "^7.0.0",
"p-throttle": "^6.1.0",

View file

@@ -22,3 +22,16 @@ export function shuffleArray(array) {
[array[i], array[j]] = [array[j], array[i]];
}
}
/** @argument {number} ms */
export function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/** @argument {Date} date */
export async function waitUntil(date) {
const relative = +date - Date.now();
console.debug({ relative, date, now: Date.now() });
if (relative <= 0) return;
await wait(relative);
}
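
Usage sketch for the new helper (hypothetical deadline, not code from this commit):

// waitUntil takes an absolute Date rather than a relative delay;
// it resolves immediately if the deadline has already passed
const deadline = new Date(Date.now() + 1500);
await waitUntil(deadline);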

View file

@@ -17,6 +17,9 @@ importers:
common:
specifier: 'workspace:'
version: link:../common
fastq:
specifier: ^1.16.0
version: 1.16.0
p-limit:
specifier: ^5.0.0
version: 5.0.0
@@ -906,6 +909,12 @@ packages:
reusify: 1.0.4
dev: true
/fastq@1.16.0:
resolution: {integrity: sha512-ifCoaXsDrsdkWTtiNJX5uzHDsrck5TzfKKDcuFFTIrrc/BS076qgEIfoIy1VeZqViznfKiysPYTh/QeHtnIsYA==}
dependencies:
reusify: 1.0.4
dev: false
/fflate@0.8.1:
resolution: {integrity: sha512-/exOvEuc+/iaUm105QIiOt4LpBdMTWsXxqR0HDF35vx3fmaKzw7354gTilCh5rkzEt8WYyG//ku3h3nRmd7CHQ==}
dev: false
@@ -1442,7 +1451,6 @@ packages:
/reusify@1.0.4:
resolution: {integrity: sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==}
engines: {iojs: '>=1.0.0', node: '>=0.10.0'}
dev: true
/rimraf@2.7.1:
resolution: {integrity: sha512-uWjbaKIK3T1OSVptzX7Nl6PvQ3qAGtKEtVRjRuazjfL3Bx5eI409VZSqgND+4UNnmzLVdPj9FqFJNPqBZFve4w==}