diff --git a/common/schema.js b/common/schema.js
index ed76540..96b0d4f 100644
--- a/common/schema.js
+++ b/common/schema.js
@@ -30,14 +30,6 @@ export const zData = z.object({
 });
 /** @typedef {z.infer<typeof zData>} Data */
 
-export const zError = z.object({
-  url: z.string().optional(),
-  datasetIdentifier: z.string(),
-  distributionIdentifier: z.string(),
-  kind: z.enum(["generic_error", "http_error", "infinite_redirect"]),
-  error: z.string().optional(),
-});
-
 export const zDumpMetadata = z.object({
   sites: z.array(
     z.object({
@@ -49,3 +41,23 @@ export const zDumpMetadata = z.object({
   ),
 });
 /** @typedef {z.infer<typeof zDumpMetadata>} DumpMetadata */
+
+const zDumpErrorAlways = {
+  url: z.string().optional(),
+  datasetIdentifier: z.string(),
+  distributionIdentifier: z.string(),
+};
+export const zDumpError = z.discriminatedUnion("kind", [
+  z.object({
+    ...zDumpErrorAlways,
+    kind: z.literal("http_error"),
+    status_code: z.number(),
+  }),
+  z.object({ ...zDumpErrorAlways, kind: z.literal("infinite_redirect") }),
+  z.object({
+    ...zDumpErrorAlways,
+    kind: z.literal("generic_error"),
+    error: z.string(),
+  }),
+]);
+/** @typedef {z.infer<typeof zDumpError>} DumpError */
diff --git a/downloader/download_json.js b/downloader/download_json.js
index 9aeebdf..98b7b3a 100644
--- a/downloader/download_json.js
+++ b/downloader/download_json.js
@@ -1,8 +1,8 @@
-import { mkdir, writeFile } from "node:fs/promises";
+import { mkdir, readFile, writeFile } from "node:fs/promises";
 import { join, normalize } from "node:path";
 import { targetsPorDefecto } from "./config.js";
 import { generateDataJsonFromCkan } from "./ckan_to_datajson.js";
-import { zData } from "common/schema.js";
+import { zData, zDumpError } from "common/schema.js";
 import {
   StatusCodeError,
   TooManyRedirectsError,
@@ -12,6 +12,9 @@ import { createWriteStream } from "node:fs";
 import pMap from "p-map";
 
 let urls = process.argv.slice(2);
+if (process.argv[2] === "retry") {
+  urls = process.argv.slice(3);
+}
 if (urls.length < 1) {
   urls = targetsPorDefecto;
 }
@@ -25,57 +28,69 @@ const targets = urls.map((url) => {
     return { type: "ckan", url: url.slice("ckan+".length) };
   } else return { type: "datajson", url };
 });
+const action = process.argv[2] === "retry" ? retryErrors : downloadEverything;
 
 for (const target of targets)
-  downloadFromData(target).catch((error) =>
+  action(target).catch((error) =>
     console.error(`${target.type}+${target.url} FALLÓ CON`, error)
   );
 
 /**
  * @param {Target} target
  */
-async function downloadFromData(target) {
+async function downloadEverything(target) {
   const outputPath = generateOutputPath(target.url);
   const json = await getDataJsonForTarget(target);
-  const parsed = zData.parse(JSON.parse(json));
   await mkdir(outputPath, { recursive: true });
   await writeFile(join(outputPath, "data.json"), json);
   await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
+
+  await downloadFiles(target);
+}
+
+/**
+ * @param {Target} target
+ */
+async function retryErrors(target) {
+  const outputPath = generateOutputPath(target.url);
+  const jsonl = await readFile(join(outputPath, "errors.jsonl"), "utf-8");
+  const errors = jsonl
+    .split("\n")
+    .filter((l) => l.length > 0)
+    .map((line) => zDumpError.parse(JSON.parse(line)));
+
+  await downloadFiles(target, (job) =>
+    errors.some(
+      (e) =>
+        e.datasetIdentifier === job.dataset.identifier &&
+        e.distributionIdentifier === job.dist.identifier
+    )
+  );
+}
+
+/**
+ * @param {Target} target
+ * @param {(job: DownloadJob) => boolean=} filterJobs
+ */
+async function downloadFiles(target, filterJobs) {
+  const outputPath = generateOutputPath(target.url);
+  const json = await readFile(join(outputPath, "data.json"), "utf-8");
+  const parsed = zData.parse(JSON.parse(json));
+
+  let nFinished = 0;
+  let nErrors = 0;
+  /** @param {ReturnType<typeof encodeError>} err */
+  const onError = (err) => {
+    errorFile.write(JSON.stringify(err) + "\n");
+    nErrors++;
+  };
+
   const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
     flags: "w",
   });
   try {
-    let nFinished = 0;
-    let nErrors = 0;
-    /** @type {DownloadJob[]} */
-    const jobs = parsed.dataset.flatMap((dataset) =>
-      dataset.distribution
-        .filter(
-          /** @returns {dist is import("common/schema.js").Distribution & {downloadURL: string}} */
-          (dist) => {
-            try {
-              if (!dist.downloadURL) {
-                throw new Error("No downloadURL in distribution");
-              }
-              patchUrl(new URL(dist.downloadURL));
-              return true;
-            } catch (error) {
-              errorFile.write(
-                JSON.stringify(encodeError({ dataset, dist }, error)) + "\n"
-              );
-              nErrors++;
-              return false;
-            }
-          }
-        )
-        .map((dist) => ({
-          dataset,
-          dist,
-          url: patchUrl(new URL(dist.downloadURL)),
-          outputPath,
-          attempts: 0,
-        }))
-    );
+    let jobs = jobsFromDataset(parsed.dataset, onError, outputPath);
+    if (filterJobs) jobs = jobs.filter(filterJobs);
 
     const totalJobs = jobs.length;
     // por las dudas verificar que no hayan archivos duplicados
@@ -89,13 +104,11 @@
       try {
         return await downloadDistWithRetries(job);
       } catch (error) {
-        errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
-        nErrors++;
+        onError(encodeError(job, error));
      } finally {
        nFinished++;
      }
    },
-    // en realidad está limitado por el balancedpool
    { concurrency: 32 }
  );

@@ -114,6 +127,40 @@
   }
 }
+/**
+ * @param {import("common/schema.js").Dataset[]} datasets
+ * @param {(err: ReturnType<typeof encodeError>) => void} onError
+ * @param {string} outputPath
+ * @returns {DownloadJob[]}
+ */
+function jobsFromDataset(datasets, onError, outputPath) {
+  return datasets.flatMap((dataset) =>
+    dataset.distribution
+      .filter(
+        /** @returns {dist is import("common/schema.js").Distribution & {downloadURL: string}} */
+        (dist) => {
+          try {
+            if (!dist.downloadURL) {
+              throw new Error("No downloadURL in distribution");
+            }
+            patchUrl(new URL(dist.downloadURL));
+            return true;
+          } catch (error) {
+            onError(encodeError({ dataset, dist }, error));
+            return false;
+          }
+        }
+      )
+      .map((dist) => ({
+        dataset,
+        dist,
+        url: patchUrl(new URL(dist.downloadURL)),
+        outputPath,
+        attempts: 0,
+      }))
+  );
+}
+
 /**
  * @param {Target} target
  * @returns {Promise<string>}
  */
@@ -197,6 +244,7 @@ function hasDuplicates(array) {
 /**
  * @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
  * @param {any} error
+ * @returns {import("common/schema.js").DumpError}
  */
 function encodeError(job, error) {
   const always = {