2023-12-18 15:34:41 +00:00
import { mkdir , writeFile } from "node:fs/promises" ;
2023-12-19 15:40:53 +00:00
import { join } from "node:path" ;
2023-12-16 14:15:27 +00:00
import { targetsPorDefecto } from "./config.js" ;
2023-12-09 19:53:49 +00:00
import { generateDataJsonFromCkan } from "./ckan_to_datajson.js" ;
2023-12-09 20:10:21 +00:00
import { zData } from "common/schema.js" ;
2023-12-16 14:15:27 +00:00
import {
StatusCodeError ,
TooManyRedirectsError ,
2023-12-19 15:01:37 +00:00
customRequestWithRetries ,
2023-12-16 14:15:27 +00:00
} from "./network.js" ;
2023-12-16 14:25:46 +00:00
import { createWriteStream } from "node:fs" ;
2023-12-19 15:01:37 +00:00
import pMap from "p-map" ;
2023-12-19 15:40:53 +00:00
import { sanitizeSuffix , shuffleArray , hasDuplicates } from "./utils.js" ;
2023-11-27 20:01:56 +00:00
2023-12-09 19:53:49 +00:00
// Targets come from the command line; with no arguments the default list
// from ./config.js is used instead.
const cliArgs = process.argv.slice(2);
const urls = cliArgs.length < 1 ? targetsPorDefecto : cliArgs;

/** @typedef {{type: "datajson" | "ckan"; url: string;}} Target */

/**
 * Parsed targets. A `datajson+` or `ckan+` prefix selects the target type and
 * is stripped from the URL; an unprefixed argument is treated as "datajson".
 * @type {Target[]}
 */
const targets = urls.map((url) => {
  for (const type of /** @type {const} */ (["datajson", "ckan"])) {
    const prefix = `${type}+`;
    if (url.startsWith(prefix)) {
      return { type, url: url.slice(prefix.length) };
    }
  }
  return { type: "datajson", url };
});

// Start every target without awaiting, so they run concurrently. A failure in
// one target is logged and does not affect the others.
targets.forEach((target) => {
  downloadFromData(target).catch((error) => {
    console.error(` ${target.type} + ${target.url} FALLÓ CON `, error);
  });
});
2023-11-28 21:22:25 +00:00
/**
 * Downloads everything referenced by one target into its own output directory.
 *
 * Steps:
 *  1. Obtain the target's data.json text and validate it with `zData`.
 *  2. Write `data.json` and `url.txt` into the output directory.
 *  3. Build one download job per distribution that has a usable `downloadURL`;
 *     distributions without one (or with an unparsable one) are recorded in
 *     `errors.jsonl` and skipped.
 *  4. Run the jobs through `pMap`, recording each failure as one JSON line in
 *     `errors.jsonl`, while printing progress to stderr every 30 seconds.
 *
 * Individual download failures do not reject the returned promise; they are
 * only counted and logged.
 *
 * @param {Target} target
 * @returns {Promise<void>}
 */
