diff --git a/downloader/ckan_to_datajson.js b/downloader/ckan_to_datajson.js
index 900ef32..b00bbf5 100644
--- a/downloader/ckan_to_datajson.js
+++ b/downloader/ckan_to_datajson.js
@@ -1,6 +1,7 @@
 import z from "zod";
+import pMap from "p-map";
 import { basename } from "path";
-import { customRequestWithLimitsAndRetries } from "./network.js";
+import { customRequest } from "./network.js";
 
 const zCkanPackageList = z.object({
   success: z.literal(true),
@@ -11,7 +12,7 @@ const zCkanPackageList = z.object({
  * @param {string} url
  */
 async function getJson(url) {
-  const res = await customRequestWithLimitsAndRetries(new URL(url));
+  const res = await customRequest(new URL(url));
   const json = await res.body.json();
   return json;
 }
@@ -114,9 +115,9 @@ async function getCkanInfo(ckanUrl) {
 export async function generateDataJsonFromCkan(ckanUrl) {
   const list = await getCkanPackageList(ckanUrl);
   const info = await getCkanInfo(ckanUrl);
-  const packages = await Promise.all(
-    list.map((n) => getCkanPackage(ckanUrl, n))
-  );
+  const packages = await pMap(list, (link) => getCkanPackage(ckanUrl, link), {
+    concurrency: 12,
+  });
   /** @type {import("common/schema.js").Data & { generatedBy: string }} */
   const data = {
     generatedBy:
diff --git a/downloader/download_json.js b/downloader/download_json.js
index 3d7de24..9aeebdf 100644
--- a/downloader/download_json.js
+++ b/downloader/download_json.js
@@ -6,9 +6,10 @@ import { zData } from "common/schema.js";
 import {
   StatusCodeError,
   TooManyRedirectsError,
-  customRequestWithLimitsAndRetries,
+  customRequestWithRetries,
 } from "./network.js";
 import { createWriteStream } from "node:fs";
+import pMap from "p-map";
 
 let urls = process.argv.slice(2);
 if (urls.length < 1) {
@@ -82,16 +83,21 @@ async function downloadFromData(target) {
 
   shuffleArray(jobs);
 
-  const promises = jobs.map(async (job) => {
-    try {
-      return await downloadDistWithRetries(job);
-    } catch (error) {
-      errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
-      nErrors++;
-    } finally {
-      nFinished++;
-    }
-  });
+  const promise = pMap(
+    jobs,
+    async (job) => {
+      try {
+        return await downloadDistWithRetries(job);
+      } catch (error) {
+        errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
+        nErrors++;
+      } finally {
+        nFinished++;
+      }
+    },
+    // in practice it's limited by the balancedpool
+    { concurrency: 32 }
+  );
 
   process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
   const interval = setInterval(() => {
@@ -99,7 +105,7 @@ async function downloadFromData(target) {
       `info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
     );
   }, 30000);
-  await Promise.all(promises);
+  await promise;
   clearInterval(interval);
   if (nErrors > 0)
     console.error(`${outputPath}: Finished with ${nErrors} errors`);
@@ -116,9 +122,7 @@ async function getDataJsonForTarget(target) {
   if (target.type === "ckan") {
     return JSON.stringify(await generateDataJsonFromCkan(target.url));
   } else if (target.type === "datajson") {
-    const jsonRes = await customRequestWithLimitsAndRetries(
-      new URL(target.url)
-    );
+    const jsonRes = await customRequestWithRetries(new URL(target.url));
    return await jsonRes.body.text();
   } else throw new Error("?????????????");
 }
@@ -136,7 +140,7 @@ export function generateOutputPath(jsonUrlString) {
  * @argument {DownloadJob} job
 */
 async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
-  const res = await customRequestWithLimitsAndRetries(url);
+  const res = await customRequestWithRetries(url);
 
   const fileDirPath = join(
     outputPath,
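The hunks above all make the same change: a Promise.all(list.map(fn)) that fired every request at once becomes pMap with a fixed concurrency cap (12 for the CKAN package fetches, 32 for the distribution downloads). A minimal sketch of the pattern, with a hypothetical fetchOne standing in for getCkanPackage / downloadDistWithRetries:

    import pMap from "p-map";

    // hypothetical worker; the real code calls getCkanPackage etc.
    async function fetchOne(url) {
      const res = await fetch(url);
      return res.json();
    }

    const urls = ["https://example.org/a.json", "https://example.org/b.json"];

    // Promise.all(urls.map(fetchOne)) would start everything immediately;
    // pMap keeps at most `concurrency` calls in flight at any moment.
    const results = await pMap(urls, fetchOne, { concurrency: 12 });

One behavioral note: a rejection in the mapper makes pMap reject as a whole by default, which is why the download_json.js hunk keeps the try/catch inside the mapper, so a failed download only logs to errorFile instead of aborting the run.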
diff --git a/downloader/network.js b/downloader/network.js
index d75b889..7898f45 100644
--- a/downloader/network.js
+++ b/downloader/network.js
@@ -1,7 +1,5 @@
 import { Dispatcher, request, Agent } from "undici";
-import pLimit from "p-limit";
 import { userAgent } from "./config.js";
-import pThrottle from "p-throttle";
 
 const dispatcher = new Agent({
   connect: { timeout: 60 * 1000 },
@@ -20,14 +18,6 @@ export class StatusCodeError extends Error {
 }
 export class TooManyRedirectsError extends Error {}
 
-/** key is the host
- * @type {Map<string, <Argument, ReturnType>(
-    fn: (arguments_: Argument) => PromiseLike<ReturnType>
-  ) => Promise<ReturnType>>} */
-const limiters = new Map();
-const nConnections = process.env.N_THREADS
-  ? parseInt(process.env.N_THREADS)
-  : 8;
 
 const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
 
@@ -35,9 +25,9 @@ const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
 /**
  * @argument {URL} url
  * @argument {number} attempts
  * @returns {Promise<Dispatcher.ResponseData>}
  */
-export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
+export async function customRequestWithRetries(url, attempts = 0) {
   try {
-    return await _customRequestWithLimits(url);
+    return await customRequest(url);
   } catch (error) {
     // some servers use 403 as a way of saying "calm down"
     // retry up to 15 times with 15 seconds in between
@@ -50,7 +40,7 @@ export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
       if (REPORT_RETRIES)
         console.debug(`reintentando(status)[${attempts}] ${url.toString()}`);
       await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
-      return await customRequestWithLimitsAndRetries(url, attempts + 1);
+      return await customRequestWithRetries(url, attempts + 1);
     }
     // if it wasn't an HTTP error, retry up to 3 times with ~10 seconds in between
     else if (
@@ -61,7 +51,7 @@ export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
       if (REPORT_RETRIES)
         console.debug(`reintentando[${attempts}] ${url.toString()}`);
       await wait(5000 + Math.random() * 10000);
-      return await customRequestWithLimitsAndRetries(url, attempts + 1);
+      return await customRequestWithRetries(url, attempts + 1);
     } else throw error;
   }
 }
@@ -71,25 +61,6 @@
 function wait(ms) {
   return new Promise((resolve) => setTimeout(resolve, ms));
 }
-/**
- * @param {URL} url
- * @returns {Promise<Dispatcher.ResponseData>}
- */
-function _customRequestWithLimits(url) {
-  let limit = limiters.get(url.host);
-  if (!limit) {
-    if (url.host === "cdn.buenosaires.gob.ar") {
-      // we have to throttle this host because it has a rate limit.
-      // it downloads fast anyway
-      limit = pThrottle({ limit: 3, interval: 1000 })((x) => x());
-    } else {
-      limit = pLimit(nConnections);
-    }
-    limiters.set(url.host, limit);
-  }
-  return limit(() => _customRequest(url));
-}
-
 /**
  * generates the headers for a request depending on the url
  * @param {URL} url
@@ -108,7 +79,7 @@ function getHeaders(url) {
 /**
  * @param {URL} url
  */
-async function _customRequest(url) {
+export async function customRequest(url) {
   const res = await request(url.toString(), {
     headers: getHeaders(url),
     dispatcher,
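With the per-host limiters gone, network.js keeps only the retry layer: for the "403 as calm down" case it backs off quadratically (1000 * (attempts + 1) ** 2 ms plus up to 10 s of jitter, up to 15 attempts per the comment), and for non-HTTP errors it retries up to 3 times after a flat 5-15 s wait. A standalone sketch of that shape (withRetries and its maxAttempts parameter are illustrative names, not the module's real API):

    // simplified sketch of the backoff used by customRequestWithRetries
    async function withRetries(fn, attempts = 0, maxAttempts = 15) {
      try {
        return await fn();
      } catch (error) {
        if (attempts >= maxAttempts) throw error;
        // quadratic backoff with up to 10 s of random jitter
        const delay = 1000 * (attempts + 1) ** 2 + Math.random() * 10000;
        await new Promise((resolve) => setTimeout(resolve, delay));
        return withRetries(fn, attempts + 1, maxAttempts);
      }
    }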
diff --git a/downloader/package.json b/downloader/package.json
index 5fbb023..7ea2278 100644
--- a/downloader/package.json
+++ b/downloader/package.json
@@ -13,6 +13,7 @@
   "dependencies": {
     "common": "workspace:",
     "p-limit": "^5.0.0",
+    "p-map": "^7.0.0",
     "p-throttle": "^6.1.0",
     "undici": "^6.0.1",
     "zod": "^3.22.4"
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index d609fcc..0e4b54d 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -20,6 +20,9 @@ importers:
       p-limit:
        specifier: ^5.0.0
         version: 5.0.0
+      p-map:
+        specifier: ^7.0.0
+        version: 7.0.0
       p-throttle:
         specifier: ^6.1.0
         version: 6.1.0
@@ -1206,6 +1209,11 @@ packages:
       yocto-queue: 1.0.0
     dev: false
 
+  /p-map@7.0.0:
+    resolution: {integrity: sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg==}
+    engines: {node: '>=18'}
+    dev: false
+
   /p-throttle@6.1.0:
     resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
     engines: {node: '>=18'}
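One note on the dependency hunks: p-map v7 is ESM-only and, per the engines field in the lockfile entry above, requires Node 18+, which matches the import syntax already used throughout the downloader. p-limit and p-throttle remain declared in package.json even though network.js no longer imports them.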