allow retrying downloads after the dump has been generated

Cat /dev/Nulo 2023-12-29 12:27:16 -03:00
parent 20c3deb18e
commit 1e85db9443
2 changed files with 107 additions and 47 deletions

common/schema.js

@@ -30,14 +30,6 @@ export const zData = z.object({
});
/** @typedef {z.infer<typeof zData>} Data */
export const zError = z.object({
  url: z.string().optional(),
  datasetIdentifier: z.string(),
  distributionIdentifier: z.string(),
  kind: z.enum(["generic_error", "http_error", "infinite_redirect"]),
  error: z.string().optional(),
});
export const zDumpMetadata = z.object({
  sites: z.array(
    z.object({
@@ -49,3 +41,23 @@ export const zDumpMetadata = z.object({
  ),
});
/** @typedef {z.infer<typeof zDumpMetadata>} DumpMetadata */
const zDumpErrorAlways = {
  url: z.string().optional(),
  datasetIdentifier: z.string(),
  distributionIdentifier: z.string(),
};
export const zDumpError = z.discriminatedUnion("kind", [
  z.object({
    ...zDumpErrorAlways,
    kind: z.literal("http_error"),
    status_code: z.number(),
  }),
  z.object({ ...zDumpErrorAlways, kind: z.literal("infinite_redirect") }),
  z.object({
    ...zDumpErrorAlways,
    kind: z.literal("generic_error"),
    error: z.string(),
  }),
]);
/** @typedef {z.infer<typeof zDumpError>} DumpError */
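
Because zDumpError discriminates on kind, each variant can require its own fields: status_code for http_error, error for generic_error. A minimal sketch of how one errors.jsonl line round-trips through the schema (the identifier values are invented):

import { zDumpError } from "common/schema.js";

// Hypothetical errors.jsonl line; "d1"/"x1" are invented identifiers.
const line =
  '{"datasetIdentifier":"d1","distributionIdentifier":"x1","kind":"http_error","status_code":404}';
const err = zDumpError.parse(JSON.parse(line));
if (err.kind === "http_error") console.log(err.status_code); // 404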

downloader script

@@ -1,8 +1,8 @@
import { mkdir, writeFile } from "node:fs/promises";
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { join, normalize } from "node:path";
import { targetsPorDefecto } from "./config.js";
import { generateDataJsonFromCkan } from "./ckan_to_datajson.js";
import { zData } from "common/schema.js";
import { zData, zDumpError } from "common/schema.js";
import {
  StatusCodeError,
  TooManyRedirectsError,
@@ -12,6 +12,9 @@ import { createWriteStream } from "node:fs";
import pMap from "p-map";
let urls = process.argv.slice(2);
if (process.argv[2] === "retry") {
  urls = process.argv.slice(3);
}
if (urls.length < 1) {
  urls = targetsPorDefecto;
}
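
With this, the first CLI argument selects the mode; a hypothetical invocation (the script filename is assumed, since it is not shown in this diff):

// Hypothetical usage; "download_json.js" is an assumed script name:
//   node download_json.js https://example.gob.ar/data.json        → full download
//   node download_json.js retry https://example.gob.ar/data.json  → retry failed downloads only
// In both modes, an empty URL list falls back to targetsPorDefecto.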
@@ -25,57 +28,69 @@ const targets = urls.map((url) => {
    return { type: "ckan", url: url.slice("ckan+".length) };
  } else return { type: "datajson", url };
});
const action = process.argv[2] === "retry" ? retryErrors : downloadEverything;
for (const target of targets)
  downloadFromData(target).catch((error) =>
  action(target).catch((error) =>
    console.error(`${target.type}+${target.url} FALLÓ CON`, error)
  );
/**
 * @param {Target} target
 */
async function downloadFromData(target) {
async function downloadEverything(target) {
  const outputPath = generateOutputPath(target.url);
  const json = await getDataJsonForTarget(target);
  const parsed = zData.parse(JSON.parse(json));
  await mkdir(outputPath, { recursive: true });
  await writeFile(join(outputPath, "data.json"), json);
  await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
  await downloadFiles(target);
}
/**
 * @param {Target} target
 */
async function retryErrors(target) {
  const outputPath = generateOutputPath(target.url);
  const jsonl = await readFile(join(outputPath, "errors.jsonl"), "utf-8");
  const errors = jsonl
    .split("\n")
    .filter((l) => l.length > 0)
    .map((line) => zDumpError.parse(JSON.parse(line)));
  await downloadFiles(target, (job) =>
    errors.some(
      (e) =>
        e.datasetIdentifier === job.dataset.identifier &&
        e.distributionIdentifier === job.dist.identifier
    )
  );
}
/**
 * @param {Target} target
 * @param {(job: DownloadJob) => boolean=} filterJobs
 */
async function downloadFiles(target, filterJobs) {
  const outputPath = generateOutputPath(target.url);
  const json = await readFile(join(outputPath, "data.json"), "utf-8");
  const parsed = zData.parse(JSON.parse(json));
  let nFinished = 0;
  let nErrors = 0;
  /** @param {ReturnType<typeof encodeError>} err */
  const onError = (err) => {
    errorFile.write(JSON.stringify(err) + "\n");
    nErrors++;
  };
  const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
    flags: "w",
  });
  try {
    let nFinished = 0;
    let nErrors = 0;
    /** @type {DownloadJob[]} */
    const jobs = parsed.dataset.flatMap((dataset) =>
      dataset.distribution
        .filter(
          /** @returns {dist is import("common/schema.js").Distribution & {downloadURL: string}} */
          (dist) => {
            try {
              if (!dist.downloadURL) {
                throw new Error("No downloadURL in distribution");
              }
              patchUrl(new URL(dist.downloadURL));
              return true;
            } catch (error) {
              errorFile.write(
                JSON.stringify(encodeError({ dataset, dist }, error)) + "\n"
              );
              nErrors++;
              return false;
            }
          }
        )
        .map((dist) => ({
          dataset,
          dist,
          url: patchUrl(new URL(dist.downloadURL)),
          outputPath,
          attempts: 0,
        }))
    );
    let jobs = jobsFromDataset(parsed.dataset, onError, outputPath);
    if (filterJobs) jobs = jobs.filter(filterJobs);
    const totalJobs = jobs.length;
    // just in case, check that there are no duplicate files
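
The retry path hinges on that filterJobs predicate: jobsFromDataset rebuilds every job from data.json, and the predicate keeps only the (dataset, distribution) pairs recorded in errors.jsonl. A self-contained sketch with invented identifiers:

// Invented jobs and errors; only the pair present in errors.jsonl survives.
const errors = [
  { datasetIdentifier: "d1", distributionIdentifier: "x1", kind: "infinite_redirect" },
];
const jobs = [
  { dataset: { identifier: "d1" }, dist: { identifier: "x1" } },
  { dataset: { identifier: "d1" }, dist: { identifier: "x2" } },
];
const toRetry = jobs.filter((job) =>
  errors.some(
    (e) =>
      e.datasetIdentifier === job.dataset.identifier &&
      e.distributionIdentifier === job.dist.identifier
  )
);
console.log(toRetry.length); // 1 → only the previously failed distribution is retried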
@@ -89,13 +104,11 @@ async function downloadFromData(target) {
      try {
        return await downloadDistWithRetries(job);
      } catch (error) {
        errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
        nErrors++;
        onError(encodeError(job, error));
      } finally {
        nFinished++;
      }
    },
    // actually this is limited by the balancedpool
    { concurrency: 32 }
  );
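
Since errors.jsonl is opened with flags: "w", every run, including a retry, truncates the file and rewrites only the errors that still occur, so repeated retries converge toward an empty file. A minimal sketch of that write-then-reread cycle (path and values invented):

import { createWriteStream } from "node:fs";
import { readFile } from "node:fs/promises";

// The run truncates the file (flags: "w") and appends one JSON object per line.
const out = createWriteStream("errors.jsonl", { flags: "w" });
out.write(
  JSON.stringify({
    datasetIdentifier: "d1",
    distributionIdentifier: "x1",
    kind: "infinite_redirect",
  }) + "\n"
);
out.end();

// A later retry run reads the surviving lines back, as retryErrors does above:
const lines = (await readFile("errors.jsonl", "utf-8"))
  .split("\n")
  .filter((l) => l.length > 0);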
@@ -114,6 +127,40 @@ async function downloadFromData(target) {
  }
}
/**
 * @param {import("common/schema.js").Dataset[]} datasets
 * @param {(err: ReturnType<typeof encodeError>) => void} onError
 * @param {string} outputPath
 * @returns {DownloadJob[]}
 */
function jobsFromDataset(datasets, onError, outputPath) {
  return datasets.flatMap((dataset) =>
    dataset.distribution
      .filter(
        /** @returns {dist is import("common/schema.js").Distribution & {downloadURL: string}} */
        (dist) => {
          try {
            if (!dist.downloadURL) {
              throw new Error("No downloadURL in distribution");
            }
            patchUrl(new URL(dist.downloadURL));
            return true;
          } catch (error) {
            onError(encodeError({ dataset, dist }, error));
            return false;
          }
        }
      )
      .map((dist) => ({
        dataset,
        dist,
        url: patchUrl(new URL(dist.downloadURL)),
        outputPath,
        attempts: 0,
      }))
  );
}
/**
 * @param {Target} target
 * @returns {Promise<string>}
@@ -197,6 +244,7 @@ function hasDuplicates(array) {
/**
 * @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
 * @param {any} error
 * @returns {import("common/schema.js").DumpError}
 */
function encodeError(job, error) {
  const always = {
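
The new @returns annotation ties encodeError to the zDumpError schema above: whatever it builds must match one of the three variants. One value that would satisfy the schema (field values invented):

// Satisfies the generic_error variant; an http_error would need status_code instead.
const example = {
  url: "https://example.gob.ar/distribution/archivo.csv",
  datasetIdentifier: "d1",
  distributionIdentifier: "x1",
  kind: "generic_error",
  error: "No downloadURL in distribution",
};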