async function downloadFromData(target) {
  const outputPath = generateOutputPath(target.url);
  const json = await getDataJsonForTarget(target);
  const parsed = zData.parse(JSON.parse(json));

  await mkdir(outputPath, { recursive: true });
  await writeFile(join(outputPath, "data.json"), json);
  // Stored in the same `type+url` form that the command line accepts, so the
  // file content can be fed back to this script as a target.
  await writeFile(join(outputPath, "url.txt"), `${target.type}+${target.url}`);
  const errorFile = createWriteStream(join(outputPath, "errors.jsonl"), {
    flags: "w",
  });

  /** @type {ReturnType<typeof setInterval> | undefined} */
  let interval;
  try {
    let nFinished = 0;
    let nErrors = 0;

    /**
     * Appends one line to errors.jsonl and bumps the error counter.
     * @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
     * @param {any} error
     */
    const recordError = (job, error) => {
      errorFile.write(JSON.stringify(encodeError(job, error)) + "\n");
      nErrors++;
    };

    /** @type {DownloadJob[]} */
    const jobs = [];
    for (const dataset of parsed.dataset) {
      for (const dist of dataset.distribution) {
        try {
          if (!dist.downloadURL) {
            throw new Error("No downloadURL in distribution");
          }
          // Parse and patch the URL once; `new URL` throws on malformed input,
          // which lands in the catch below like a missing URL does.
          const url = patchUrl(new URL(dist.downloadURL));
          jobs.push({ dataset, dist, url, outputPath, attempts: 0 });
        } catch (error) {
          recordError({ dataset, dist }, error);
        }
      }
    }
    const totalJobs = jobs.length;

    // Just in case, check that no two jobs would write to the same files.
    chequearIdsDuplicados(jobs, outputPath);

    shuffleArray(jobs);

    const promise = pMap(
      jobs,
      async (job) => {
        try {
          return await downloadDistWithRetries(job);
        } catch (error) {
          recordError(job, error);
        } finally {
          nFinished++;
        }
      },
      // The original note says the effective limit is actually the balanced
      // pool (presumably configured in ./network.js, which is not visible here).
      { concurrency: 32 }
    );

    process.stderr.write(` info[ ${outputPath} ]: 0/ ${totalJobs} done \n `);
    interval = setInterval(() => {
      process.stderr.write(
        ` info[ ${outputPath} ]: ${nFinished} / ${totalJobs} done \n `
      );
    }, 30000);
    await promise;
    if (nErrors > 0) {
      console.error(` ${outputPath} : Finished with ${nErrors} errors `);
    }
  } finally {
    // Always stop the progress timer: a live interval keeps the process
    // running and keeps printing even after this target has failed.
    clearInterval(interval);
    errorFile.close();
  }
}
2023-12-19 03:55:47 +00:00
/**
 * Produces the data.json document for a target, as text.
 *
 * - "ckan": the document comes from `generateDataJsonFromCkan(target.url)` and
 *   is serialized with `JSON.stringify`.
 * - "datajson": the URL is requested through `customRequestWithRetries` and the
 *   response body is read as text, without re-serializing it.
 *
 * @param {Target} target
 * @returns {Promise<string>}
 * @throws {Error} when `target.type` is neither "ckan" nor "datajson"
 */
async function getDataJsonForTarget(target) {
  switch (target.type) {
    case "ckan": {
      const generated = await generateDataJsonFromCkan(target.url);
      return JSON.stringify(generated);
    }
    case "datajson": {
      const response = await customRequestWithRetries(new URL(target.url));
      const text = await response.body.text();
      return text;
    }
    default:
      throw new Error("?????????????");
  }
}
2023-12-08 19:05:25 +00:00
/**
 * Derives the output directory name for a target from its URL: the host
 * (including the port, if any) followed by the path, with every "/" replaced
 * by "_". The query string and fragment are not part of the name.
 *
 * @param {string} jsonUrlString absolute URL of the target
 * @returns {string} directory name with no "/" and no padding whitespace
 * @throws {TypeError} when `jsonUrlString` is not a valid absolute URL
 */
export function generateOutputPath(jsonUrlString) {
  const { host, pathname } = new URL(jsonUrlString);
  // Host and path are joined with nothing in between: any padding inside the
  // template would end up inside the directory name.
  return `${host}${pathname}`.replaceAll("/", "_");
}
2023-11-28 01:43:58 +00:00
/**
 * Requests one distribution through `customRequestWithRetries` and saves the
 * response body to disk.
 *
 * The file is written to
 * `<outputPath>/<dataset identifier>/<distribution identifier>/<file name>`,
 * where every segment is passed through `sanitizeSuffix` and the file name is
 * `dist.fileName`, or `dist.identifier` when `fileName` is falsy.
 *
 * @argument {DownloadJob} job
 * @throws {Error} "no body" when the response carries no body (the directory
 *   has already been created at that point)
 */
