Compare commits

..

No commits in common. "6b75cea27af7d8c65d7a6d33a3064ccef7170374" and "f006df6089876bbd5d577d9bf629707bbbfaa484" have entirely different histories.

8 changed files with 178 additions and 300 deletions

View file

@ -30,6 +30,14 @@ export const zData = z.object({
});
/** @typedef {z.infer<typeof zData>} Data */
export const zError = z.object({
url: z.string().optional(),
datasetIdentifier: z.string(),
distributionIdentifier: z.string(),
kind: z.enum(["generic_error", "http_error", "infinite_redirect"]),
error: z.string().optional(),
});
export const zDumpMetadata = z.object({
sites: z.array(
z.object({
@ -41,23 +49,3 @@ export const zDumpMetadata = z.object({
),
});
/** @typedef {z.infer<typeof zDumpMetadata>} DumpMetadata */
const zDumpErrorAlways = {
url: z.string().optional(),
datasetIdentifier: z.string(),
distributionIdentifier: z.string(),
};
export const zDumpError = z.discriminatedUnion("kind", [
z.object({
...zDumpErrorAlways,
kind: z.literal("http_error"),
status_code: z.number(),
}),
z.object({ ...zDumpErrorAlways, kind: z.literal("infinite_redirect") }),
z.object({
...zDumpErrorAlways,
kind: z.literal("generic_error"),
error: z.string(),
}),
]);
/** @typedef {z.infer<typeof zDumpError>} DumpError */

View file

@ -1,7 +1,7 @@
import { request } from "undici";
import z from "zod";
import pMap from "p-map";
import { userAgent } from "./config.js";
import { basename } from "path";
import { customRequest } from "./network.js";
const zCkanPackageList = z.object({
success: z.literal(true),
@ -12,7 +12,11 @@ const zCkanPackageList = z.object({
* @param {string} url
*/
async function getJson(url) {
const res = await customRequest(new URL(url));
const res = await request(url, {
headers: {
"User-Agent": userAgent,
},
});
const json = await res.body.json();
return json;
}
@ -115,9 +119,9 @@ async function getCkanInfo(ckanUrl) {
export async function generateDataJsonFromCkan(ckanUrl) {
const list = await getCkanPackageList(ckanUrl);
const info = await getCkanInfo(ckanUrl);
const packages = await pMap(list, (link) => getCkanPackage(ckanUrl, link), {
concurrency: 12,
});
const packages = await Promise.all(
list.map((n) => getCkanPackage(ckanUrl, n))
);
/** @type {import("common/schema.js").Data & { generatedBy: string }} */
const data = {
generatedBy:

View file

@ -4,6 +4,7 @@ export const targetsPorDefecto = [
"datajson+https://datos.magyp.gob.ar/data.json",
"datajson+https://datos.acumar.gov.ar/data.json",
"datajson+https://datasets.datos.mincyt.gob.ar/data.json",
"datajson+https://datos.arsat.com.ar/data.json",
"datajson+https://datos.cultura.gob.ar/data.json",
"datajson+https://datos.mininterior.gob.ar/data.json",
"datajson+https://datos.produccion.gob.ar/data.json",
@ -35,13 +36,14 @@ export const targetsPorDefecto = [
// "datajson+http://andino.siu.edu.ar/data.json",
"datajson+https://monitoreo.datos.gob.ar/catalog/educacion/data.json",
"datajson+https://monitoreo.datos.gob.ar/media/catalog/inti/data.json",
"datajson+https://monitoreo.datos.gob.ar/catalog/ssprys/data.json",
"datajson+https://www.presupuestoabierto.gob.ar/sici/rest-api/catalog/public",
"datajson+https://transparencia.enargas.gob.ar/data.json",
"datajson+https://infra.datos.gob.ar/catalog/sspm/data.json",
"datajson+https://monitoreo.datos.gob.ar/catalog/ssprys/data.json",
"datajson+https://monitoreo.datos.gob.ar/catalog/siep/data.json",
"datajson+https://monitoreo.datos.gob.ar/catalog/exterior/data.json",
"datajson+https://datos.pami.org.ar/data.json",
"datajson+http://datos.pami.org.ar/data.json",
"datajson+https://monitoreo.datos.gob.ar/media/catalog/trabajo/data.json",
"datajson+https://datos.yvera.gob.ar/data.json",
"datajson+https://monitoreo.datos.gob.ar/media/catalog/renaper/data.json",

View file

@ -1,20 +1,33 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { mkdir, open, writeFile } from "node:fs/promises";
import { Agent, fetch, request, setGlobalDispatcher } from "undici";
import { join, normalize } from "node:path";
import { targetsPorDefecto } from "./config.js";
import pLimit from "p-limit";
import { targetsPorDefecto, userAgent } from "./config.js";
import { generateDataJsonFromCkan } from "./ckan_to_datajson.js";
import { zData, zDumpError } from "common/schema.js";
import {
StatusCodeError,
TooManyRedirectsError,
customRequestWithRetries,
} from "./network.js";
import { createWriteStream } from "node:fs";
import pMap from "p-map";
import { zData } from "common/schema.js";
let urls = process.argv.slice(2);
if (process.argv[2] === "retry") {
urls = process.argv.slice(3);
setGlobalDispatcher(
new Agent({
pipelining: 0,
}),
);
/** key es host
* @type {Map<string, import("p-limit").LimitFunction>} */
const limiters = new Map();
const nThreads = process.env.N_THREADS ? parseInt(process.env.N_THREADS) : 8;
class StatusCodeError extends Error {
/**
* @param {number} code
*/
constructor(code) {
super(`Status code: ${code}`);
this.code = code;
}
}
class TooManyRedirectsError extends Error {}
let urls = process.argv.slice(2);
if (urls.length < 1) {
urls = targetsPorDefecto;
}
@ -28,69 +41,64 @@ const targets = urls.map((url) => {
return { type: "ckan", url: url.slice("ckan+".length) };
} else return { type: "datajson", url };
});
const action = process.argv[2] === "retry" ? retryErrors : downloadEverything;
for (const target of targets)
action(target).catch((error) =>
console.error(`${target.type}+${target.url} FALLÓ CON`, error)
downloadFromData(target).catch((error) =>
console.error(`${target.type}+${target.url} FALLÓ CON`, error),
);
/**
* @param {Target} target
*/
async function downloadEverything(target) {
async function downloadFromData(target) {
const outputPath = generateOutputPath(target.url);
const json = await getDataJsonForTarget(target);
let json;
if (target.type === "ckan") {
json = await generateDataJsonFromCkan(target.url);
} else if (target.type === "datajson") {
const jsonRes = await fetch(target.url);
json = await jsonRes.json();
}
const parsed = zData.parse(json);
await mkdir(outputPath, { recursive: true });
await writeFile(join(outputPath, "data.json"), json);
await writeFile(join(outputPath, "data.json"), JSON.stringify(json));
await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
await downloadFiles(target);
}
/**
* @param {Target} target
*/
async function retryErrors(target) {
const outputPath = generateOutputPath(target.url);
const jsonl = await readFile(join(outputPath, "errors.jsonl"), "utf-8");
const errors = jsonl
.split("\n")
.filter((l) => l.length > 0)
.map((line) => zDumpError.parse(JSON.parse(line)));
await downloadFiles(target, (job) =>
errors.some(
(e) =>
e.datasetIdentifier === job.dataset.identifier &&
e.distributionIdentifier === job.dist.identifier
)
);
}
/**
* @param {Target} target
* @param {(job: DownloadJob) => boolean=} filterJobs
*/
async function downloadFiles(target, filterJobs) {
const outputPath = generateOutputPath(target.url);
const json = await readFile(join(outputPath, "data.json"), "utf-8");
const parsed = zData.parse(JSON.parse(json));
let nFinished = 0;
let nErrors = 0;
/** @param {ReturnType<typeof encodeError>} err */
const onError = (err) => {
errorFile.write(JSON.stringify(err) + "\n");
nErrors++;
};
const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
flags: "w",
});
const errorFile = (
await open(join(outputPath, "errors.jsonl"), "w")
).createWriteStream();
try {
let jobs = jobsFromDataset(parsed.dataset, onError, outputPath);
if (filterJobs) jobs = jobs.filter(filterJobs);
let nFinished = 0;
let nErrors = 0;
/** @type {DownloadJob[]} */
const jobs = parsed.dataset.flatMap((dataset) =>
dataset.distribution
.filter(
/** @returns {dist is import("common/schema.js").Distribution & {downloadURL: string}} */
(dist) => {
try {
if (!dist.downloadURL) {
throw new Error("No downloadURL in distribution");
}
patchUrl(new URL(dist.downloadURL));
return true;
} catch (error) {
errorFile.write(
JSON.stringify(encodeError({ dataset, dist }, error)) + "\n",
);
nErrors++;
return false;
}
},
)
.map((dist) => ({
dataset,
dist,
url: patchUrl(new URL(dist.downloadURL)),
outputPath,
attempts: 0,
})),
);
const totalJobs = jobs.length;
// por las dudas verificar que no hayan archivos duplicados
@ -98,27 +106,31 @@ async function downloadFiles(target, filterJobs) {
shuffleArray(jobs);
const promise = pMap(
jobs,
async (job) => {
const promises = jobs.map((job) => {
let limit = limiters.get(job.url.host);
if (!limit) {
limit = pLimit(nThreads);
limiters.set(job.url.host, limit);
}
return limit(async () => {
try {
return await downloadDistWithRetries(job);
await downloadDistWithRetries(job);
} catch (error) {
onError(encodeError(job, error));
errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
nErrors++;
} finally {
nFinished++;
}
},
{ concurrency: 32 }
);
});
});
process.stderr.write(`info[${outputPath}]: 0/${totalJobs} done\n`);
const interval = setInterval(() => {
process.stderr.write(
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`
`info[${outputPath}]: ${nFinished}/${totalJobs} done\n`,
);
}, 30000);
await promise;
await Promise.all(promises);
clearInterval(interval);
if (nErrors > 0)
console.error(`${outputPath}: Finished with ${nErrors} errors`);
@ -127,53 +139,6 @@ async function downloadFiles(target, filterJobs) {
}
}
/**
* @param {import("common/schema.js").Dataset[]} datasets
* @param {(err: ReturnType<typeof encodeError>) => void} onError
* @param {string} outputPath
* @returns {DownloadJob[]}
*/
function jobsFromDataset(datasets, onError, outputPath) {
return datasets.flatMap((dataset) =>
dataset.distribution
.filter(
/** @returns {dist is import("common/schema.js").Distribution & {downloadURL: string}} */
(dist) => {
try {
if (!dist.downloadURL) {
throw new Error("No downloadURL in distribution");
}
patchUrl(new URL(dist.downloadURL));
return true;
} catch (error) {
onError(encodeError({ dataset, dist }, error));
return false;
}
}
)
.map((dist) => ({
dataset,
dist,
url: patchUrl(new URL(dist.downloadURL)),
outputPath,
attempts: 0,
}))
);
}
/**
* @param {Target} target
* @returns {Promise<string>}
*/
async function getDataJsonForTarget(target) {
if (target.type === "ckan") {
return JSON.stringify(await generateDataJsonFromCkan(target.url));
} else if (target.type === "datajson") {
const jsonRes = await customRequestWithRetries(new URL(target.url));
return await jsonRes.body.text();
} else throw new Error("?????????????");
}
/**
* @param {string} jsonUrlString
*/
@ -185,19 +150,67 @@ export function generateOutputPath(jsonUrlString) {
/**
* @argument {DownloadJob} job
* @argument {number} attempts
* @returns {Promise<void>}
*/
async function downloadDistWithRetries({ dist, dataset, url, outputPath }) {
const res = await customRequestWithRetries(url);
async function downloadDistWithRetries(job, attempts = 0) {
const { url } = job;
try {
await downloadDist(job);
} catch (error) {
// algunos servidores usan 403 como coso para decir "calmate"
// intentar hasta 15 veces con 15 segundos de por medio
if (
error instanceof StatusCodeError &&
((error.code === 403 && url.host === "minsegar-my.sharepoint.com") ||
(error.code === 503 && url.host === "cdn.buenosaires.gob.ar")) &&
attempts < 15
) {
await wait(15000);
return await downloadDistWithRetries(job, attempts + 1);
}
// si no fue un error de http, reintentar hasta 3 veces con 5 segundos de por medio
else if (
!(error instanceof StatusCodeError) &&
!(error instanceof TooManyRedirectsError) &&
attempts < 3
) {
await wait(5000 + Math.random() * 10000);
return await downloadDistWithRetries(job, attempts + 1);
} else throw error;
}
}
/**
* @argument {DownloadJob} job
*/
async function downloadDist({ dist, dataset, url, outputPath }) {
// sharepoint no le gusta compartir a bots lol
const spoofUserAgent = url.host.endsWith("sharepoint.com");
const res = await request(url.toString(), {
maxRedirections: 20,
headers: {
"User-Agent": spoofUserAgent
? "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0"
: userAgent,
},
});
if (res.statusCode >= 300 && res.statusCode <= 399)
throw new TooManyRedirectsError();
if (res.statusCode < 200 || res.statusCode > 299) {
throw new StatusCodeError(res.statusCode);
}
const fileDirPath = join(
outputPath,
sanitizeSuffix(dataset.identifier),
sanitizeSuffix(dist.identifier)
sanitizeSuffix(dist.identifier),
);
await mkdir(fileDirPath, { recursive: true });
const filePath = join(
fileDirPath,
sanitizeSuffix(dist.fileName || dist.identifier)
sanitizeSuffix(dist.fileName || dist.identifier),
);
if (!res.body) throw new Error("no body");
@ -227,11 +240,11 @@ function sanitizeSuffix(path) {
*/
function chequearIdsDuplicados(jobs, id) {
const duplicated = hasDuplicates(
jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`)
jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`),
);
if (duplicated) {
console.error(
`ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`
`ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`,
);
}
}
@ -241,10 +254,14 @@ function hasDuplicates(array) {
return new Set(array).size !== array.length;
}
/** @argument {number} ms */
function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
* @param {any} error
* @returns {import("common/schema.js").DumpError}
*/
function encodeError(job, error) {
const always = {

View file

@ -1,113 +0,0 @@
import { Dispatcher, request, Agent } from "undici";
import { userAgent } from "./config.js";
const dispatcher = new Agent({
connect: { timeout: 60 * 1000 },
bodyTimeout: 15 * 60 * 1000,
maxRedirections: 20,
});
const ignoreTlsDispatcher = new Agent({
connect: {
timeout: 60 * 1000,
rejectUnauthorized: false,
checkServerIdentity() {
return undefined;
},
},
bodyTimeout: 15 * 60 * 1000,
maxRedirections: 20,
});
export class StatusCodeError extends Error {
/**
* @param {number} code
*/
constructor(code) {
super(`Status code: ${code}`);
this.code = code;
}
}
export class TooManyRedirectsError extends Error {}
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true" || false;
/**
* @argument {URL} url
* @argument {number} attempts
* @returns {Promise<Dispatcher.ResponseData>}
*/
export async function customRequestWithRetries(url, attempts = 0) {
try {
return await customRequest(url);
} catch (error) {
// algunos servidores usan 403 como coso para decir "calmate"
// intentar hasta 15 veces con 15 segundos de por medio
if (
error instanceof StatusCodeError &&
((error.code === 403 && url.host === "minsegar-my.sharepoint.com") ||
(error.code === 503 && url.host === "cdn.buenosaires.gob.ar") ||
(error.code === 502 && url.host === "datos.jus.gob.ar")) &&
attempts < 15
) {
if (REPORT_RETRIES)
console.debug(`reintentando(status)[${attempts}] ${url.toString()}`);
await wait(1000 * (attempts + 1) ** 2 + Math.random() * 10000);
return await customRequestWithRetries(url, attempts + 1);
}
// si no fue un error de http, reintentar hasta 3 veces con ~10 segundos de por medio
else if (
!(error instanceof StatusCodeError) &&
!(error instanceof TooManyRedirectsError) &&
attempts < 7
) {
if (REPORT_RETRIES)
console.debug(`reintentando[${attempts}] ${url.toString()}`);
await wait(5000 + Math.random() * 10000);
return await customRequestWithRetries(url, attempts + 1);
} else throw error;
}
}
/** @argument {number} ms */
function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* genera los headers para hacer un pedido dependiendo de la url
* @param {URL} url
*/
function getHeaders(url) {
// sharepoint no le gusta compartir a bots lol
const spoofUserAgent = url.host.endsWith("sharepoint.com");
return {
"User-Agent": spoofUserAgent
? "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0"
: userAgent,
};
}
/**
* @param {URL} url
*/
export async function customRequest(url) {
let d = dispatcher;
if (
url.hostname === "www.energia.gob.ar" ||
url.hostname === "datos.agroindustria.gob.ar" ||
url.hostname === "www.agroindustria.gob.ar"
) {
d = ignoreTlsDispatcher;
}
const res = await request(url.toString(), {
headers: getHeaders(url),
dispatcher,
});
if (res.statusCode >= 300 && res.statusCode <= 399)
throw new TooManyRedirectsError();
if (res.statusCode < 200 || res.statusCode > 299)
throw new StatusCodeError(res.statusCode);
return res;
}

View file

@ -13,9 +13,7 @@
"dependencies": {
"common": "workspace:",
"p-limit": "^5.0.0",
"p-map": "^7.0.0",
"p-throttle": "^6.1.0",
"undici": "^6.0.1",
"undici": "^5.28.0",
"zod": "^3.22.4"
},
"devDependencies": {

View file

@ -27,8 +27,6 @@
<ul
class="divide-y divide-gray-100 border-y border-y-gray-100 dark:divide-gray-700 dark:border-y-gray-700"
>
<!-- para ver diferencias entre dumps descomprimidos (fish shell): diff -u (find data-2023-12-09 -printf '%P\n' | sort | psub) (find data -printf '%P\n' | sort | psub)|less -->
<!-- nPortales: find . -maxdepth 1 -mindepth 1 -type d | wc -l -->
<!-- nDatasets: jq '.dataset | length' */data.json | awk '{s+=$1} END {print s}' -->
<!-- size: du -sh -->

View file

@ -20,15 +20,9 @@ importers:
p-limit:
specifier: ^5.0.0
version: 5.0.0
p-map:
specifier: ^7.0.0
version: 7.0.0
p-throttle:
specifier: ^6.1.0
version: 6.1.0
undici:
specifier: ^6.0.1
version: 6.0.1
specifier: ^5.28.0
version: 5.28.2
zod:
specifier: ^3.22.4
version: 3.22.4
@ -63,7 +57,7 @@ importers:
devDependencies:
'@poppanator/sveltekit-svg':
specifier: ^4.1.3
version: 4.1.3(svelte@4.2.8)(svgo@3.1.0)(vite@5.0.7)
version: 4.1.3(svelte@4.2.8)(svgo@3.0.5)(vite@5.0.7)
'@sveltejs/vite-plugin-svelte':
specifier: ^3.0.0
version: 3.0.1(svelte@4.2.8)(vite@5.0.7)
@ -379,7 +373,7 @@ packages:
fastq: 1.15.0
dev: true
/@poppanator/sveltekit-svg@4.1.3(svelte@4.2.8)(svgo@3.1.0)(vite@5.0.7):
/@poppanator/sveltekit-svg@4.1.3(svelte@4.2.8)(svgo@3.0.5)(vite@5.0.7):
resolution: {integrity: sha512-cKdFxFPPzS470xy2XFQ2m/URa9On4fw7n5wvBqAwVO4sY8dmski+2N3GKFELt4tvzM3JPjAqz76Ex7U5IpKeIg==}
peerDependencies:
svelte: '>=4.x'
@ -387,7 +381,7 @@ packages:
vite: '>=4.x'
dependencies:
svelte: 4.2.8
svgo: 3.1.0
svgo: 3.0.5
vite: 5.0.7
dev: true
@ -1209,16 +1203,6 @@ packages:
yocto-queue: 1.0.0
dev: false
/p-map@7.0.0:
resolution: {integrity: sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg==}
engines: {node: '>=18'}
dev: false
/p-throttle@6.1.0:
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
engines: {node: '>=18'}
dev: false
/parent-module@1.0.1:
resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==}
engines: {node: '>=6'}
@ -1642,8 +1626,8 @@ packages:
periscopic: 3.1.0
dev: true
/svgo@3.1.0:
resolution: {integrity: sha512-R5SnNA89w1dYgNv570591F66v34b3eQShpIBcQtZtM5trJwm1VvxbIoMpRYY3ybTAutcKTLEmTsdnaknOHbiQA==}
/svgo@3.0.5:
resolution: {integrity: sha512-HQKHEo73pMNOlDlBcLgZRcHW2+1wo7bFYayAXkGN0l/2+h68KjlfZyMRhdhaGvoHV2eApOovl12zoFz42sT6rQ==}
engines: {node: '>=14.0.0'}
hasBin: true
dependencies:
@ -1725,9 +1709,9 @@ packages:
resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==}
dev: true
/undici@6.0.1:
resolution: {integrity: sha512-eZFYQLeS9BiXpsU0cuFhCwfeda2MnC48EVmmOz/eCjsTgmyTdaHdVsPSC/kwC2GtW2e0uH0HIPbadf3/bRWSxw==}
engines: {node: '>=18.0'}
/undici@5.28.2:
resolution: {integrity: sha512-wh1pHJHnUeQV5Xa8/kyQhO7WFa8M34l026L5P/+2TYiakvGy5Rdc8jWZVyG7ieht/0WgJLEd3kcU5gKx+6GC8w==}
engines: {node: '>=14.0'}
dependencies:
'@fastify/busboy': 2.1.0
dev: false