diff --git a/downloader/ckan_to_datajson.js b/downloader/ckan_to_datajson.js
index 1e79f2c..900ef32 100644
--- a/downloader/ckan_to_datajson.js
+++ b/downloader/ckan_to_datajson.js
@@ -1,7 +1,6 @@
-import { request } from "undici";
 import z from "zod";
-import { userAgent } from "./config.js";
 import { basename } from "path";
+import { customRequestWithLimitsAndRetries } from "./network.js";
 
 const zCkanPackageList = z.object({
   success: z.literal(true),
@@ -12,11 +11,7 @@ const zCkanPackageList = z.object({
  * @param {string} url
  */
 async function getJson(url) {
-  const res = await request(url, {
-    headers: {
-      "User-Agent": userAgent,
-    },
-  });
+  const res = await customRequestWithLimitsAndRetries(new URL(url));
   const json = await res.body.json();
   return json;
 }
diff --git a/downloader/download_json.js b/downloader/download_json.js
index ba8d2b8..378d50b 100644
--- a/downloader/download_json.js
+++ b/downloader/download_json.js
@@ -1,32 +1,14 @@
 import { mkdir, open, writeFile } from "node:fs/promises";
-import { Agent, fetch, request, setGlobalDispatcher } from "undici";
 import { join, normalize } from "node:path";
-import pLimit from "p-limit";
-import { targetsPorDefecto, userAgent } from "./config.js";
+import { targetsPorDefecto } from "./config.js";
 import { generateDataJsonFromCkan } from "./ckan_to_datajson.js";
 import { zData } from "common/schema.js";
+import {
+  StatusCodeError,
+  TooManyRedirectsError,
+  customRequestWithLimitsAndRetries,
+} from "./network.js";
 
-setGlobalDispatcher(
-  new Agent({
-    pipelining: 0,
-  }),
-);
-
-/** keyed by host
- * @type {Map<string, import("p-limit").LimitFunction>} */
-const limiters = new Map();
-const nThreads = process.env.N_THREADS ? parseInt(process.env.N_THREADS) : 8;
-
-class StatusCodeError extends Error {
-  /**
-   * @param {number} code
-   */
-  constructor(code) {
-    super(`Status code: ${code}`);
-    this.code = code;
-  }
-}
-class TooManyRedirectsError extends Error {}
 let urls = process.argv.slice(2);
 if (urls.length < 1) {
   urls = targetsPorDefecto;
@@ -43,7 +25,7 @@ const targets = urls.map((url) => {
 });
 for (const target of targets)
   downloadFromData(target).catch((error) =>
-    console.error(`${target.type}+${target.url} FALLÓ CON`, error),
+    console.error(`${target.type}+${target.url} FALLÓ CON`, error)
   );
 
 /**
@@ -55,8 +37,10 @@ async function downloadFromData(target) {
   if (target.type === "ckan") {
     json = await generateDataJsonFromCkan(target.url);
  } else if (target.type === "datajson") {
-    const jsonRes = await fetch(target.url);
-    json = await jsonRes.json();
+    const jsonRes = await customRequestWithLimitsAndRetries(
+      new URL(target.url)
+    );
+    json = await jsonRes.body.json();
   }
 
   const parsed = zData.parse(json);
@@ -84,12 +68,12 @@ async function downloadFromData(target) {
           return true;
         } catch (error) {
           errorFile.write(
-            JSON.stringify(encodeError({ dataset, dist }, error)) + "\n",
+            JSON.stringify(encodeError({ dataset, dist }, error)) + "\n"
           );
           nErrors++;
           return false;
         }
-      },
+      }
     )
     .map((dist) => ({
       dataset,
@@ -97,7 +81,7 @@ async function downloadFromData(target) {
       url: patchUrl(new URL(dist.downloadURL)),
       outputPath,
       attempts: 0,
-    })),
+    }))
   );
 
   const totalJobs = jobs.length;
@@ -106,28 +90,21 @@
 
   shuffleArray(jobs);
 
-  const promises = jobs.map((job) => {
-    let limit = limiters.get(job.url.host);
-    if (!limit) {
-      limit = pLimit(nThreads);
-      limiters.set(job.url.host, limit);
+  const promises = jobs.map(async (job) => {
+    try {
+      return await downloadDistWithRetries(job);
+    } catch (error) {
+      errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
+      nErrors++;
+    } finally {
+      nFinished++;
     }
-    return limit(async () => {
-      try {
-        await downloadDistWithRetries(job);
-      } catch (error) {
-        errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
-        nErrors++;
-      } finally {
-        nFinished++;
-      }
-    });
   });
 
   process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
   const interval = setInterval(() => {
     process.stderr.write(
-      `info[${outputPath}]: ${nFinished}/${totalJobs} done\n`,
+      `info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
     );
   }, 30000);
   await Promise.all(promises);
@@ -150,67 +127,19 @@ export function generateOutputPath(jsonUrlString) {
 
 /**
  * @argument {DownloadJob} job
- * @argument {number} attempts
- * @returns {Promise<void>}
  */
-async function downloadDistWithRetries(job, attempts = 0) {
-  const { url } = job;
-  try {
-    await downloadDist(job);
-  } catch (error) {
-    // some servers use 403 as a way of saying "calm down"
-    // retry up to 15 times with 15 seconds in between
-    if (
-      error instanceof StatusCodeError &&
-      ((error.code === 403 && url.host === "minsegar-my.sharepoint.com") ||
-        (error.code === 503 && url.host === "cdn.buenosaires.gob.ar")) &&
-      attempts < 15
-    ) {
-      await wait(15000);
-      return await downloadDistWithRetries(job, attempts + 1);
-    }
-    // if it wasn't an HTTP error, retry up to 3 times with ~5 seconds in between
-    else if (
-      !(error instanceof StatusCodeError) &&
-      !(error instanceof TooManyRedirectsError) &&
-      attempts < 3
-    ) {
-      await wait(5000 + Math.random() * 10000);
-      return await downloadDistWithRetries(job, attempts + 1);
-    } else throw error;
-  }
-}
-
-/**
- * @argument {DownloadJob} job
- */
-async function downloadDist({ dist, dataset, url, outputPath }) {
-  // sharepoint doesn't like sharing with bots lol
-  const spoofUserAgent = url.host.endsWith("sharepoint.com");
-
-  const res = await request(url.toString(), {
-    maxRedirections: 20,
-    headers: {
-      "User-Agent": spoofUserAgent
-        ? "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0"
-        : userAgent,
-    },
-  });
-  if (res.statusCode >= 300 && res.statusCode <= 399)
-    throw new TooManyRedirectsError();
-  if (res.statusCode < 200 || res.statusCode > 299) {
-    throw new StatusCodeError(res.statusCode);
-  }
+async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
+  const res = await customRequestWithLimitsAndRetries(url);
 
   const fileDirPath = join(
     outputPath,
     sanitizeSuffix(dataset.identifier),
-    sanitizeSuffix(dist.identifier),
+    sanitizeSuffix(dist.identifier)
   );
   await mkdir(fileDirPath, { recursive: true });
   const filePath = join(
     fileDirPath,
-    sanitizeSuffix(dist.fileName || dist.identifier),
+    sanitizeSuffix(dist.fileName || dist.identifier)
   );
 
   if (!res.body) throw new Error("no body");
@@ -240,11 +169,11 @@ function sanitizeSuffix(path) {
  */
 function chequearIdsDuplicados(jobs, id) {
   const duplicated = hasDuplicates(
-    jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`),
+    jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`)
   );
   if (duplicated) {
     console.error(
-      `ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`,
+      `ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`
     );
   }
 }
@@ -254,11 +183,6 @@ function hasDuplicates(array) {
   return new Set(array).size !== array.length;
 }
 
-/** @argument {number} ms */
-function wait(ms) {
-  return new Promise((resolve) => setTimeout(resolve, ms));
-}
-
 /**
  * @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
  * @param {any} error
diff --git a/downloader/network.js b/downloader/network.js
new file mode 100644
index 0000000..adaf535
--- /dev/null
+++ b/downloader/network.js
@@ -0,0 +1,105 @@
+import { Dispatcher, request, Agent, setGlobalDispatcher } from "undici";
+import pLimit from "p-limit";
+import { userAgent } from "./config.js";
+
+setGlobalDispatcher(
+  new Agent({
+    pipelining: 0,
+    bodyTimeout: 15 * 60 * 1000,
+  })
+);
+
+export class StatusCodeError extends Error {
+  /**
+   * @param {number} code
+   */
+  constructor(code) {
+    super(`Status code: ${code}`);
+    this.code = code;
+  }
+}
+export class TooManyRedirectsError extends Error {}
+
+/** keyed by host
+ * @type {Map<string, import("p-limit").LimitFunction>} */
+const limiters = new Map();
+const nConnections = process.env.N_THREADS
+  ? parseInt(process.env.N_THREADS)
+  : 8;
+
+/**
+ * @argument {URL} url
+ * @argument {number} attempts
+ * @returns {Promise<Dispatcher.ResponseData>}
+ */
+export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
+  try {
+    return await _customRequestWithLimits(url);
+  } catch (error) {
+    // some servers use 403 as a way of saying "calm down"
+    // retry up to 15 times with 15 seconds in between
+    if (
+      error instanceof StatusCodeError &&
+      ((error.code === 403 && url.host === "minsegar-my.sharepoint.com") ||
+        (error.code === 503 && url.host === "cdn.buenosaires.gob.ar")) &&
+      attempts < 15
+    ) {
+      await wait(15000);
+      return await customRequestWithLimitsAndRetries(url, attempts + 1);
+    }
+    // if it wasn't an HTTP error, retry up to 7 times with ~10 seconds in between
+    else if (
+      !(error instanceof StatusCodeError) &&
+      !(error instanceof TooManyRedirectsError) &&
+      attempts < 7
+    ) {
+      await wait(5000 + Math.random() * 10000);
+      return await customRequestWithLimitsAndRetries(url, attempts + 1);
+    } else throw error;
+  }
+}
+
+/** @argument {number} ms */
+function wait(ms) {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+/**
+ * @param {URL} url
+ */
+async function _customRequestWithLimits(url) {
+  let limit = limiters.get(url.host);
+  if (!limit) {
+    limit = pLimit(
+      // we have to ping this API a lot
+      url.host === "data.buenosaires.gob.ar" ? 32 : nConnections
+    );
+    limiters.set(url.host, limit);
+  }
+  return limit(async () => {
+    return await _customRequest(url);
+  });
+}
+
+/**
+ * @param {URL} url
+ */
+async function _customRequest(url) {
+  // sharepoint doesn't like sharing with bots lol
+  const spoofUserAgent = url.host.endsWith("sharepoint.com");
+
+  const res = await request(url.toString(), {
+    maxRedirections: 20,
+    headers: {
+      "User-Agent": spoofUserAgent
+        ? "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0"
+        : userAgent,
+    },
+  });
+  if (res.statusCode >= 300 && res.statusCode <= 399)
+    throw new TooManyRedirectsError();
+  if (res.statusCode < 200 || res.statusCode > 299)
+    throw new StatusCodeError(res.statusCode);
+
+  return res;
+}
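Reviewer note: a minimal sketch (not part of the patch) of how the new network.js helper is meant to be consumed from other downloader modules, assuming a Node ESM context with undici and p-limit installed; the URL below is hypothetical, not one of the real targets.

    import {
      customRequestWithLimitsAndRetries,
      StatusCodeError,
    } from "./network.js";

    // All requests to the same host share one p-limit queue, so callers get
    // per-host throttling plus the retry policy without any extra code.
    try {
      const res = await customRequestWithLimitsAndRetries(
        new URL("https://example.gob.ar/data.json") // hypothetical URL
      );
      console.log(await res.body.json());
    } catch (error) {
      if (error instanceof StatusCodeError) {
        // a non-2xx status that survived all retries
        console.error(`gave up with HTTP ${error.code}`);
      } else throw error;
    }

Since the limiter map now lives in network.js, the per-host cap applies to the data.json/CKAN metadata fetches as well as the distribution downloads, instead of only the download loop as before.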