async function downloadDistWithRetries(job) {
  const { dist, dataset, url, outputPath } = job;
  const response = await customRequestWithRetries(url);

  const targetDir = join(
    outputPath,
    sanitizeSuffix(dataset.identifier),
    sanitizeSuffix(dist.identifier)
  );
  await mkdir(targetDir, { recursive: true });

  const fileName = sanitizeSuffix(dist.fileName || dist.identifier);
  const targetFile = join(targetDir, fileName);
  if (!response.body) {
    throw new Error("no body");
  }
  await writeFile(targetFile, response.body);
}
2023-11-28 01:43:58 +00:00
/** @typedef DownloadJob
 * @prop {import("common/schema.js").Dataset} dataset
 * @prop {import("common/schema.js").Distribution} dist
 * @prop {URL} url
 * @prop {string} outputPath
 * @prop {number} attempts
 * @prop {Date=} waitUntil
 */
2023-11-27 20:01:56 +00:00
2023-11-28 21:22:25 +00:00
/**
 * Prints a warning to stderr when `hasDuplicates` reports that two or more
 * jobs share the same dataset identifier / distribution identifier pair,
 * since such jobs may overwrite each other's files. It only warns; the job
 * list is not modified.
 *
 * @param {DownloadJob[]} jobs
 * @param {string} id label shown in the warning (the caller passes the output path)
 */
function chequearIdsDuplicados(jobs, id) {
  const keys = [];
  for (const { dataset, dist } of jobs) {
    keys.push(` ${dataset.identifier} / ${dist.identifier} `);
  }
  if (!hasDuplicates(keys)) {
    return;
  }
  console.error(
    ` ADVERTENCIA[ ${id} ]: ¡encontré duplicados! es posible que se pisen archivos entre si `
  );
}
2023-11-27 20:01:56 +00:00
2023-11-28 22:57:35 +00:00
/**
 * Builds the JSON-serializable record that describes one failed distribution.
 *
 * Every record carries `url`, `datasetIdentifier` and `distributionIdentifier`.
 * `url` is the job's (patched) URL when present, otherwise the distribution's
 * raw `downloadURL`. The remaining fields depend on the error:
 *  - `StatusCodeError`       -> `kind: "http_error"` plus `status_code`
 *  - `TooManyRedirectsError` -> `kind: "infinite_redirect"`
 *  - anything else           -> `kind: "generic_error"` plus `error`, which is
 *    the error's `code`, else its `message`, else its string form.
 *
 * @param {{ dataset: import("common/schema.js").Dataset, dist: import("common/schema.js").Distribution, url?: URL }} job
 * @param {any} error the thrown value; may be a non-Error, `null` or `undefined`
 * @returns {Record<string, unknown>}
 */
function encodeError(job, error) {
  const always = {
    url: job.url?.toString() || job.dist.downloadURL,
    datasetIdentifier: job.dataset.identifier,
    distributionIdentifier: job.dist.identifier,
  };
  if (error instanceof StatusCodeError) {
    return { ...always, kind: "http_error", status_code: error.code };
  }
  if (error instanceof TooManyRedirectsError) {
    return { ...always, kind: "infinite_redirect" };
  }
  // This runs inside catch handlers, so it must never throw itself. Any value
  // can be thrown in JavaScript: optional chaining covers null/undefined, and
  // String() keeps the text of thrown primitives and of errors that have
  // neither a code nor a message.
  const detail = error?.code || error?.message || String(error);
  return { ...always, kind: "generic_error", error: detail };
}
2023-11-28 03:41:25 +00:00
/**
 * Patches URLs that are known to break on their own.
 * @param {URL} url
 */
function patchUrl(url) {
  // Only one host needs patching; every other URL is returned as it came in.
  const needsHttps = url.host === "www.ign.gob.ar";
  if (!needsHttps) {
    return url;
  }
  // By default 'http://www.ign.gob.ar' redirects to 'https://ign.gob.ar', but
  // its certificate only covers '*.ign.gob.ar'. All the content is served
  // correctly at 'https://www.ign.gob.ar', so go straight there.
  // The URL object is modified in place and the same instance is returned.
  url.protocol = "https:";
  return url;
}