mirror of
https://github.com/catdevnull/preciazo.git
synced 2024-11-22 22:26:19 +00:00
Compare commits
7 commits
4c7d11840b
...
076abab943
Author | SHA1 | Date | |
---|---|---|---|
076abab943 | |||
41e7c54ab3 | |||
c24c931d46 | |||
1644d8b207 | |||
c8c603aade | |||
b7ced28868 | |||
d16e25fd69 |
4 changed files with 99 additions and 68 deletions
BIN
sepa/bun.lockb
BIN
sepa/bun.lockb
Binary file not shown.
|
@ -9,6 +9,7 @@ export const zResource = z.object({
|
||||||
url: z.string(),
|
url: z.string(),
|
||||||
modified: z.coerce.date().optional(),
|
modified: z.coerce.date().optional(),
|
||||||
description: z.string(),
|
description: z.string(),
|
||||||
|
name: z.string(),
|
||||||
});
|
});
|
||||||
export type Resource = z.infer<typeof zResource>;
|
export type Resource = z.infer<typeof zResource>;
|
||||||
export const zDatasetInfo = z.object({
|
export const zDatasetInfo = z.object({
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
import { readFile } from "fs/promises";
|
import * as fs from "fs/promises";
|
||||||
import Papa from "papaparse";
|
import Papa from "papaparse";
|
||||||
import { basename, join, dirname } from "path";
|
import { basename, join, dirname } from "path";
|
||||||
import postgres from "postgres";
|
import postgres from "postgres";
|
||||||
import { Readable } from "stream";
|
import { Readable } from "stream";
|
||||||
import { pipeline } from "node:stream/promises";
|
import { pipeline } from "node:stream/promises";
|
||||||
import { Glob } from "bun";
|
import { Glob } from "bun";
|
||||||
|
import PQueue from "p-queue";
|
||||||
|
|
||||||
|
// TODO: verificar que pasa cuando hay varios datasets del mismo día (como los suele haber cuando actualizan el dataset con nuevos comercios)
|
||||||
|
|
||||||
const sql = postgres({
|
const sql = postgres({
|
||||||
database: "sepa-precios",
|
database: "sepa-precios",
|
||||||
|
@ -65,21 +68,38 @@ await sql`
|
||||||
productos_precio_unitario_promo1 NUMERIC(10, 2),
|
productos_precio_unitario_promo1 NUMERIC(10, 2),
|
||||||
productos_leyenda_promo1 TEXT,
|
productos_leyenda_promo1 TEXT,
|
||||||
productos_precio_unitario_promo2 NUMERIC(10, 2),
|
productos_precio_unitario_promo2 NUMERIC(10, 2),
|
||||||
productos_leyenda_promo2 TEXT,
|
productos_leyenda_promo2 TEXT
|
||||||
FOREIGN KEY (id_dataset, id_comercio, id_bandera, id_sucursal) REFERENCES sucursales(id_dataset, id_comercio, id_bandera, id_sucursal)
|
|
||||||
);
|
);
|
||||||
`;
|
`;
|
||||||
|
|
||||||
|
await sql`
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_precios_composite ON precios (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto);
|
||||||
|
`;
|
||||||
|
|
||||||
|
await sql`
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_sucursales_composite ON sucursales (id_dataset, id_comercio, id_bandera, id_sucursal);
|
||||||
|
`;
|
||||||
|
|
||||||
|
async function readFile(path: string) {
|
||||||
|
// XXX: DORINKA SRL a veces envía archivos con UTF-16.
|
||||||
|
const buffer = await fs.readFile(path, { encoding: null });
|
||||||
|
if (buffer[0] === 0xff && buffer[1] === 0xfe) {
|
||||||
|
return buffer.toString("utf16le");
|
||||||
|
} else {
|
||||||
|
return buffer.toString("utf8");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function importSucursales(
|
async function importSucursales(
|
||||||
sql: postgres.Sql,
|
sql: postgres.Sql,
|
||||||
datasetId: number,
|
datasetId: number,
|
||||||
dir: string,
|
dir: string
|
||||||
) {
|
) {
|
||||||
const sucursales: Papa.ParseResult<any> = Papa.parse(
|
const sucursales: Papa.ParseResult<any> = Papa.parse(
|
||||||
await readFile(join(dir, "sucursales.csv"), "utf-8"),
|
await readFile(join(dir, "sucursales.csv")),
|
||||||
{
|
{
|
||||||
header: true,
|
header: true,
|
||||||
},
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
const objs = sucursales.data
|
const objs = sucursales.data
|
||||||
|
@ -98,7 +118,7 @@ async function importSucursales(
|
||||||
});
|
});
|
||||||
const keys = Object.keys(objs[0]);
|
const keys = Object.keys(objs[0]);
|
||||||
const lines = Readable.from(
|
const lines = Readable.from(
|
||||||
objs.map((data) => keys.map((key) => (data as any)[key]).join("\t") + "\n"),
|
objs.map((data) => keys.map((key) => (data as any)[key]).join("\t") + "\n")
|
||||||
);
|
);
|
||||||
const writable =
|
const writable =
|
||||||
await sql`copy sucursales (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
|
await sql`copy sucursales (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
|
||||||
|
@ -121,18 +141,17 @@ async function importDataset(dir: string) {
|
||||||
const res =
|
const res =
|
||||||
await sql`insert into datasets (name, date) values (${basename(dir)}, ${date}) returning id`;
|
await sql`insert into datasets (name, date) values (${basename(dir)}, ${date}) returning id`;
|
||||||
datasetId = res[0].id;
|
datasetId = res[0].id;
|
||||||
const datas: any[] = [];
|
|
||||||
|
|
||||||
const comercios: Papa.ParseResult<{ comercio_cuit: string }> = Papa.parse(
|
const comercios: Papa.ParseResult<{ comercio_cuit: string }> = Papa.parse(
|
||||||
await readFile(join(dir, "comercio.csv"), "utf-8"),
|
await readFile(join(dir, "comercio.csv")),
|
||||||
{ header: true },
|
{ header: true }
|
||||||
);
|
);
|
||||||
const comercioCuit = comercios.data[0].comercio_cuit;
|
const comercioCuit = comercios.data[0].comercio_cuit;
|
||||||
console.log(`dataset ${datasetId}, comercio ${comercioCuit}`);
|
console.log(`dataset ${datasetId}, comercio ${comercioCuit}`);
|
||||||
|
|
||||||
await importSucursales(sql, datasetId, dir);
|
await importSucursales(sql, datasetId, dir);
|
||||||
|
|
||||||
let file = await readFile(join(dir, "productos.csv"), "utf-8");
|
let file = await readFile(join(dir, "productos.csv"));
|
||||||
// WALL OF SHAME: estos proveedores no saben producir CSVs correctos
|
// WALL OF SHAME: estos proveedores no saben producir CSVs correctos
|
||||||
if (comercioCuit == "30612929455") {
|
if (comercioCuit == "30612929455") {
|
||||||
// Libertad S.A.
|
// Libertad S.A.
|
||||||
|
@ -148,67 +167,75 @@ async function importDataset(dir: string) {
|
||||||
// productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva.
|
// productos_precio_lista seria precio_unitario_bulto_por_unidad_venta_con_iva.
|
||||||
// pero no quiero mentir, asi que por ahora no lo importo
|
// pero no quiero mentir, asi que por ahora no lo importo
|
||||||
console.error(
|
console.error(
|
||||||
`No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b`,
|
`No voy a importar el dataset ${dir} porque el formato está mal. Pero se podría importar. Pero por ahora no lo voy a hacer. Véase https://gist.github.com/catdevnull/587d5c63c4bab11b9798861c917db93b`
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
console.time("parse");
|
console.time("parse");
|
||||||
return await new Promise((resolve, reject) => {
|
|
||||||
Papa.parse(file, {
|
const writable =
|
||||||
|
await sql`copy precios (id_dataset, id_comercio, id_bandera, id_sucursal, id_producto, productos_ean, productos_descripcion, productos_cantidad_presentacion, productos_unidad_medida_presentacion, productos_marca, productos_precio_lista, productos_precio_referencia, productos_cantidad_referencia, productos_unidad_medida_referencia, productos_precio_unitario_promo1, productos_leyenda_promo1, productos_precio_unitario_promo2, productos_leyenda_promo2) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
|
||||||
|
|
||||||
|
let rowCount = 0;
|
||||||
|
|
||||||
|
async function* processRows() {
|
||||||
|
const parsedData = Papa.parse(file, {
|
||||||
header: true,
|
header: true,
|
||||||
step: function (result: any) {
|
skipEmptyLines: true,
|
||||||
const { data } = result;
|
});
|
||||||
|
|
||||||
|
for (const data of parsedData.data as any[]) {
|
||||||
if (
|
if (
|
||||||
data.id_comercio &&
|
data.id_comercio &&
|
||||||
data.id_bandera &&
|
data.id_bandera &&
|
||||||
data.id_sucursal &&
|
data.id_sucursal &&
|
||||||
data.id_producto
|
data.id_producto
|
||||||
)
|
) {
|
||||||
datas.push(data);
|
|
||||||
},
|
|
||||||
complete: async function () {
|
|
||||||
try {
|
|
||||||
console.timeEnd("parse");
|
|
||||||
console.time("map");
|
|
||||||
const objs = datas.map((data) => {
|
|
||||||
delete data.id_dun_14;
|
delete data.id_dun_14;
|
||||||
return {
|
const row = {
|
||||||
id_dataset: datasetId,
|
id_dataset: datasetId,
|
||||||
...data,
|
...data,
|
||||||
productos_descripcion: data.productos_descripcion.replaceAll(
|
productos_descripcion: data.productos_descripcion
|
||||||
"\t",
|
.replaceAll("\t", " ")
|
||||||
" ",
|
.trim(),
|
||||||
),
|
productos_marca: data.productos_marca.trim(),
|
||||||
};
|
};
|
||||||
});
|
const values =
|
||||||
if (!objs.length) {
|
[
|
||||||
console.error(`No hay datos para el dataset ${dir}`);
|
row.id_dataset,
|
||||||
return;
|
row.id_comercio,
|
||||||
|
row.id_bandera,
|
||||||
|
row.id_sucursal,
|
||||||
|
row.id_producto,
|
||||||
|
row.productos_ean,
|
||||||
|
row.productos_descripcion,
|
||||||
|
row.productos_cantidad_presentacion,
|
||||||
|
row.productos_unidad_medida_presentacion,
|
||||||
|
row.productos_marca,
|
||||||
|
row.productos_precio_lista,
|
||||||
|
row.productos_precio_referencia,
|
||||||
|
row.productos_cantidad_referencia,
|
||||||
|
row.productos_unidad_medida_referencia,
|
||||||
|
row.productos_precio_unitario_promo1,
|
||||||
|
row.productos_leyenda_promo1,
|
||||||
|
row.productos_precio_unitario_promo2,
|
||||||
|
row.productos_leyenda_promo2,
|
||||||
|
].join("\t") + "\n";
|
||||||
|
|
||||||
|
rowCount++;
|
||||||
|
yield values;
|
||||||
}
|
}
|
||||||
const keys = Object.keys(objs[0]);
|
}
|
||||||
const lines = Readable.from(
|
}
|
||||||
objs.map(
|
|
||||||
(data) => keys.map((key) => data[key]).join("\t") + "\n",
|
const generator = processRows();
|
||||||
),
|
await pipeline(Readable.from(generator), writable);
|
||||||
);
|
|
||||||
console.timeEnd("map");
|
console.timeEnd("parse");
|
||||||
console.time("copy");
|
console.info(`saved ${rowCount} rows`);
|
||||||
const writable =
|
|
||||||
await sql`copy precios (${sql.unsafe(keys.join(", "))}) from stdin with CSV DELIMITER E'\t' QUOTE E'\b'`.writable();
|
|
||||||
await pipeline(lines, writable);
|
|
||||||
console.timeEnd("copy");
|
|
||||||
console.info(`saved ${objs.length} rows`);
|
|
||||||
} catch (e) {
|
|
||||||
reject(e);
|
|
||||||
return;
|
|
||||||
} finally {
|
|
||||||
Bun.gc(true);
|
Bun.gc(true);
|
||||||
resolve(void 0);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
skipEmptyLines: true,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if ((e as any).code == "23505") {
|
if ((e as any).code == "23505") {
|
||||||
|
@ -219,13 +246,16 @@ async function importDataset(dir: string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const pQueue = new PQueue({ concurrency: 4 });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const glob = new Glob("**/productos.csv");
|
const glob = new Glob("**/productos.csv");
|
||||||
for await (const file of glob.scan(process.argv[2])) {
|
for await (const file of glob.scan(process.argv[2])) {
|
||||||
const dir = join(process.argv[2], dirname(file));
|
const dir = join(process.argv[2], dirname(file));
|
||||||
console.log(dir);
|
console.log(dir);
|
||||||
await importDataset(dir);
|
pQueue.add(() => importDataset(dir));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
await pQueue.onIdle();
|
||||||
await sql.end();
|
await sql.end();
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,9 +10,9 @@
|
||||||
"typescript": "^5.0.0"
|
"typescript": "^5.0.0"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"ckan": "workspace:*",
|
||||||
"p-queue": "^8.0.1",
|
"p-queue": "^8.0.1",
|
||||||
"papaparse": "^5.4.1",
|
"papaparse": "^5.4.1",
|
||||||
"postgres": "^3.4.4",
|
"postgres": "^3.4.4"
|
||||||
"ckan": "workspace:*"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue