// Node builtins
import { WriteStream, createWriteStream } from "node:fs";
import { mkdir, writeFile } from "node:fs/promises";
import { join } from "node:path";

// Third-party
import fastq from "fastq";

// Local modules
import { generateDataJsonFromCkan } from "./ckan_to_datajson.js";
import { targetsPorDefecto } from "./config.js";
import {
  StatusCodeError,
  TooManyRedirectsError,
  customRequest,
} from "./network.js";
import {
  sanitizeSuffix,
  shuffleArray,
  hasDuplicates,
  waitUntil,
} from "./utils.js";
import { zData } from "common/schema.js";

// CLI: each argument is a target URL; with no arguments, fall back to the
// built-in default target list.
let urls = process.argv.slice(2);
if (urls.length < 1) {
  urls = targetsPorDefecto;
}

/** @typedef {{type: "datajson" | "ckan"; url: string;}} Target */

/** @type {Target[]} */
const targets = urls.map((url) => {
  // A "datajson+" or "ckan+" prefix selects the source kind; bare URLs are
  // treated as plain data.json catalogs.
  if (url.startsWith("datajson+")) {
    return { type: "datajson", url: url.slice("datajson+".length) };
  }
  if (url.startsWith("ckan+")) {
    return { type: "ckan", url: url.slice("ckan+".length) };
  }
  return { type: "datajson", url };
});

// Launch every target concurrently; a failure in one target must not abort
// the others, so each rejection is logged and swallowed here.
for (const target of targets) {
  downloadFromData(target).catch((error) =>
    console.error(`${target.type}+${target.url} FALLÓ CON`, error)
  );
}

// Global progress counters, updated by the queue workers.
let nTotal = 0;
let nFinished = 0;
let nErrors = 0;

// Shared download queue: up to 32 concurrent downloads across all targets.
const queue = fastq.promise(null, downloadDistWithRetries, 32);

// Report progress every 10 seconds until the queue is fully drained.
const interval = setInterval(() => {
  process.stderr.write(`info: ${nFinished}/${nTotal} done\n`);
}, 10000);
await queue.drained();
clearInterval(interval);

/**
 * Downloads an entire catalog: fetches and validates its data.json, persists
 * it (plus the source URL) under a derived output directory, then queues one
 * download job per distribution. Invalid distributions are recorded in
 * errors.jsonl instead of being queued.
 * @param {Target} target
 */
async function downloadFromData(target) {
  const outputPath = generateOutputPath(target.url);
  const json = await getDataJsonForTarget(target);
  const parsed = zData.parse(JSON.parse(json));

  await mkdir(outputPath, { recursive: true });
  await writeFile(join(outputPath, "data.json"), json);
  await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);

  // One errors.jsonl per catalog, truncated on each run.
  const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
    flags: "w",
  });
  try {
    /** @type {DownloadJob[]} */
    const jobs = [];
    for (const dataset of parsed.dataset) {
      for (const dist of dataset.distribution) {
        // Validate before queuing: distributions without a usable downloadURL
        // are logged as errors rather than turned into jobs.
        try {
          if (!dist.downloadURL) {
            throw new Error("No downloadURL in distribution");
          }
          const url = patchUrl(new URL(dist.downloadURL));
          jobs.push({ dataset, dist, url, outputPath, attempts: 0, errorFile });
        } catch (error) {
          errorFile.write(
            JSON.stringify(encodeError({ dataset, dist }, error)) + "\n"
          );
          nErrors++;
        }
      }
    }
    nTotal += jobs.length;

    // Just in case: warn about duplicate ids, which could overwrite files.
    chequearIdsDuplicados(jobs, outputPath);

    // Randomize order so we don't hammer a single host with consecutive jobs.
    shuffleArray(jobs);

    for (const job of jobs) queue.push(job);
    await queue.drained();
  } finally {
    errorFile.close();
  }
}
/**
 * Obtains the raw data.json text for a target.
 *
 * - "ckan": synthesizes a data.json-equivalent document from the CKAN API.
 * - "datajson": downloads the data.json document directly from the URL.
 *
 * @param {Target} target
 * @returns {Promise<string>} the data.json document as a string
 * @throws {Error} when the target type is not one of the known kinds
 */
async function getDataJsonForTarget(target) {
  if (target.type === "ckan") {
    return JSON.stringify(await generateDataJsonFromCkan(target.url));
  } else if (target.type === "datajson") {
    const jsonRes = await customRequest(new URL(target.url));
    return await jsonRes.body.text();
  } else {
    // Exhaustiveness guard: fail loudly with a diagnosable message instead of
    // the previous opaque "?????????????" error.
    throw new Error(`unknown target type: ${JSON.stringify(target.type)}`);
  }
}
/**
 * Derives a filesystem-friendly output directory name from a catalog URL:
 * host + pathname with every "/" replaced by "_". The query string and
 * fragment are intentionally ignored.
 * @param {string} jsonUrlString
 * @returns {string}
 */
export function generateOutputPath(jsonUrlString) {
  const { host, pathname } = new URL(jsonUrlString);
  return `${host}${pathname}`.replaceAll("/", "_");
}
/**
 * Warns when two jobs share the same dataset/distribution identifier pair,
 * because such jobs would write to the same output path and overwrite each
 * other's files.
 * @param {DownloadJob[]} jobs
 * @param {string} id - label shown in the warning (the catalog's output path)
 */
function chequearIdsDuplicados(jobs, id) {
  const keys = jobs.map((j) => `${j.dataset.identifier}/${j.dist.identifier}`);
  if (hasDuplicates(keys)) {
    console.error(
      `ADVERTENCIA[${id}]: ¡encontré duplicados! es posible que se pisen archivos entre si`
    );
  }
}
/**
 * Serializes a download failure into a JSON-friendly record for errors.jsonl,
 * classifying it as http_error, infinite_redirect or generic_error.
 * @param {{dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL}} job
 * @param {any} error
 */
function encodeError(job, error) {
  const base = {
    // Prefer the (possibly patched) URL; fall back to the raw downloadURL
    // for jobs that failed before a URL object was built.
    url: job.url?.toString() || job.dist.downloadURL,
    datasetIdentifier: job.dataset.identifier,
    distributionIdentifier: job.dist.identifier,
  };
  if (error instanceof StatusCodeError) {
    return { ...base, kind: "http_error", status_code: error.code };
  }
  if (error instanceof TooManyRedirectsError) {
    return { ...base, kind: "infinite_redirect" };
  }
  return {
    ...base,
    kind: "generic_error",
    error: error.code || error.message,
  };
}
/**
 * Patches URLs that are known to break by themselves.
 * Mutates the given URL in place and returns it.
 * @param {URL} url
 */
function patchUrl(url) {
  if (url.host === "www.ign.gob.ar") {
    // By default 'http://www.ign.gob.ar' redirects to 'https://ign.gob.ar',
    // whose certificate only covers '*.ign.gob.ar'. All content is served
    // correctly on 'https://www.ign.gob.ar', so we go there instead.
    url.protocol = "https:";
  }
  return url;
}
/** @typedef DownloadJob
 * @prop {import("common/schema.js").Dataset} dataset
 * @prop {import("common/schema.js").Distribution} dist
 * @prop {URL} url
 * @prop {string} outputPath
 * @prop {number} attempts
 * @prop {Date=} waitUntil
 * @prop {WriteStream} errorFile
 */

// Set REPORT_RETRIES=true in the environment to log each retry attempt.
// The strict comparison already yields a boolean, so the redundant
// `|| false` has been dropped.
const REPORT_RETRIES = process.env.REPORT_RETRIES === "true";
/**
 * Queue worker: downloads a single distribution, re-queuing the job with a
 * quadratic backoff (plus jitter) when the failure looks transient.
 * Exhausted or permanent failures are appended to the catalog's errors.jsonl.
 * @argument {DownloadJob} job
 */
async function downloadDistWithRetries(job) {
  try {
    // Retried jobs carry a waitUntil timestamp to space out attempts.
    if (job.waitUntil) await waitUntil(job.waitUntil);
    await downloadDist(job);
    nFinished++;
  } catch (error) {
    // Backoff: (attempts+1)^2 seconds plus up to 10s of random jitter.
    const retryAfter = () =>
      new Date(
        Date.now() + 1000 * (job.attempts + 1) ** 2 + Math.random() * 10000
      );
    const requeue = () =>
      queue.push({ ...job, waitUntil: retryAfter(), attempts: job.attempts + 1 });

    // Some servers use 403/503 as a "slow down" signal rather than a real
    // failure; those hosts get up to 15 retries.
    const isThrottleStatus =
      error instanceof StatusCodeError &&
      ((error.code === 403 && job.url.host === "minsegar-my.sharepoint.com") ||
        (error.code === 503 && job.url.host === "cdn.buenosaires.gob.ar"));

    if (isThrottleStatus && job.attempts < 15) {
      if (REPORT_RETRIES)
        console.debug(
          `reintentando(status)[${job.attempts}] ${job.url.toString()}`
        );
      requeue();
    } else if (
      // Non-HTTP failures (network hiccups etc.) also get a few retries.
      !(error instanceof StatusCodeError) &&
      !(error instanceof TooManyRedirectsError) &&
      job.attempts < 7
    ) {
      if (REPORT_RETRIES)
        console.debug(`reintentando[${job.attempts}] ${job.url.toString()}`);
      requeue();
    } else {
      // Out of retries or permanently failed: record it and count the job
      // as finished so the progress totals stay consistent.
      job.errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
      nErrors++;
      nFinished++;
    }
  }
}
/**
 * Downloads one distribution into
 * `<outputPath>/<dataset id>/<distribution id>/<file name>`, sanitizing each
 * identifier so it cannot escape the output directory.
 * @argument {DownloadJob} job
 */
async function downloadDist({ url, outputPath, dataset, dist }) {
  const res = await customRequest(url);

  const dir = join(
    outputPath,
    sanitizeSuffix(dataset.identifier),
    sanitizeSuffix(dist.identifier)
  );
  await mkdir(dir, { recursive: true });

  // Fall back to the distribution identifier when no file name is provided.
  const filePath = join(dir, sanitizeSuffix(dist.fileName || dist.identifier));
  if (!res.body) throw new Error("no body");
  await writeFile(filePath, res.body);
}