solo usar limites locales

This commit is contained in:
Cat /dev/Nulo 2023-12-19 12:01:37 -03:00
parent 5406113dbb
commit 6c3776ffd2
5 changed files with 40 additions and 55 deletions

View file

@ -1,6 +1,7 @@
import z from "zod"; import z from "zod";
import pMap from "p-map";
import { basename } from "path"; import { basename } from "path";
import { customRequestWithLimitsAndRetries } from "./network.js"; import { customRequest } from "./network.js";
const zCkanPackageList = z.object({ const zCkanPackageList = z.object({
success: z.literal(true), success: z.literal(true),
@ -11,7 +12,7 @@ const zCkanPackageList = z.object({
* @param {string} url * @param {string} url
*/ */
async function getJson(url) { async function getJson(url) {
const res = await customRequestWithLimitsAndRetries(new URL(url)); const res = await customRequest(new URL(url));
const json = await res.body.json(); const json = await res.body.json();
return json; return json;
} }
@ -114,9 +115,9 @@ async function getCkanInfo(ckanUrl) {
export async function generateDataJsonFromCkan(ckanUrl) { export async function generateDataJsonFromCkan(ckanUrl) {
const list = await getCkanPackageList(ckanUrl); const list = await getCkanPackageList(ckanUrl);
const info = await getCkanInfo(ckanUrl); const info = await getCkanInfo(ckanUrl);
const packages = await Promise.all( const packages = await pMap(list, (link) => getCkanPackage(ckanUrl, link), {
list.map((n) => getCkanPackage(ckanUrl, n)) concurrency: 12,
); });
/** @type {import("common/schema.js").Data & { generatedBy: string }} */ /** @type {import("common/schema.js").Data & { generatedBy: string }} */
const data = { const data = {
generatedBy: generatedBy:

View file

@ -6,9 +6,10 @@ import { zData } from "common/schema.js";
import { import {
StatusCodeError, StatusCodeError,
TooManyRedirectsError, TooManyRedirectsError,
customRequestWithLimitsAndRetries, customRequestWithRetries,
} from "./network.js"; } from "./network.js";
import { createWriteStream } from "node:fs"; import { createWriteStream } from "node:fs";
import pMap from "p-map";
let urls = process.argv.slice(2); let urls = process.argv.slice(2);
if (urls.length < 1) { if (urls.length < 1) {
@ -82,7 +83,9 @@ async function downloadFromData(target) {
shuffleArray(jobs); shuffleArray(jobs);
const promises = jobs.map(async (job) => { const promise = pMap(
jobs,
async (job) => {
try { try {
return await downloadDistWithRetries(job); return await downloadDistWithRetries(job);
} catch (error) { } catch (error) {
@ -91,7 +94,10 @@ async function downloadFromData(target) {
} finally { } finally {
nFinished++; nFinished++;
} }
}); },
// en realidad está limitado por el balancedpool
{ concurrency: 32 }
);
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`); process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
const interval = setInterval(() => { const interval = setInterval(() => {
@ -99,7 +105,7 @@ async function downloadFromData(target) {
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n` `info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
); );
}, 30000); }, 30000);
await Promise.all(promises); await promise;
clearInterval(interval); clearInterval(interval);
if (nErrors > 0) if (nErrors > 0)
console.error(`${outputPath}: Finished with ${nErrors} errors`); console.error(`${outputPath}: Finished with ${nErrors} errors`);
@ -116,9 +122,7 @@ async function getDataJsonForTarget(target) {
if (target.type === "ckan") { if (target.type === "ckan") {
return JSON.stringify(await generateDataJsonFromCkan(target.url)); return JSON.stringify(await generateDataJsonFromCkan(target.url));
} else if (target.type === "datajson") { } else if (target.type === "datajson") {
const jsonRes = await customRequestWithLimitsAndRetries( const jsonRes = await customRequestWithRetries(new URL(target.url));
new URL(target.url)
);
return await jsonRes.body.text(); return await jsonRes.body.text();
} else throw new Error("?????????????"); } else throw new Error("?????????????");
} }
@ -136,7 +140,7 @@ export function generateOutputPath(jsonUrlString) {
* @argument {DownloadJob} job * @argument {DownloadJob} job
*/ */
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) { async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
const res = await customRequestWithLimitsAndRetries(url); const res = await customRequestWithRetries(url);
const fileDirPath = join( const fileDirPath = join(
outputPath, outputPath,

View file

@ -1,7 +1,5 @@
import { Dispatcher, request, Agent } from "undici"; import { Dispatcher, request, Agent } from "undici";
import pLimit from "p-limit";
import { userAgent } from "./config.js"; import { userAgent } from "./config.js";
import pThrottle from "p-throttle";
const dispatcher = new Agent({ const dispatcher = new Agent({
connect: { timeout: 60 * 1000 }, connect: { timeout: 60 * 1000 },
@ -20,14 +18,6 @@ export class StatusCodeError extends Error {
} }
export class TooManyRedirectsError extends Error {} export class TooManyRedirectsError extends Error {}
/** key es host
* @type {Map<string, <Argument extends unknown, ReturnType>(
fn: (arguments_: Argument) => PromiseLike<ReturnType>) => Promise<ReturnType>>} */
const limiters = new Map();
const nConnections = process.env.N_THREADS
? parseInt(process.env.N_THREADS)
: 8;
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false; const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
/** /**
@ -35,9 +25,9 @@ const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
* @argument {number} attempts * @argument {number} attempts
* @returns {Promise<Dispatcher.ResponseData>} * @returns {Promise<Dispatcher.ResponseData>}
*/ */
export async function customRequestWithLimitsAndRetries(url, attempts = 0) { export async function customRequestWithRetries(url, attempts = 0) {
try { try {
return await _customRequestWithLimits(url); return await customRequest(url);
} catch (error) { } catch (error) {
// algunos servidores usan 403 como coso para decir "calmate" // algunos servidores usan 403 como coso para decir "calmate"
// intentar hasta 15 veces con 15 segundos de por medio // intentar hasta 15 veces con 15 segundos de por medio
@ -50,7 +40,7 @@ export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
if (REPORT_RETRIES) if (REPORT_RETRIES)
console.debug(`reintentando(status)[${attempts}] ${url.toString()}`); console.debug(`reintentando(status)[${attempts}] ${url.toString()}`);
await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000); await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
return await customRequestWithLimitsAndRetries(url, attempts + 1); return await customRequestWithRetries(url, attempts + 1);
} }
// si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio // si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio
else if ( else if (
@ -61,7 +51,7 @@ export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
if (REPORT_RETRIES) if (REPORT_RETRIES)
console.debug(`reintentando[${attempts}] ${url.toString()}`); console.debug(`reintentando[${attempts}] ${url.toString()}`);
await wait(5000 + Math.random() * 10000); await wait(5000 + Math.random() * 10000);
return await customRequestWithLimitsAndRetries(url, attempts + 1); return await customRequestWithRetries(url, attempts + 1);
} else throw error; } else throw error;
} }
} }
@ -71,25 +61,6 @@ function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }
/**
* @param {URL} url
* @returns {Promise<Dispatcher.ResponseData>}
*/
function _customRequestWithLimits(url) {
let limit = limiters.get(url.host);
if (!limit) {
if (url.host === "cdn.buenosaires.gob.ar") {
// tenemos que throttlear en este host porque tiene un rate limit.
// de todas maneras descarga rápido
limit = pThrottle({ limit: 3, interval: 1000 })((x) => x());
} else {
limit = pLimit(nConnections);
}
limiters.set(url.host, limit);
}
return limit(() => _customRequest(url));
}
/** /**
* genera los headers para hacer un pedido dependiendo de la url * genera los headers para hacer un pedido dependiendo de la url
* @param {URL} url * @param {URL} url
@ -108,7 +79,7 @@ function getHeaders(url) {
/** /**
* @param {URL} url * @param {URL} url
*/ */
async function _customRequest(url) { export async function customRequest(url) {
const res = await request(url.toString(), { const res = await request(url.toString(), {
headers: getHeaders(url), headers: getHeaders(url),
dispatcher, dispatcher,

View file

@ -13,6 +13,7 @@
"dependencies": { "dependencies": {
"common": "workspace:", "common": "workspace:",
"p-limit": "^5.0.0", "p-limit": "^5.0.0",
"p-map": "^7.0.0",
"p-throttle": "^6.1.0", "p-throttle": "^6.1.0",
"undici": "^6.0.1", "undici": "^6.0.1",
"zod": "^3.22.4" "zod": "^3.22.4"

View file

@ -20,6 +20,9 @@ importers:
p-limit: p-limit:
specifier: ^5.0.0 specifier: ^5.0.0
version: 5.0.0 version: 5.0.0
p-map:
specifier: ^7.0.0
version: 7.0.0
p-throttle: p-throttle:
specifier: ^6.1.0 specifier: ^6.1.0
version: 6.1.0 version: 6.1.0
@ -1206,6 +1209,11 @@ packages:
yocto-queue: 1.0.0 yocto-queue: 1.0.0
dev: false dev: false
/p-map@7.0.0:
resolution: {integrity: sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg==}
engines: {node: '>=18'}
dev: false
/p-throttle@6.1.0: /p-throttle@6.1.0:
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==} resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
engines: {node: '>=18'} engines: {node: '>=18'}