// @ts-check
import * as fsp from "fs/promises";
import * as fs from "fs";
import { parse } from "fast-csv";
import { basename, join, dirname } from "path";
import { $ } from "zx";
import PQueue from "p-queue";
import fg from "fast-glob";
import { DuckDBConnection, DuckDBInstance } from "@duckdb/node-api";
import Papa from "papaparse";
import { writeFile } from "fs/promises";
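
// Usage: node (or bun) importer.js <path>
// <path> should contain either *.tar.zst dataset archives or an
// already-extracted dump -- inferred from the argv handling below.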

// TODO: check what happens when there are multiple datasets from the same day
// (as there usually are when they update the dataset with new comercios)

const instance = await DuckDBInstance.create("importer.db", {
  // threads: "1",
});
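
// Assumes importer.db already contains the `datasets`, `banderas`,
// `sucursales` and `precios` tables plus the `seq_datasets` sequence;
// this script only inserts into them.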

const queue = new PQueue({ concurrency: 4 });

// If the given directory contains .tar.zst archives, import each one;
// otherwise treat it as an already-extracted dump.
let hasTars = false;
const files = await fg("**/*.tar.zst", { cwd: process.argv[2] });
for (const file of files) {
  hasTars = true;
  const tar = join(process.argv[2], file);
  queue.add(() => importDatasetTar(tar));
}
await queue.onIdle();
if (!hasTars) {
  await importDump(process.argv[2]);
}

/**
 * Imports sucursales.csv into the `sucursales` table through a DuckDB appender.
 * @param {DuckDBConnection} connection
 * @param {number} datasetId
 * @param {string} dir
 */
async function importSucursales(connection, datasetId, dir) {
  const stream = await createReadStream(join(dir, "sucursales.csv"));
  const appender = await connection.createAppender("main", "sucursales");
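
  // The appender writes positionally, so the appends below must match the
  // column order of the `sucursales` table exactly.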
  stream
    .pipe(
      parse({ headers: true, delimiter: "|", ignoreEmpty: true, trim: true })
    )
    .on("data", (data) => {
      // Skip rows missing any part of the composite key.
      if (!data.id_comercio || !data.id_bandera || !data.id_sucursal) {
        return;
      }
      // Some datasets misname this column (missing underscore); normalize it.
      if (data.sucursales_domingohorario_atencion) {
        data.sucursales_domingo_horario_atencion =
          data.sucursales_domingohorario_atencion;
        delete data.sucursales_domingohorario_atencion;
      }
      data.sucursales_nombre = data.sucursales_nombre
        .replaceAll("\t", " ")
        .trim();
      if (!("sucursales_longitud" in data)) {
        console.debug({ data });
        throw new Error(
          "Alberdi S.A. strikes again! The sucursales are broken."
        );
      }
      appender.appendInteger(datasetId);
      appender.appendInteger(parseInt(data.id_comercio));
      appender.appendInteger(parseInt(data.id_bandera));
      appender.appendInteger(parseInt(data.id_sucursal));
      appender.appendVarchar(data.sucursales_nombre);
      appender.appendVarchar(data.sucursales_tipo);
      appender.appendVarchar(data.sucursales_calle);
      appender.appendVarchar(data.sucursales_numero);
      /** @type {[number, number]} */
      let [lat, lon] = [
        parseFloat(data.sucursales_latitud),
        parseFloat(data.sucursales_longitud),
      ];
      // Missing or malformed coordinates become NULLs rather than NaNs.
      if (isNaN(lat) || isNaN(lon)) {
        appender.appendNull();
        appender.appendNull();
      } else {
        appender.appendDouble(lat);
        appender.appendDouble(lon);
      }
      appender.appendVarchar(data.sucursales_observaciones);
      appender.appendVarchar(data.sucursales_barrio);
      appender.appendVarchar(data.sucursales_codigo_postal);
      appender.appendVarchar(data.sucursales_localidad);
      appender.appendVarchar(data.sucursales_provincia);
      appender.appendVarchar(data.sucursales_lunes_horario_atencion);
      appender.appendVarchar(data.sucursales_martes_horario_atencion);
      appender.appendVarchar(data.sucursales_miercoles_horario_atencion);
      appender.appendVarchar(data.sucursales_jueves_horario_atencion);
      appender.appendVarchar(data.sucursales_viernes_horario_atencion);
      appender.appendVarchar(data.sucursales_sabado_horario_atencion);
      appender.appendVarchar(data.sucursales_domingo_horario_atencion);
      appender.endRow();
    })
    .on("error", (err) => {
      console.error(err);
    });
  await new Promise((resolve) => stream.on("end", resolve));
  await appender.close();
}

/**
 * Imports comercio.csv into the `banderas` table through a DuckDB appender.
 * @param {DuckDBConnection} connection
 * @param {number} datasetId
 * @param {string} dir
 */
async function importBanderas(connection, datasetId, dir) {
  const stream = await createReadStream(join(dir, "comercio.csv"));
  const appender = await connection.createAppender("main", "banderas");
  stream
    .pipe(
      parse({ headers: true, delimiter: "|", ignoreEmpty: true, trim: true })
    )
    .on("data", (data) => {
      if (!data.id_comercio || !data.id_bandera) return;
      appender.appendInteger(datasetId);
      appender.appendInteger(parseInt(data.id_comercio));
      appender.appendInteger(parseInt(data.id_bandera));
      appender.appendVarchar(data.comercio_cuit);
      appender.appendVarchar(data.comercio_razon_social);
      appender.appendVarchar(data.comercio_bandera_nombre);
      appender.appendVarchar(data.comercio_bandera_url);
      appender.appendVarchar(data.comercio_ultima_actualizacion);
      appender.appendVarchar(data.comercio_version_sepa);
      appender.endRow();
    })
    .on("error", (err) => {
      console.error(err);
    });
  await new Promise((resolve) => stream.on("end", resolve));
  await appender.close();
}

/**
 * Imports productos.csv into the `precios` table, working around known
 * formatting bugs in some retailers' files.
 * @param {DuckDBConnection} connection
 * @param {number} datasetId
 * @param {string} dir
 */
async function importPrecios(connection, datasetId, dir) {
  const { comercioCuit } = await getComercioMetadata(dir);
  if (
    [
      "30707429468",
      "30589621499",
      "30663005843",
      // Alberdi S.A. -- writes id_producto as 7,790127e+012
      "30578411174",
    ].includes(comercioCuit)
  ) {
    // TODO: they do have the values, just under other names; for example,
    // productos_precio_lista would be precio_unitario_bulto_por_unidad_venta_con_iva.
    // But I don't want to lie, so for now I'm not importing it.
    throw new Error(
      `Not importing dataset ${dir} because its format is broken. It could be imported, but not for now. See https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b`
    );
  }
  if (comercioCuit == "30543659734") {
    throw new Error("Megatone sends empty files that just say 'error'. lol.");
  }
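
  // General approach: copy productos.csv into a temp file, applying
  // per-retailer fixes along the way, then let DuckDB's read_csv ingest
  // the cleaned file directly.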
  const sourceCsvPath = join(dir, "productos.csv");
  const temp = await fsp.mkdtemp("/tmp/sepa-precios-importer-csv-cleaner-");
  try {
    const fixedCsvPath = join(temp, "productos.csv");
    if (comercioCuit == "30612929455") {
      // Libertad S.A.: a product name contains a literal "|", and some rows
      // are split by stray blank lines.
      const file = (await readFile(sourceCsvPath))
        .replaceAll("|RAPTOR 6X16X45", "/RAPTOR 6X16X45")
        .replace(/\r?\n *\r?\n[uúUÚÃ]/giu, "");
      await writeFile(fixedCsvPath, file);
    } else if (comercioCuit == "30578411174") {
      // Alberdi S.A. uses ";" as the delimiter instead of "|".
      const file = (await readFile(sourceCsvPath)).replaceAll(";", "|");
      await writeFile(fixedCsvPath, file);
      // TODO: remove ultima actualizacion
    } else {
      let file = await readFile(sourceCsvPath);
      file = file.replace(/\r?\n( )?\0? *\r?\n"?[uúUÚ]/giu, "");
      file = file.replaceAll(/[ \t]*\n/g, "\n");
      await writeFile(fixedCsvPath, file);
    }
    const sql = `insert into precios select ${datasetId} as id_dataset, * from read_csv('${fixedCsvPath}', delim='|', header=true, nullstr='')`;
    console.debug("sql", sql);
    await connection.run(sql);
  } finally {
    // Clean up the temp dir even when the import throws.
    await fsp.rm(temp, { recursive: true });
  }
}

/**
 * Imports one extracted dataset directory (banderas, sucursales, precios)
 * inside a single transaction.
 * @param {string} dir
 */
async function importDataset(dir) {
  console.log(dir);
  const date = basename(dir).match(/(\d{4}-\d{2}-\d{2})/)?.[1];
  const id_comercio = basename(dir).match(/comercio-sepa-(\d+)/)?.[1];
  // TODO: parse the "Ultima actualizacion" at the end of the CSV and insert it into the datasets table
  const connection = await instance.connect();

  await connection.run("begin transaction");
  try {
    const res = await connection.run(
      `insert into datasets (id, name, date, id_comercio) values (nextval('seq_datasets'), '${basename(dir)}', '${date}', ${id_comercio}) returning id`
    );
    const rows = await res.getRows();
    if (!rows[0][0]) throw new Error("Could not insert the dataset");
    console.log("inserted dataset");
    const datasetId = parseInt(rows[0][0].toString());

    const { comercioCuit } = await getComercioMetadata(dir);
    console.log(`dataset ${datasetId}, comercio ${comercioCuit}`);

    await importBanderas(connection, datasetId, dir);
    await importSucursales(connection, datasetId, dir);
    await importPrecios(connection, datasetId, dir);

    await connection.run("commit");
  } catch (e) {
    // @ts-ignore
    if (e.message.includes("Constraint Error: Duplicate key")) {
      // A duplicate key means this dataset was already imported in a
      // previous run, so just roll back and move on.
      console.log(`dataset ${basename(dir)} already exists`);
      await connection.run("abort");
      return;
    }
    console.error("errored, aborting transaction", e);
    await connection.run("abort");
  } finally {
    // await connection.run("CHECKPOINT");
    // When running under Bun, nudge the GC between datasets.
    try {
      Bun.gc(true);
    } catch {}
  }
}

/**
 * Extracts a .tar.zst archive into a temp dir and imports it as a dump.
 * (Relies on tar auto-detecting the zstd compression.)
 * @param {string} tarPath
 */
async function importDatasetTar(tarPath) {
  console.log(`importing tar ${tarPath}`);
  const dir = await fsp.mkdtemp("/tmp/sepa-precios-importer-");
  try {
    await $`tar -x -C ${dir} -f ${tarPath}`;
    await importDump(dir);
  } finally {
    await fsp.rm(dir, { recursive: true });
  }
}

/**
 * Imports every dataset directory (identified by its productos.csv) under a dump.
 * @param {string} dumpDir
 */
async function importDump(dumpDir) {
  const files = await fg("**/productos.csv", { cwd: dumpDir });
  for (const file of files) {
    const dir = join(dumpDir, dirname(file));
    await importDataset(dir);
  }
}

/**
 * Reads the publishing comercio's CUIT out of a dataset's comercio.csv.
 * @param {string} dir
 */
async function getComercioMetadata(dir) {
  const comercios = Papa.parse(await readFile(join(dir, "comercio.csv")), {
    header: true,
  });
  const comercioCuit = comercios.data[0].comercio_cuit;
  return { comercioCuit };
}

// -----------
// We have to detect whether a file is UTF-16 or UTF-8,
// because DORINKA SRL sometimes sends files in UTF-16.
// ------------
/**
 * @param {string} path
 * @returns {Promise<string>}
 */
async function readFile(path) {
  const buffer = await fsp.readFile(path, { encoding: null });
  // 0xFF 0xFE is the UTF-16LE byte order mark.
  if (buffer[0] === 0xff && buffer[1] === 0xfe) {
    return buffer.toString("utf16le");
  } else {
    return buffer.toString("utf8");
  }
}

/**
 * Stream variant of readFile above: sniffs the first two bytes for a
 * UTF-16LE BOM and opens the stream with the matching encoding.
 * @param {string} path
 * @returns {Promise<fs.ReadStream>}
 */
async function createReadStream(path) {
  const chunks = [];
  for await (let chunk of fs.createReadStream(path, { start: 0, end: 1 })) {
    chunks.push(chunk);
  }
  const header = Buffer.concat(chunks);
  if (header[0] === 0xff && header[1] === 0xfe) {
    return fs.createReadStream(path, { encoding: "utf16le" });
  } else {
    return fs.createReadStream(path, { encoding: "utf8" });
  }
}
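
// For reference, a sketch of the schema this importer assumes already exists
// (reconstructed from the appends and SQL above; the real schema may differ):
//
//   create sequence seq_datasets;
//   create table datasets (
//     id integer primary key,
//     name text,
//     date date,
//     id_comercio integer
//   );
//   -- `banderas`, `sucursales` and `precios` must have their columns in the
//   -- exact order the appenders and the read_csv insert write them, each
//   -- starting with an id_dataset integer column.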