Compare commits

..

1 commit

Author SHA1 Message Date
6bd70eaa61 WIP: usar writeStream para guardar archivos 2023-12-18 17:09:20 -03:00
5 changed files with 66 additions and 56 deletions

View file

@@ -1,7 +1,6 @@
import z from "zod";
import pMap from "p-map";
import { basename } from "path";
import { customRequest } from "./network.js";
import { customRequestWithLimitsAndRetries } from "./network.js";
const zCkanPackageList = z.object({
success: z.literal(true),
@@ -12,7 +11,7 @@ const zCkanPackageList = z.object({
* @param {string} url
*/
async function getJson(url) {
const res = await customRequest(new URL(url));
const res = await customRequestWithLimitsAndRetries(new URL(url));
const json = await res.body.json();
return json;
}
@@ -115,9 +114,9 @@ async function getCkanInfo(ckanUrl) {
export async function generateDataJsonFromCkan(ckanUrl) {
const list = await getCkanPackageList(ckanUrl);
const info = await getCkanInfo(ckanUrl);
const packages = await pMap(list, (link) => getCkanPackage(ckanUrl, link), {
concurrency: 12,
});
const packages = await Promise.all(
list.map((n) => getCkanPackage(ckanUrl, n))
);
/** @type {import("common/schema.js").Data & { generatedBy: string }} */
const data = {
generatedBy:

View file

@@ -6,10 +6,10 @@ import { zData } from "common/schema.js";
import {
StatusCodeError,
TooManyRedirectsError,
customRequestWithRetries,
customRequestWithLimitsAndRetries,
} from "./network.js";
import { createWriteStream } from "node:fs";
import pMap from "p-map";
import { pipeline } from "node:stream/promises";
let urls = process.argv.slice(2);
if (urls.length < 1) {
@@ -35,11 +35,20 @@ for (const target of targets)
*/
async function downloadFromData(target) {
const outputPath = generateOutputPath(target.url);
const json = await getDataJsonForTarget(target);
const parsed = zData.parse(JSON.parse(json));
let json;
if (target.type === "ckan") {
json = await generateDataJsonFromCkan(target.url);
} else if (target.type === "datajson") {
const jsonRes = await customRequestWithLimitsAndRetries(
new URL(target.url)
);
json = await jsonRes.body.json();
}
const parsed = zData.parse(json);
await mkdir(outputPath, { recursive: true });
await writeFile(join(outputPath, "data.json"), json);
await writeFile(join(outputPath, "data.json"), JSON.stringify(json));
await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
flags: "w",
@@ -83,21 +92,16 @@ async function downloadFromData(target) {
shuffleArray(jobs);
const promise = pMap(
jobs,
async (job) => {
try {
return await downloadDistWithRetries(job);
} catch (error) {
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
nErrors++;
} finally {
nFinished++;
}
},
// en realidad está limitado por el balancedpool
{ concurrency: 32 }
);
const promises = jobs.map(async (job) => {
try {
return await downloadDistWithRetries(job);
} catch (error) {
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
nErrors++;
} finally {
nFinished++;
}
});
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
const interval = setInterval(() => {
@@ -105,7 +109,7 @@ async function downloadFromData(target) {
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
);
}, 30000);
await promise;
await Promise.all(promises);
clearInterval(interval);
if (nErrors > 0)
console.error(`${outputPath}: Finished with ${nErrors} errors`);
@@ -114,19 +118,6 @@ async function downloadFromData(target) {
}
}
/**
* @param {Target} target
* @returns {Promise<string>}
*/
async function getDataJsonForTarget(target) {
if (target.type === "ckan") {
return JSON.stringify(await generateDataJsonFromCkan(target.url));
} else if (target.type === "datajson") {
const jsonRes = await customRequestWithRetries(new URL(target.url));
return await jsonRes.body.text();
} else throw new Error("?????????????");
}
/**
* @param {string} jsonUrlString
*/
@@ -140,7 +131,7 @@ export function generateOutputPath(jsonUrlString) {
* @argument {DownloadJob} job
*/
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
const res = await customRequestWithRetries(url);
const res = await customRequestWithLimitsAndRetries(url);
const fileDirPath = join(
outputPath,
@@ -154,7 +145,7 @@ async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
);
if (!res.body) throw new Error("no body");
await writeFile(filePath, res.body);
await pipeline(res.body, createWriteStream(filePath));
}
/** @typedef DownloadJob

View file

@@ -1,5 +1,7 @@
import { Dispatcher, request, Agent } from "undici";
import pLimit from "p-limit";
import { userAgent } from "./config.js";
import pThrottle from "p-throttle";
const dispatcher = new Agent({
connect: { timeout: 60 * 1000 },
@@ -18,6 +20,14 @@ export class StatusCodeError extends Error {
}
export class TooManyRedirectsError extends Error {}
/** key es host
* @type {Map<string, <Argument extends unknown, ReturnType>(
fn: (arguments_: Argument) => PromiseLike<ReturnType>) => Promise<ReturnType>>} */
const limiters = new Map();
const nConnections = process.env.N_THREADS
? parseInt(process.env.N_THREADS)
: 8;
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
/**
@@ -25,9 +35,9 @@ const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
* @argument {number} attempts
* @returns {Promise<Dispatcher.ResponseData>}
*/
export async function customRequestWithRetries(url, attempts = 0) {
export async function customRequestWithLimitsAndRetries(url, attempts = 0) {
try {
return await customRequest(url);
return await _customRequestWithLimits(url);
} catch (error) {
// algunos servidores usan 403 como coso para decir "calmate"
// intentar hasta 15 veces con 15 segundos de por medio
@@ -40,7 +50,7 @@ export async function customRequestWithRetries(url, attempts = 0) {
if (REPORT_RETRIES)
console.debug(`reintentando(status)[${attempts}] ${url.toString()}`);
await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
return await customRequestWithRetries(url, attempts + 1);
return await customRequestWithLimitsAndRetries(url, attempts + 1);
}
// si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio
else if (
@@ -51,7 +61,7 @@ export async function customRequestWithRetries(url, attempts = 0) {
if (REPORT_RETRIES)
console.debug(`reintentando[${attempts}] ${url.toString()}`);
await wait(5000 + Math.random() * 10000);
return await customRequestWithRetries(url, attempts + 1);
return await customRequestWithLimitsAndRetries(url, attempts + 1);
} else throw error;
}
}
@@ -61,6 +71,25 @@ function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* @param {URL} url
* @returns {Promise<Dispatcher.ResponseData>}
*/
function _customRequestWithLimits(url) {
let limit = limiters.get(url.host);
if (!limit) {
if (url.host === "cdn.buenosaires.gob.ar") {
// tenemos que throttlear en este host porque tiene un rate limit.
// de todas maneras descarga rápido
limit = pThrottle({ limit: 3, interval: 1000 })((x) => x());
} else {
limit = pLimit(nConnections);
}
limiters.set(url.host, limit);
}
return limit(() => _customRequest(url));
}
/**
* genera los headers para hacer un pedido dependiendo de la url
* @param {URL} url
@@ -79,7 +108,7 @@ function getHeaders(url) {
/**
* @param {URL} url
*/
export async function customRequest(url) {
async function _customRequest(url) {
const res = await request(url.toString(), {
headers: getHeaders(url),
dispatcher,

View file

@@ -13,7 +13,6 @@
"dependencies": {
"common": "workspace:",
"p-limit": "^5.0.0",
"p-map": "^7.0.0",
"p-throttle": "^6.1.0",
"undici": "^6.0.1",
"zod": "^3.22.4"

View file

@@ -20,9 +20,6 @@ importers:
p-limit:
specifier: ^5.0.0
version: 5.0.0
p-map:
specifier: ^7.0.0
version: 7.0.0
p-throttle:
specifier: ^6.1.0
version: 6.1.0
@@ -1209,11 +1206,6 @@ packages:
yocto-queue: 1.0.0
dev: false
/p-map@7.0.0:
resolution: {integrity: sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg==}
engines: {node: '>=18'}
dev: false
/p-throttle@6.1.0:
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
engines: {node: '>=18'}