mirror of
https://github.com/catdevnull/transicion-desordenada-diablo
synced 2024-11-26 03:26:18 +00:00
Compare commits
2 commits
6bd70eaa61
...
6c3776ffd2
Author | SHA1 | Date | |
---|---|---|---|
6c3776ffd2 | |||
5406113dbb |
5 changed files with 56 additions and 66 deletions
|
@ -1,6 +1,7 @@
|
||||||
import z from "zod";
|
import z from "zod";
|
||||||
|
import pMap from "p-map";
|
||||||
import { basename } from "path";
|
import { basename } from "path";
|
||||||
import { customRequestWithLimitsAndRetries } from "./network.js";
|
import { customRequest } from "./network.js";
|
||||||
|
|
||||||
const zCkanPackageList = z.object({
|
const zCkanPackageList = z.object({
|
||||||
success: z.literal(true),
|
success: z.literal(true),
|
||||||
|
@ -11,7 +12,7 @@ const zCkanPackageList = z.object({
|
||||||
* @param {string} url
|
* @param {string} url
|
||||||
*/
|
*/
|
||||||
async function getJson(url) {
|
async function getJson(url) {
|
||||||
const res = await customRequestWithLimitsAndRetries(new URL(url));
|
const res = await customRequest(new URL(url));
|
||||||
const json = await res.body.json();
|
const json = await res.body.json();
|
||||||
return json;
|
return json;
|
||||||
}
|
}
|
||||||
|
@ -114,9 +115,9 @@ async function getCkanInfo(ckanUrl) {
|
||||||
export async function generateDataJsonFromCkan(ckanUrl) {
|
export async function generateDataJsonFromCkan(ckanUrl) {
|
||||||
const list = await getCkanPackageList(ckanUrl);
|
const list = await getCkanPackageList(ckanUrl);
|
||||||
const info = await getCkanInfo(ckanUrl);
|
const info = await getCkanInfo(ckanUrl);
|
||||||
const packages = await Promise.all(
|
const packages = await pMap(list, (link) => getCkanPackage(ckanUrl, link), {
|
||||||
list.map((n) => getCkanPackage(ckanUrl, n))
|
concurrency: 12,
|
||||||
);
|
});
|
||||||
/** @type {import("common/schema.js").Data & { generatedBy: string }} */
|
/** @type {import("common/schema.js").Data & { generatedBy: string }} */
|
||||||
const data = {
|
const data = {
|
||||||
generatedBy:
|
generatedBy:
|
||||||
|
|
|
@ -6,10 +6,10 @@ import { zData } from "common/schema.js";
|
||||||
import {
|
import {
|
||||||
StatusCodeError,
|
StatusCodeError,
|
||||||
TooManyRedirectsError,
|
TooManyRedirectsError,
|
||||||
customRequestWithLimitsAndRetries,
|
customRequestWithRetries,
|
||||||
} from "./network.js";
|
} from "./network.js";
|
||||||
import { createWriteStream } from "node:fs";
|
import { createWriteStream } from "node:fs";
|
||||||
import { pipeline } from "node:stream/promises";
|
import pMap from "p-map";
|
||||||
|
|
||||||
let urls = process.argv.slice(2);
|
let urls = process.argv.slice(2);
|
||||||
if (urls.length < 1) {
|
if (urls.length < 1) {
|
||||||
|
@ -35,20 +35,11 @@ for (const target of targets)
|
||||||
*/
|
*/
|
||||||
async function downloadFromData(target) {
|
async function downloadFromData(target) {
|
||||||
const outputPath = generateOutputPath(target.url);
|
const outputPath = generateOutputPath(target.url);
|
||||||
let json;
|
const json = await getDataJsonForTarget(target);
|
||||||
if (target.type === "ckan") {
|
const parsed = zData.parse(JSON.parse(json));
|
||||||
json = await generateDataJsonFromCkan(target.url);
|
|
||||||
} else if (target.type === "datajson") {
|
|
||||||
const jsonRes = await customRequestWithLimitsAndRetries(
|
|
||||||
new URL(target.url)
|
|
||||||
);
|
|
||||||
json = await jsonRes.body.json();
|
|
||||||
}
|
|
||||||
|
|
||||||
const parsed = zData.parse(json);
|
|
||||||
|
|
||||||
await mkdir(outputPath, { recursive: true });
|
await mkdir(outputPath, { recursive: true });
|
||||||
await writeFile(join(outputPath, "data.json"), JSON.stringify(json));
|
await writeFile(join(outputPath, "data.json"), json);
|
||||||
await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
|
await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
|
||||||
const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
|
const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
|
||||||
flags: "w",
|
flags: "w",
|
||||||
|
@ -92,16 +83,21 @@ async function downloadFromData(target) {
|
||||||
|
|
||||||
shuffleArray(jobs);
|
shuffleArray(jobs);
|
||||||
|
|
||||||
const promises = jobs.map(async (job) => {
|
const promise = pMap(
|
||||||
try {
|
jobs,
|
||||||
return await downloadDistWithRetries(job);
|
async (job) => {
|
||||||
} catch (error) {
|
try {
|
||||||
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
|
return await downloadDistWithRetries(job);
|
||||||
nErrors++;
|
} catch (error) {
|
||||||
} finally {
|
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
|
||||||
nFinished++;
|
nErrors++;
|
||||||
}
|
} finally {
|
||||||
});
|
nFinished++;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// en realidad está limitado por el balancedpool
|
||||||
|
{ concurrency: 32 }
|
||||||
|
);
|
||||||
|
|
||||||
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
|
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
|
||||||
const interval = setInterval(() => {
|
const interval = setInterval(() => {
|
||||||
|
@ -109,7 +105,7 @@ async function downloadFromData(target) {
|
||||||
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
|
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
|
||||||
);
|
);
|
||||||
}, 30000);
|
}, 30000);
|
||||||
await Promise.all(promises);
|
await promise;
|
||||||
clearInterval(interval);
|
clearInterval(interval);
|
||||||
if (nErrors > 0)
|
if (nErrors > 0)
|
||||||
console.error(`${outputPath}: Finished with ${nErrors} errors`);
|
console.error(`${outputPath}: Finished with ${nErrors} errors`);
|
||||||
|
@ -118,6 +114,19 @@ async function downloadFromData(target) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Target} target
|
||||||
|
* @returns {Promise<string>}
|
||||||
|
*/
|
||||||
|
async function getDataJsonForTarget(target) {
|
||||||
|
if (target.type === "ckan") {
|
||||||
|
return JSON.stringify(await generateDataJsonFromCkan(target.url));
|
||||||
|
} else if (target.type === "datajson") {
|
||||||
|
const jsonRes = await customRequestWithRetries(new URL(target.url));
|
||||||
|
return await jsonRes.body.text();
|
||||||
|
} else throw new Error("?????????????");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} jsonUrlString
|
* @param {string} jsonUrlString
|
||||||
*/
|
*/
|
||||||
|
@ -131,7 +140,7 @@ export function generateOutputPath(jsonUrlString) {
|
||||||
* @argument {DownloadJob} job
|
* @argument {DownloadJob} job
|
||||||
*/
|
*/
|
||||||
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
|
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
|
||||||
const res = await customRequestWithLimitsAndRetries(url);
|
const res = await customRequestWithRetries(url);
|
||||||
|
|
||||||
const fileDirPath = join(
|
const fileDirPath = join(
|
||||||
outputPath,
|
outputPath,
|
||||||
|
@ -145,7 +154,7 @@ async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
|
||||||
);
|
);
|
||||||
|
|
||||||
if (!res.body) throw new Error("no body");
|
if (!res.body) throw new Error("no body");
|
||||||
await pipeline(res.body, createWriteStream(filePath));
|
await writeFile(filePath, res.body);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @typedef DownloadJob
|
/** @typedef DownloadJob
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
import { Dispatcher, request, Agent } from "undici";
|
import { Dispatcher, request, Agent } from "undici";
|
||||||
import pLimit from "p-limit";
|
|
||||||
import { userAgent } from "./config.js";
|
import { userAgent } from "./config.js";
|
||||||
import pThrottle from "p-throttle";
|
|
||||||
|
|
||||||
const dispatcher = new Agent({
|
const dispatcher = new Agent({
|
||||||
connect: { timeout: 60 * 1000 },
|
connect: { timeout: 60 * 1000 },
|
||||||
|
@ -20,14 +18,6 @@ export class StatusCodeError extends Error {
|
||||||
}
|
}
|
||||||
export class TooManyRedirectsError extends Error {}
|
export class TooManyRedirectsError extends Error {}
|
||||||
|
|
||||||
/** key es host
|
|
||||||
* @type {Map<string, <Argument extends unknown, ReturnType>(
|
|
||||||
fn: (arguments_: Argument) => PromiseLike<ReturnType>) => Promise<ReturnType>>} */
|
|
||||||
const limiters = new Map();
|
|
||||||
const nConnections = process.env.N_THREADS
|
|
||||||
? parseInt(process.env.N_THREADS)
|
|
||||||
: 8;
|
|
||||||
|
|
||||||
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
|
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,9 +25,9 @@ const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
|
||||||
* @argument {number} attempts
|
* @argument {number} attempts
|
||||||
* @returns {Promise<Dispatcher.ResponseData>}
|
* @returns {Promise<Dispatcher.ResponseData>}
|
||||||
*/
|
*/
|
||||||
export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
|
export async function customRequestWithRetries(url, attempts = 0) {
|
||||||
try {
|
try {
|
||||||
return await _customRequestWithLimits(url);
|
return await customRequest(url);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// algunos servidores usan 403 como coso para decir "calmate"
|
// algunos servidores usan 403 como coso para decir "calmate"
|
||||||
// intentar hasta 15 veces con 15 segundos de por medio
|
// intentar hasta 15 veces con 15 segundos de por medio
|
||||||
|
@ -50,7 +40,7 @@ export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
|
||||||
if (REPORT_RETRIES)
|
if (REPORT_RETRIES)
|
||||||
console.debug(`reintentando(status)[${attempts}] ${url.toString()}`);
|
console.debug(`reintentando(status)[${attempts}] ${url.toString()}`);
|
||||||
await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
|
await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
|
||||||
return await customRequestWithLimitsAndRetries(url, attempts + 1);
|
return await customRequestWithRetries(url, attempts + 1);
|
||||||
}
|
}
|
||||||
// si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio
|
// si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio
|
||||||
else if (
|
else if (
|
||||||
|
@ -61,7 +51,7 @@ export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
|
||||||
if (REPORT_RETRIES)
|
if (REPORT_RETRIES)
|
||||||
console.debug(`reintentando[${attempts}] ${url.toString()}`);
|
console.debug(`reintentando[${attempts}] ${url.toString()}`);
|
||||||
await wait(5000 + Math.random() * 10000);
|
await wait(5000 + Math.random() * 10000);
|
||||||
return await customRequestWithLimitsAndRetries(url, attempts + 1);
|
return await customRequestWithRetries(url, attempts + 1);
|
||||||
} else throw error;
|
} else throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -71,25 +61,6 @@ function wait(ms) {
|
||||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {URL} url
|
|
||||||
* @returns {Promise<Dispatcher.ResponseData>}
|
|
||||||
*/
|
|
||||||
function _customRequestWithLimits(url) {
|
|
||||||
let limit = limiters.get(url.host);
|
|
||||||
if (!limit) {
|
|
||||||
if (url.host === "cdn.buenosaires.gob.ar") {
|
|
||||||
// tenemos que throttlear en este host porque tiene un rate limit.
|
|
||||||
// de todas maneras descarga rápido
|
|
||||||
limit = pThrottle({ limit: 3, interval: 1000 })((x) => x());
|
|
||||||
} else {
|
|
||||||
limit = pLimit(nConnections);
|
|
||||||
}
|
|
||||||
limiters.set(url.host, limit);
|
|
||||||
}
|
|
||||||
return limit(() => _customRequest(url));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* genera los headers para hacer un pedido dependiendo de la url
|
* genera los headers para hacer un pedido dependiendo de la url
|
||||||
* @param {URL} url
|
* @param {URL} url
|
||||||
|
@ -108,7 +79,7 @@ function getHeaders(url) {
|
||||||
/**
|
/**
|
||||||
* @param {URL} url
|
* @param {URL} url
|
||||||
*/
|
*/
|
||||||
async function _customRequest(url) {
|
export async function customRequest(url) {
|
||||||
const res = await request(url.toString(), {
|
const res = await request(url.toString(), {
|
||||||
headers: getHeaders(url),
|
headers: getHeaders(url),
|
||||||
dispatcher,
|
dispatcher,
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"common": "workspace:",
|
"common": "workspace:",
|
||||||
"p-limit": "^5.0.0",
|
"p-limit": "^5.0.0",
|
||||||
|
"p-map": "^7.0.0",
|
||||||
"p-throttle": "^6.1.0",
|
"p-throttle": "^6.1.0",
|
||||||
"undici": "^6.0.1",
|
"undici": "^6.0.1",
|
||||||
"zod": "^3.22.4"
|
"zod": "^3.22.4"
|
||||||
|
|
|
@ -20,6 +20,9 @@ importers:
|
||||||
p-limit:
|
p-limit:
|
||||||
specifier: ^5.0.0
|
specifier: ^5.0.0
|
||||||
version: 5.0.0
|
version: 5.0.0
|
||||||
|
p-map:
|
||||||
|
specifier: ^7.0.0
|
||||||
|
version: 7.0.0
|
||||||
p-throttle:
|
p-throttle:
|
||||||
specifier: ^6.1.0
|
specifier: ^6.1.0
|
||||||
version: 6.1.0
|
version: 6.1.0
|
||||||
|
@ -1206,6 +1209,11 @@ packages:
|
||||||
yocto-queue: 1.0.0
|
yocto-queue: 1.0.0
|
||||||
dev: false
|
dev: false
|
||||||
|
|
||||||
|
/p-map@7.0.0:
|
||||||
|
resolution: {integrity: sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg==}
|
||||||
|
engines: {node: '>=18'}
|
||||||
|
dev: false
|
||||||
|
|
||||||
/p-throttle@6.1.0:
|
/p-throttle@6.1.0:
|
||||||
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
|
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
|
||||||
engines: {node: '>=18'}
|
engines: {node: '>=18'}
|
||||||
|
|
Loading…
Reference in a new issue