From 4ba9fab559b8e7e315de0e30fc5591c690778ee5 Mon Sep 17 00:00:00 2001 From: Nulo Date: Tue, 19 Dec 2023 15:48:19 -0300 Subject: [PATCH] centralizar queue --- downloader/download_json.js | 174 +++++++++++++++++++++++------------- downloader/network.js | 43 --------- downloader/package.json | 1 + downloader/utils.js | 13 +++ pnpm-lock.yaml | 10 ++- 5 files changed, 133 insertions(+), 108 deletions(-) diff --git a/downloader/download_json.js b/downloader/download_json.js index ee46c88..6a4ea12 100644 --- a/downloader/download_json.js +++ b/downloader/download_json.js @@ -6,11 +6,16 @@ import { zData } from "common/schema.js"; import { StatusCodeError, TooManyRedirectsError, - customRequestWithRetries, + customRequest, } from "./network.js"; -import { createWriteStream } from "node:fs"; -import pMap from "p-map"; -import { sanitizeSuffix, shuffleArray, hasDuplicates } from "./utils.js"; +import { WriteStream, createWriteStream } from "node:fs"; +import { + sanitizeSuffix, + shuffleArray, + hasDuplicates, + waitUntil, +} from "./utils.js"; +import fastq from "fastq"; let urls = process.argv.slice(2); if (urls.length < 1) { @@ -31,6 +36,17 @@ for (const target of targets) console.error(`${target.type}+${target.url} FALLÓ CON`, error) ); +let nTotal = 0; +let nFinished = 0; +let nErrors = 0; +const queue = fastq.promise(null, downloadDistWithRetries, 32); + +const interval = setInterval(() => { + process.stderr.write(`info: ${nFinished}/${nTotal} done\n`); +}, 10000); +await queue.drained(); +clearInterval(interval); + /** * @param {Target} target */ @@ -46,8 +62,6 @@ async function downloadFromData(target) { flags: "w", }); try { - let nFinished = 0; - let nErrors = 0; /** @type {DownloadJob[]} */ const jobs = parsed.dataset.flatMap((dataset) => dataset.distribution @@ -75,41 +89,18 @@ async function downloadFromData(target) { url: patchUrl(new URL(dist.downloadURL)), outputPath, attempts: 0, + errorFile, })) ); - const totalJobs = jobs.length; + nTotal += jobs.length; // por las dudas verificar que no hayan archivos duplicados chequearIdsDuplicados(jobs, outputPath); shuffleArray(jobs); - const promise = pMap( - jobs, - async (job) => { - try { - return await downloadDistWithRetries(job); - } catch (error) { - errorFile.write(JSON.stringify(encodeError(job, error)) + "\n"); - nErrors++; - } finally { - nFinished++; - } - }, - // en realidad está limitado por el balancedpool - { concurrency: 32 } - ); - - process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`); - const interval = setInterval(() => { - process.stderr.write( - `info[${outputPath}]: ${nFinished}/${totalJobs} done\n` - ); - }, 30000); - await promise; - clearInterval(interval); - if (nErrors > 0) - console.error(`${outputPath}: Finished with ${nErrors} errors`); + for (const job of jobs) queue.push(job); + await queue.drained(); } finally { errorFile.close(); } @@ -123,7 +114,7 @@ async function getDataJsonForTarget(target) { if (target.type === "ckan") { return JSON.stringify(await generateDataJsonFromCkan(target.url)); } else if (target.type === "datajson") { - const jsonRes = await customRequestWithRetries(new URL(target.url)); + const jsonRes = await customRequest(new URL(target.url)); return await jsonRes.body.text(); } else throw new Error("?????????????"); } @@ -137,36 +128,6 @@ export function generateOutputPath(jsonUrlString) { return outputPath; } -/** - * @argument {DownloadJob} job - */ -async function downloadDistWithRetries({ dist, dataset, url, outputPath }) { - const res = await customRequestWithRetries(url); - - const fileDirPath = join( - outputPath, - sanitizeSuffix(dataset.identifier), - sanitizeSuffix(dist.identifier) - ); - await mkdir(fileDirPath, { recursive: true }); - const filePath = join( - fileDirPath, - sanitizeSuffix(dist.fileName || dist.identifier) - ); - - if (!res.body) throw new Error("no body"); - await writeFile(filePath, res.body); -} - -/** @typedef DownloadJob - * @prop {import("common/schema.js").Dataset} dataset - * @prop {import("common/schema.js").Distribution} dist - * @prop {URL} url - * @prop {string} outputPath - * @prop {number} attempts - * @prop {Date=} waitUntil - */ - /** * @param {DownloadJob[]} jobs * @param {string} id @@ -216,3 +177,88 @@ function patchUrl(url) { } return url; } + +/** @typedef DownloadJob + * @prop {import("common/schema.js").Dataset} dataset + * @prop {import("common/schema.js").Distribution} dist + * @prop {URL} url + * @prop {string} outputPath + * @prop {number} attempts + * @prop {Date=} waitUntil + * @prop {WriteStream} errorFile + */ + +const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false; + +/** + * @argument {DownloadJob} job + */ +async function downloadDistWithRetries(job) { + try { + if (job.waitUntil) await waitUntil(job.waitUntil); + await downloadDist(job); + nFinished++; + } catch (error) { + // algunos servidores usan 403 como coso para decir "calmate" + // intentar hasta 15 veces con 15 segundos de por medio + if ( + error instanceof StatusCodeError && + ((error.code === 403 && job.url.host === "minsegar-my.sharepoint.com") || + (error.code === 503 && job.url.host === "cdn.buenosaires.gob.ar")) && + job.attempts < 15 + ) { + if (REPORT_RETRIES) + console.debug( + `reintentando(status)[${job.attempts}] ${job.url.toString()}` + ); + queue.push({ + ...job, + waitUntil: new Date( + Date.now() + 1000 * (job.attempts + 1) ** 2 + Math.random() * 10000 + ), + attempts: job.attempts + 1, + }); + } + // si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio + else if ( + !(error instanceof StatusCodeError) && + !(error instanceof TooManyRedirectsError) && + job.attempts < 7 + ) { + if (REPORT_RETRIES) + console.debug(`reintentando[${job.attempts}] ${job.url.toString()}`); + queue.push({ + ...job, + waitUntil: new Date( + Date.now() + 1000 * (job.attempts + 1) ** 2 + Math.random() * 10000 + ), + attempts: job.attempts + 1, + }); + } else { + job.errorFile.write(JSON.stringify(encodeError(job, error)) + "\n"); + nErrors++; + nFinished++; + } + } +} + +/** + * @argument {DownloadJob} job + */ +async function downloadDist({ url, outputPath, dataset, dist }) { + const res = await customRequest(url); + + const fileDirPath = join( + outputPath, + sanitizeSuffix(dataset.identifier), + sanitizeSuffix(dist.identifier) + ); + await mkdir(fileDirPath, { recursive: true }); + const filePath = join( + fileDirPath, + sanitizeSuffix(dist.fileName || dist.identifier) + ); + + if (!res.body) throw new Error("no body"); + await writeFile(filePath, res.body); +} diff --git a/downloader/network.js b/downloader/network.js index 7898f45..ba6bb29 100644 --- a/downloader/network.js +++ b/downloader/network.js @@ -18,49 +18,6 @@ export class StatusCodeError extends Error { } export class TooManyRedirectsError extends Error {} -const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false; - -/** - * @argument {URL} url - * @argument {number} attempts - * @returns {Promise} - */ -export async function customRequestWithRetries(url, attempts = 0) { - try { - return await customRequest(url); - } catch (error) { - // algunos servidores usan 403 como coso para decir "calmate" - // intentar hasta 15 veces con 15 segundos de por medio - if ( - error instanceof StatusCodeError && - ((error.code === 403 && url.host === "minsegar-my.sharepoint.com") || - (error.code === 503 && url.host === "cdn.buenosaires.gob.ar")) && - attempts < 15 - ) { - if (REPORT_RETRIES) - console.debug(`reintentando(status)[${attempts}] ${url.toString()}`); - await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000); - return await customRequestWithRetries(url, attempts + 1); - } - // si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio - else if ( - !(error instanceof StatusCodeError) && - !(error instanceof TooManyRedirectsError) && - attempts < 7 - ) { - if (REPORT_RETRIES) - console.debug(`reintentando[${attempts}] ${url.toString()}`); - await wait(5000 + Math.random() * 10000); - return await customRequestWithRetries(url, attempts + 1); - } else throw error; - } -} - -/** @argument {number} ms */ -function wait(ms) { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - /** * genera los headers para hacer un pedido dependiendo de la url * @param {URL} url diff --git a/downloader/package.json b/downloader/package.json index 7ea2278..b71c967 100644 --- a/downloader/package.json +++ b/downloader/package.json @@ -12,6 +12,7 @@ "license": "ISC", "dependencies": { "common": "workspace:", + "fastq": "^1.16.0", "p-limit": "^5.0.0", "p-map": "^7.0.0", "p-throttle": "^6.1.0", diff --git a/downloader/utils.js b/downloader/utils.js index 77e8790..fe54c10 100644 --- a/downloader/utils.js +++ b/downloader/utils.js @@ -22,3 +22,16 @@ export function shuffleArray(array) { [array[i], array[j]] = [array[j], array[i]]; } } + +/** @argument {number} ms */ +export function wait(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** @argument {Date} date */ +export async function waitUntil(date) { + const relative = +date - Date.now(); + console.debug({ relative, date, now: Date.now() }); + if (relative <= 0) return; + await wait(relative); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0e4b54d..f960b49 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -17,6 +17,9 @@ importers: common: specifier: 'workspace:' version: link:../common + fastq: + specifier: ^1.16.0 + version: 1.16.0 p-limit: specifier: ^5.0.0 version: 5.0.0 @@ -906,6 +909,12 @@ packages: reusify: 1.0.4 dev: true + /fastq@1.16.0: + resolution: {integrity: sha512-ifCoaXsDrsdkWTtiNJX5uzHDsrck5TzfKKDcuFFTIrrc/BS076qgEIfoIy1VeZqViznfKiysPYTh/QeHtnIsYA==} + dependencies: + reusify: 1.0.4 + dev: false + /fflate@0.8.1: resolution: {integrity: sha512-/exOvEuc+/iaUm105QIiOt4LpBdMTWsXxqR0HDF35vx3fmaKzw7354gTilCh5rkzEt8WYyG//ku3h3nRmd7CHQ==} dev: false @@ -1442,7 +1451,6 @@ packages: /reusify@1.0.4: resolution: {integrity: sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==} engines: {iojs: '>=1.0.0', node: '>=0.10.0'} - dev: true /rimraf@2.7.1: resolution: {integrity: sha512-uWjbaKIK3T1OSVptzX7Nl6PvQ3qAGtKEtVRjRuazjfL3Bx5eI409VZSqgND+4UNnmzLVdPj9FqFJNPqBZFve4w==}