
288 lines
8 KiB
Raw Permalink Normal View History

import { mkdir, readFile, writeFile } from "node:fs/promises";
2023-11-27 20:01:56 +00:00
import { join, normalize } from "node:path";
2023-12-16 14:15:27 +00:00
import { targetsPorDefecto } from "./config.js";
import { generateDataJsonFromCkan } from "./ckan_to_datajson.js";
import { zData, zDumpError } from "common/schema.js";
2023-12-16 14:15:27 +00:00
import {
2023-12-19 15:01:37 +00:00
2023-12-16 14:15:27 +00:00
} from "./network.js";
import { createWriteStream } from "node:fs";
2023-12-19 15:01:37 +00:00
import pMap from "p-map";
2023-11-27 20:01:56 +00:00
let urls = process.argv.slice(2);
if (process.argv[2] === "retry") {
urls = process.argv.slice(3);
if (urls.length < 1) {
urls = targetsPorDefecto;
2023-11-27 20:01:56 +00:00
/** @typedef {{type: "datajson" | "ckan"; url: string;}} Target */
/** @type {Target[]} */
const targets = => {
if (url.startsWith("datajson+")) {
return { type: "datajson", url: url.slice("datajson+".length) };
} else if (url.startsWith("ckan+")) {
return { type: "ckan", url: url.slice("ckan+".length) };
} else return { type: "datajson", url };
const action = process.argv[2] === "retry" ? retryErrors : downloadEverything;
for (const target of targets)
action(target).catch((error) =>
2023-12-16 14:15:27 +00:00
console.error(`${target.type}+${target.url} FALLÓ CON`, error)
* @param {Target} target
async function downloadEverything(target) {
const outputPath = generateOutputPath(target.url);
2023-12-19 03:55:47 +00:00
const json = await getDataJsonForTarget(target);
2023-12-09 20:10:21 +00:00
await mkdir(outputPath, { recursive: true });
2023-12-19 03:55:47 +00:00
await writeFile(join(outputPath, "data.json"), json);
await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
await downloadFiles(target);
* @param {Target} target
async function retryErrors(target) {
const outputPath = generateOutputPath(target.url);
const jsonl = await readFile(join(outputPath, "errors.jsonl"), "utf-8");
const errors = jsonl
.filter((l) => l.length > 0)
.map((line) => zDumpError.parse(JSON.parse(line)));
await downloadFiles(target, (job) =>
(e) =>
e.datasetIdentifier === job.dataset.identifier &&
e.distributionIdentifier === job.dist.identifier
* @param {Target} target
* @param {(job: DownloadJob) => boolean=} filterJobs
async function downloadFiles(target, filterJobs) {
const outputPath = generateOutputPath(target.url);
const json = await readFile(join(outputPath, "data.json"), "utf-8");
const parsed = zData.parse(JSON.parse(json));
let nFinished = 0;
let nErrors = 0;
/** @param {ReturnType<typeof encodeError>} err */
const onError = (err) => {
errorFile.write(JSON.stringify(err) + "\n");
const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
flags: "w",
try {
let jobs = jobsFromDataset(parsed.dataset, onError, outputPath);
if (filterJobs) jobs = jobs.filter(filterJobs);
const totalJobs = jobs.length;
// por las dudas verificar que no hayan archivos duplicados
chequearIdsDuplicados(jobs, outputPath);
2023-12-19 15:01:37 +00:00
const promise = pMap(
async (job) => {
try {
return await downloadDistWithRetries(job);
} catch (error) {
onError(encodeError(job, error));
2023-12-19 15:01:37 +00:00
} finally {
{ concurrency: 32 }
2023-11-27 20:01:56 +00:00
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
const interval = setInterval(() => {
2023-12-16 14:15:27 +00:00
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
}, 30000);
2023-12-19 15:01:37 +00:00
await promise;
if (nErrors > 0)
console.error(`${outputPath}: Finished with ${nErrors} errors`);
} finally {
2023-11-28 01:43:58 +00:00
* @param {import("common/schema.js").Dataset[]} datasets
* @param {(err: ReturnType<typeof encodeError>) => void} onError
* @param {string} outputPath
* @returns {DownloadJob[]}
function jobsFromDataset(datasets, onError, outputPath) {
return datasets.flatMap((dataset) =>
/** @returns {dist is import("common/schema.js").Distribution & {downloadURL: string}} */
(dist) => {
try {
if (!dist.downloadURL) {
throw new Error("No downloadURL in distribution");
patchUrl(new URL(dist.downloadURL));
return true;
} catch (error) {
onError(encodeError({ dataset, dist }, error));
return false;
.map((dist) => ({
url: patchUrl(new URL(dist.downloadURL)),
attempts: 0,
2023-12-19 03:55:47 +00:00
* @param {Target} target
* @returns {Promise<string>}
async function getDataJsonForTarget(target) {
if (target.type === "ckan") {
return JSON.stringify(await generateDataJsonFromCkan(target.url));
} else if (target.type === "datajson") {
2023-12-19 15:01:37 +00:00
const jsonRes = await customRequestWithRetries(new URL(target.url));
2023-12-19 03:55:47 +00:00
return await jsonRes.body.text();
} else throw new Error("?????????????");
* @param {string} jsonUrlString
export function generateOutputPath(jsonUrlString) {
const jsonUrl = new URL(jsonUrlString);
const outputPath = `${}${jsonUrl.pathname}`.replaceAll("/", "_");
return outputPath;
2023-11-28 01:43:58 +00:00
* @argument {DownloadJob} job
2023-12-16 14:15:27 +00:00
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
2023-12-19 15:01:37 +00:00
const res = await customRequestWithRetries(url);
2023-11-27 20:01:56 +00:00
const fileDirPath = join(
2023-12-16 14:15:27 +00:00
2023-11-27 20:01:56 +00:00
await mkdir(fileDirPath, { recursive: true });
const filePath = join(
2023-12-16 14:15:27 +00:00
sanitizeSuffix(dist.fileName || dist.identifier)
2023-11-27 20:01:56 +00:00
if (!res.body) throw new Error("no body");
await writeFile(filePath, res.body);
2023-11-27 20:01:56 +00:00
2023-11-28 01:43:58 +00:00
/** @typedef DownloadJob
2023-12-09 20:10:21 +00:00
* @prop {import("common/schema.js").Dataset} dataset
* @prop {import("common/schema.js").Distribution} dist
2023-11-28 01:43:58 +00:00
* @prop {URL} url
* @prop {string} outputPath
* @prop {number} attempts
* @prop {Date=} waitUntil
2023-11-28 01:43:58 +00:00
2023-11-27 20:01:56 +00:00
* @argument {string} path
function sanitizeSuffix(path) {
return normalize(path).replace(/^(\.\.(\/|\\|$))+/, "");
* @param {DownloadJob[]} jobs
* @param {string} id
function chequearIdsDuplicados(jobs, id) {
2023-11-28 01:43:58 +00:00
const duplicated = hasDuplicates(
2023-12-16 14:15:27 +00:00 => `${j.dataset.identifier}/${j.dist.identifier}`)
2023-11-28 01:43:58 +00:00
if (duplicated) {
2023-12-16 14:15:27 +00:00
`ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`
2023-11-28 01:43:58 +00:00
2023-11-27 20:01:56 +00:00
2023-11-28 01:43:58 +00:00
/** @argument {any[]} array */
2023-11-27 20:01:56 +00:00
function hasDuplicates(array) {
return new Set(array).size !== array.length;
2023-12-09 20:10:21 +00:00
* @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
* @param {any} error
* @returns {import("common/schema.js").DumpError}
function encodeError(job, error) {
const always = {
2023-11-28 23:53:38 +00:00
url: job.url?.toString() || job.dist.downloadURL,
datasetIdentifier: job.dataset.identifier,
distributionIdentifier: job.dist.identifier,
2023-11-28 01:43:58 +00:00
if (error instanceof StatusCodeError)
return { ...always, kind: "http_error", status_code: error.code };
else if (error instanceof TooManyRedirectsError)
return { ...always, kind: "infinite_redirect" };
2023-11-28 01:43:58 +00:00
else {
return {
kind: "generic_error",
error: error.code || error.message,
2023-11-28 01:43:58 +00:00
2023-11-28 03:41:25 +00:00
* parchea URLs que se rompen solas
* @param {URL} url
function patchUrl(url) {
if ( === "") {
// por defecto, '' redirige a '' pero su certificado solo aplica para '*'. se sirve todo el contenido correctamente en '', así que vamos para ahí.
url.protocol = "https:";
return url;
/** @param {any[]} array */
function shuffleArray(array) {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[array[i], array[j]] = [array[j], array[i]];