WIP: estandarizar pedidos http

This commit is contained in:
Cat /dev/Nulo 2023-12-16 11:15:27 -03:00
parent 5880f9f289
commit 59db305e74
3 changed files with 136 additions and 112 deletions

View file

@ -1,7 +1,6 @@
import { request } from "undici";
import z from "zod";
import { userAgent } from "./config.js";
import { basename } from "path";
import { customRequestWithLimitsAndRetries } from "./network.js";
const zCkanPackageList = z.object({
success: z.literal(true),
@ -12,11 +11,7 @@ const zCkanPackageList = z.object({
* @param {string} url
*/
async function getJson(url) {
const res = await request(url, {
headers: {
"User-Agent": userAgent,
},
});
const res = await customRequestWithLimitsAndRetries(new URL(url));
const json = await res.body.json();
return json;
}

View file

@ -1,32 +1,14 @@
import { mkdir, open, writeFile } from "node:fs/promises";
import { Agent, fetch, request, setGlobalDispatcher } from "undici";
import { join, normalize } from "node:path";
import pLimit from "p-limit";
import { targetsPorDefecto, userAgent } from "./config.js";
import { targetsPorDefecto } from "./config.js";
import { generateDataJsonFromCkan } from "./ckan_to_datajson.js";
import { zData } from "common/schema.js";
import {
StatusCodeError,
TooManyRedirectsError,
customRequestWithLimitsAndRetries,
} from "./network.js";
setGlobalDispatcher(
new Agent({
pipelining: 0,
}),
);
/** key es host
* @type {Map<string, import("p-limit").LimitFunction>} */
const limiters = new Map();
const nThreads = process.env.N_THREADS ? parseInt(process.env.N_THREADS) : 8;
class StatusCodeError extends Error {
/**
* @param {number} code
*/
constructor(code) {
super(`Status code: ${code}`);
this.code = code;
}
}
class TooManyRedirectsError extends Error {}
let urls = process.argv.slice(2);
if (urls.length < 1) {
urls = targetsPorDefecto;
@ -43,7 +25,7 @@ const targets = urls.map((url) => {
});
for (const target of targets)
downloadFromData(target).catch((error) =>
console.error(`${target.type}+${target.url} FALLÓ CON`, error),
console.error(`${target.type}+${target.url} FALLÓ CON`, error)
);
/**
@ -55,8 +37,10 @@ async function downloadFromData(target) {
if (target.type === "ckan") {
json = await generateDataJsonFromCkan(target.url);
} else if (target.type === "datajson") {
const jsonRes = await fetch(target.url);
json = await jsonRes.json();
const jsonRes = await customRequestWithLimitsAndRetries(
new URL(target.url)
);
json = await jsonRes.body.json();
}
const parsed = zData.parse(json);
@ -84,12 +68,12 @@ async function downloadFromData(target) {
return true;
} catch (error) {
errorFile.write(
JSON.stringify(encodeError({ dataset, dist }, error)) + "\n",
JSON.stringify(encodeError({ dataset, dist }, error)) + "\n"
);
nErrors++;
return false;
}
},
}
)
.map((dist) => ({
dataset,
@ -97,7 +81,7 @@ async function downloadFromData(target) {
url: patchUrl(new URL(dist.downloadURL)),
outputPath,
attempts: 0,
})),
}))
);
const totalJobs = jobs.length;
@ -106,28 +90,21 @@ async function downloadFromData(target) {
shuffleArray(jobs);
const promises = jobs.map((job) => {
let limit = limiters.get(job.url.host);
if (!limit) {
limit = pLimit(nThreads);
limiters.set(job.url.host, limit);
const promises = jobs.map(async (job) => {
try {
return await downloadDistWithRetries(job);
} catch (error) {
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
nErrors++;
} finally {
nFinished++;
}
return limit(async () => {
try {
await downloadDistWithRetries(job);
} catch (error) {
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
nErrors++;
} finally {
nFinished++;
}
});
});
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
const interval = setInterval(() => {
process.stderr.write(
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`,
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
);
}, 30000);
await Promise.all(promises);
@ -150,67 +127,19 @@ export function generateOutputPath(jsonUrlString) {
/**
* @argument {DownloadJob} job
* @argument {number} attempts
* @returns {Promise<void>}
*/
async function downloadDistWithRetries(job, attempts = 0) {
const { url } = job;
try {
await downloadDist(job);
} catch (error) {
// algunos servidores usan 403 como coso para decir "calmate"
// intentar hasta 15 veces con 15 segundos de por medio
if (
error instanceof StatusCodeError &&
((error.code === 403 && url.host === "minsegar-my.sharepoint.com") ||
(error.code === 503 && url.host === "cdn.buenosaires.gob.ar")) &&
attempts < 15
) {
await wait(15000);
return await downloadDistWithRetries(job, attempts + 1);
}
// si no fue un error de http, reintentar hasta 3 veces con 5 segundos de por medio
else if (
!(error instanceof StatusCodeError) &&
!(error instanceof TooManyRedirectsError) &&
attempts < 3
) {
await wait(5000 + Math.random() * 10000);
return await downloadDistWithRetries(job, attempts + 1);
} else throw error;
}
}
/**
* @argument {DownloadJob} job
*/
async function downloadDist({ dist, dataset, url, outputPath }) {
// sharepoint no le gusta compartir a bots lol
const spoofUserAgent = url.host.endsWith("sharepoint.com");
const res = await request(url.toString(), {
maxRedirections: 20,
headers: {
"User-Agent": spoofUserAgent
? "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0"
: userAgent,
},
});
if (res.statusCode >= 300 && res.statusCode <= 399)
throw new TooManyRedirectsError();
if (res.statusCode < 200 || res.statusCode > 299) {
throw new StatusCodeError(res.statusCode);
}
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
const res = await customRequestWithLimitsAndRetries(url);
const fileDirPath = join(
outputPath,
sanitizeSuffix(dataset.identifier),
sanitizeSuffix(dist.identifier),
sanitizeSuffix(dist.identifier)
);
await mkdir(fileDirPath, { recursive: true });
const filePath = join(
fileDirPath,
sanitizeSuffix(dist.fileName || dist.identifier),
sanitizeSuffix(dist.fileName || dist.identifier)
);
if (!res.body) throw new Error("no body");
@ -240,11 +169,11 @@ function sanitizeSuffix(path) {
*/
function chequearIdsDuplicados(jobs, id) {
const duplicated = hasDuplicates(
jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`),
jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`)
);
if (duplicated) {
console.error(
`ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`,
`ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`
);
}
}
@ -254,11 +183,6 @@ function hasDuplicates(array) {
return new Set(array).size !== array.length;
}
/** @argument {number} ms */
function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
* @param {any} error

105
downloader/network.js Normal file
View file

@ -0,0 +1,105 @@
import { Dispatcher, request, Agent, setGlobalDispatcher } from "undici";
import pLimit from "p-limit";
import { userAgent } from "./config.js";
setGlobalDispatcher(
new Agent({
pipelining: 0,
bodyTimeout: 15 * 60 * 1000,
})
);
export class StatusCodeError extends Error {
/**
* @param {number} code
*/
constructor(code) {
super(`Status code: ${code}`);
this.code = code;
}
}
export class TooManyRedirectsError extends Error {}
/** key es host
* @type {Map<string, import("p-limit").LimitFunction>} */
const limiters = new Map();
const nConnections = process.env.N_THREADS
? parseInt(process.env.N_THREADS)
: 8;
/**
* @argument {URL} url
* @argument {number} attempts
* @returns {Promise<Dispatcher.ResponseData>}
*/
export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
try {
return await _customRequestWithLimits(url);
} catch (error) {
// algunos servidores usan 403 como coso para decir "calmate"
// intentar hasta 15 veces con 15 segundos de por medio
if (
error instanceof StatusCodeError &&
((error.code === 403 && url.host === "minsegar-my.sharepoint.com") ||
(error.code === 503 && url.host === "cdn.buenosaires.gob.ar")) &&
attempts < 15
) {
await wait(15000);
return await customRequestWithLimitsAndRetries(url, attempts + 1);
}
// si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio
else if (
!(error instanceof StatusCodeError) &&
!(error instanceof TooManyRedirectsError) &&
attempts < 7
) {
await wait(5000 + Math.random() * 10000);
return await customRequestWithLimitsAndRetries(url, attempts + 1);
} else throw error;
}
}
/** @argument {number} ms */
function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* @param {URL} url
*/
async function _customRequestWithLimits(url) {
let limit = limiters.get(url.host);
if (!limit) {
limit = pLimit(
// tenemos que pingear mucho la API
url.host === "data.buenosaires.gob.ar" ? 32 : nConnections
);
limiters.set(url.host, limit);
}
return limit(async () => {
return await _customRequest(url);
});
}
/**
* @param {URL} url
*/
async function _customRequest(url) {
// sharepoint no le gusta compartir a bots lol
const spoofUserAgent = url.host.endsWith("sharepoint.com");
const res = await request(url.toString(), {
maxRedirections: 20,
headers: {
"User-Agent": spoofUserAgent
? "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0"
: userAgent,
},
});
if (res.statusCode >= 300 && res.statusCode <= 399)
throw new TooManyRedirectsError();
if (res.statusCode < 200 || res.statusCode > 299)
throw new StatusCodeError(res.statusCode);
return res;
}