mirror of
https://github.com/catdevnull/transicion-desordenada-diablo
synced 2024-11-26 03:26:18 +00:00
Compare commits
1 commit
6c3776ffd2
...
6bd70eaa61
Author | SHA1 | Date | |
---|---|---|---|
6bd70eaa61 |
5 changed files with 66 additions and 56 deletions
|
@ -1,7 +1,6 @@
|
|||
import z from "zod";
|
||||
import pMap from "p-map";
|
||||
import { basename } from "path";
|
||||
import { customRequest } from "./network.js";
|
||||
import { customRequestWithLimitsAndRetries } from "./network.js";
|
||||
|
||||
const zCkanPackageList = z.object({
|
||||
success: z.literal(true),
|
||||
|
@ -12,7 +11,7 @@ const zCkanPackageList = z.object({
|
|||
* @param {string} url
|
||||
*/
|
||||
async function getJson(url) {
|
||||
const res = await customRequest(new URL(url));
|
||||
const res = await customRequestWithLimitsAndRetries(new URL(url));
|
||||
const json = await res.body.json();
|
||||
return json;
|
||||
}
|
||||
|
@ -115,9 +114,9 @@ async function getCkanInfo(ckanUrl) {
|
|||
export async function generateDataJsonFromCkan(ckanUrl) {
|
||||
const list = await getCkanPackageList(ckanUrl);
|
||||
const info = await getCkanInfo(ckanUrl);
|
||||
const packages = await pMap(list, (link) => getCkanPackage(ckanUrl, link), {
|
||||
concurrency: 12,
|
||||
});
|
||||
const packages = await Promise.all(
|
||||
list.map((n) => getCkanPackage(ckanUrl, n))
|
||||
);
|
||||
/** @type {import("common/schema.js").Data & { generatedBy: string }} */
|
||||
const data = {
|
||||
generatedBy:
|
||||
|
|
|
@ -6,10 +6,10 @@ import { zData } from "common/schema.js";
|
|||
import {
|
||||
StatusCodeError,
|
||||
TooManyRedirectsError,
|
||||
customRequestWithRetries,
|
||||
customRequestWithLimitsAndRetries,
|
||||
} from "./network.js";
|
||||
import { createWriteStream } from "node:fs";
|
||||
import pMap from "p-map";
|
||||
import { pipeline } from "node:stream/promises";
|
||||
|
||||
let urls = process.argv.slice(2);
|
||||
if (urls.length < 1) {
|
||||
|
@ -35,11 +35,20 @@ for (const target of targets)
|
|||
*/
|
||||
async function downloadFromData(target) {
|
||||
const outputPath = generateOutputPath(target.url);
|
||||
const json = await getDataJsonForTarget(target);
|
||||
const parsed = zData.parse(JSON.parse(json));
|
||||
let json;
|
||||
if (target.type === "ckan") {
|
||||
json = await generateDataJsonFromCkan(target.url);
|
||||
} else if (target.type === "datajson") {
|
||||
const jsonRes = await customRequestWithLimitsAndRetries(
|
||||
new URL(target.url)
|
||||
);
|
||||
json = await jsonRes.body.json();
|
||||
}
|
||||
|
||||
const parsed = zData.parse(json);
|
||||
|
||||
await mkdir(outputPath, { recursive: true });
|
||||
await writeFile(join(outputPath, "data.json"), json);
|
||||
await writeFile(join(outputPath, "data.json"), JSON.stringify(json));
|
||||
await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
|
||||
const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
|
||||
flags: "w",
|
||||
|
@ -83,21 +92,16 @@ async function downloadFromData(target) {
|
|||
|
||||
shuffleArray(jobs);
|
||||
|
||||
const promise = pMap(
|
||||
jobs,
|
||||
async (job) => {
|
||||
try {
|
||||
return await downloadDistWithRetries(job);
|
||||
} catch (error) {
|
||||
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
|
||||
nErrors++;
|
||||
} finally {
|
||||
nFinished++;
|
||||
}
|
||||
},
|
||||
// en realidad está limitado por el balancedpool
|
||||
{ concurrency: 32 }
|
||||
);
|
||||
const promises = jobs.map(async (job) => {
|
||||
try {
|
||||
return await downloadDistWithRetries(job);
|
||||
} catch (error) {
|
||||
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
|
||||
nErrors++;
|
||||
} finally {
|
||||
nFinished++;
|
||||
}
|
||||
});
|
||||
|
||||
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
|
||||
const interval = setInterval(() => {
|
||||
|
@ -105,7 +109,7 @@ async function downloadFromData(target) {
|
|||
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
|
||||
);
|
||||
}, 30000);
|
||||
await promise;
|
||||
await Promise.all(promises);
|
||||
clearInterval(interval);
|
||||
if (nErrors > 0)
|
||||
console.error(`${outputPath}: Finished with ${nErrors} errors`);
|
||||
|
@ -114,19 +118,6 @@ async function downloadFromData(target) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Target} target
|
||||
* @returns {Promise<string>}
|
||||
*/
|
||||
async function getDataJsonForTarget(target) {
|
||||
if (target.type === "ckan") {
|
||||
return JSON.stringify(await generateDataJsonFromCkan(target.url));
|
||||
} else if (target.type === "datajson") {
|
||||
const jsonRes = await customRequestWithRetries(new URL(target.url));
|
||||
return await jsonRes.body.text();
|
||||
} else throw new Error("?????????????");
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} jsonUrlString
|
||||
*/
|
||||
|
@ -140,7 +131,7 @@ export function generateOutputPath(jsonUrlString) {
|
|||
* @argument {DownloadJob} job
|
||||
*/
|
||||
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
|
||||
const res = await customRequestWithRetries(url);
|
||||
const res = await customRequestWithLimitsAndRetries(url);
|
||||
|
||||
const fileDirPath = join(
|
||||
outputPath,
|
||||
|
@ -154,7 +145,7 @@ async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
|
|||
);
|
||||
|
||||
if (!res.body) throw new Error("no body");
|
||||
await writeFile(filePath, res.body);
|
||||
await pipeline(res.body, createWriteStream(filePath));
|
||||
}
|
||||
|
||||
/** @typedef DownloadJob
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import { Dispatcher, request, Agent } from "undici";
|
||||
import pLimit from "p-limit";
|
||||
import { userAgent } from "./config.js";
|
||||
import pThrottle from "p-throttle";
|
||||
|
||||
const dispatcher = new Agent({
|
||||
connect: { timeout: 60 * 1000 },
|
||||
|
@ -18,6 +20,14 @@ export class StatusCodeError extends Error {
|
|||
}
|
||||
export class TooManyRedirectsError extends Error {}
|
||||
|
||||
/** key es host
|
||||
* @type {Map<string, <Argument extends unknown, ReturnType>(
|
||||
fn: (arguments_: Argument) => PromiseLike<ReturnType>) => Promise<ReturnType>>} */
|
||||
const limiters = new Map();
|
||||
const nConnections = process.env.N_THREADS
|
||||
? parseInt(process.env.N_THREADS)
|
||||
: 8;
|
||||
|
||||
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
|
||||
|
||||
/**
|
||||
|
@ -25,9 +35,9 @@ const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
|
|||
* @argument {number} attempts
|
||||
* @returns {Promise<Dispatcher.ResponseData>}
|
||||
*/
|
||||
export async function customRequestWithRetries(url, attempts = 0) {
|
||||
export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
|
||||
try {
|
||||
return await customRequest(url);
|
||||
return await _customRequestWithLimits(url);
|
||||
} catch (error) {
|
||||
// algunos servidores usan 403 como coso para decir "calmate"
|
||||
// intentar hasta 15 veces con 15 segundos de por medio
|
||||
|
@ -40,7 +50,7 @@ export async function customRequestWithRetries(url, attempts = 0) {
|
|||
if (REPORT_RETRIES)
|
||||
console.debug(`reintentando(status)[${attempts}] ${url.toString()}`);
|
||||
await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
|
||||
return await customRequestWithRetries(url, attempts + 1);
|
||||
return await customRequestWithLimitsAndRetries(url, attempts + 1);
|
||||
}
|
||||
// si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio
|
||||
else if (
|
||||
|
@ -51,7 +61,7 @@ export async function customRequestWithRetries(url, attempts = 0) {
|
|||
if (REPORT_RETRIES)
|
||||
console.debug(`reintentando[${attempts}] ${url.toString()}`);
|
||||
await wait(5000 + Math.random() * 10000);
|
||||
return await customRequestWithRetries(url, attempts + 1);
|
||||
return await customRequestWithLimitsAndRetries(url, attempts + 1);
|
||||
} else throw error;
|
||||
}
|
||||
}
|
||||
|
@ -61,6 +71,25 @@ function wait(ms) {
|
|||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {URL} url
|
||||
* @returns {Promise<Dispatcher.ResponseData>}
|
||||
*/
|
||||
function _customRequestWithLimits(url) {
|
||||
let limit = limiters.get(url.host);
|
||||
if (!limit) {
|
||||
if (url.host === "cdn.buenosaires.gob.ar") {
|
||||
// tenemos que throttlear en este host porque tiene un rate limit.
|
||||
// de todas maneras descarga rápido
|
||||
limit = pThrottle({ limit: 3, interval: 1000 })((x) => x());
|
||||
} else {
|
||||
limit = pLimit(nConnections);
|
||||
}
|
||||
limiters.set(url.host, limit);
|
||||
}
|
||||
return limit(() => _customRequest(url));
|
||||
}
|
||||
|
||||
/**
|
||||
* genera los headers para hacer un pedido dependiendo de la url
|
||||
* @param {URL} url
|
||||
|
@ -79,7 +108,7 @@ function getHeaders(url) {
|
|||
/**
|
||||
* @param {URL} url
|
||||
*/
|
||||
export async function customRequest(url) {
|
||||
async function _customRequest(url) {
|
||||
const res = await request(url.toString(), {
|
||||
headers: getHeaders(url),
|
||||
dispatcher,
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
"dependencies": {
|
||||
"common": "workspace:",
|
||||
"p-limit": "^5.0.0",
|
||||
"p-map": "^7.0.0",
|
||||
"p-throttle": "^6.1.0",
|
||||
"undici": "^6.0.1",
|
||||
"zod": "^3.22.4"
|
||||
|
|
|
@ -20,9 +20,6 @@ importers:
|
|||
p-limit:
|
||||
specifier: ^5.0.0
|
||||
version: 5.0.0
|
||||
p-map:
|
||||
specifier: ^7.0.0
|
||||
version: 7.0.0
|
||||
p-throttle:
|
||||
specifier: ^6.1.0
|
||||
version: 6.1.0
|
||||
|
@ -1209,11 +1206,6 @@ packages:
|
|||
yocto-queue: 1.0.0
|
||||
dev: false
|
||||
|
||||
/p-map@7.0.0:
|
||||
resolution: {integrity: sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg==}
|
||||
engines: {node: '>=18'}
|
||||
dev: false
|
||||
|
||||
/p-throttle@6.1.0:
|
||||
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
|
||||
engines: {node: '>=18'}
|
||||
|
|
Loading…
Reference in a new issue