mirror of
https://github.com/catdevnull/preciazo.git
synced 2024-11-23 06:36:19 +00:00
mejorar contenedor de scraper
poder importar warcs especificos
This commit is contained in:
parent
8a49ddab7d
commit
fee0e1b872
6 changed files with 242 additions and 195 deletions
BIN
bun.lockb
BIN
bun.lockb
Binary file not shown.
|
@ -1,4 +1,5 @@
|
|||
import Database from "bun:sqlite";
|
||||
import { join } from "node:path";
|
||||
import { drizzle } from "drizzle-orm/bun-sqlite";
|
||||
import { migrate } from "drizzle-orm/bun-sqlite/migrator";
|
||||
import * as schema from "./schema.js";
|
||||
|
@ -8,7 +9,7 @@ export function migrateDb() {
|
|||
const sqlite = new Database(DB_PATH);
|
||||
const db = drizzle(sqlite, { schema });
|
||||
|
||||
migrate(db, { migrationsFolder: "./drizzle" });
|
||||
migrate(db, { migrationsFolder: join(import.meta.dir, "drizzle") });
|
||||
sqlite.run(`
|
||||
pragma journal_mode = WAL;
|
||||
PRAGMA synchronous = NORMAL;
|
||||
|
|
|
@ -5,22 +5,23 @@ FROM base AS builder
|
|||
ENV NODE_ENV=production
|
||||
COPY . .
|
||||
RUN bun install --frozen-lockfile \
|
||||
&& bun build scraper/auto.ts --target=bun --outfile=/tmp/auto.build.js \
|
||||
&& bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js \
|
||||
&& rm -rf node_modules/
|
||||
|
||||
FROM base
|
||||
RUN apk add --no-cache wget zstd cronie tini
|
||||
RUN printf "#!/bin/sh\nexec bun /app/built.js" > /etc/periodic/daily/scraper \
|
||||
RUN printf "#!/bin/sh\nexec bun /bin/scraper auto" > /etc/periodic/daily/scraper \
|
||||
&& chmod +x /etc/periodic/daily/scraper
|
||||
|
||||
COPY --from=builder /tmp/auto.build.js /app/built.js
|
||||
COPY --from=builder /usr/src/app/db-datos/drizzle /app/drizzle
|
||||
COPY --from=builder /tmp/cli.build.js /bin/scraper
|
||||
COPY --from=builder /usr/src/app/db-datos/drizzle /bin/drizzle
|
||||
COPY --from=builder /usr/src/app/data /listas
|
||||
WORKDIR /app
|
||||
|
||||
VOLUME /db
|
||||
ENV NODE_ENV=production
|
||||
ENV DB_PATH=/db/db.db
|
||||
ENV LISTS_DIR=/listas/
|
||||
|
||||
CMD ["tini", "/usr/sbin/crond", "-n"]
|
||||
# CMD ["bun", "/app/built.js"]
|
||||
# CMD ["bun", "/bin/scraper"]
|
398
scraper/auto.ts
398
scraper/auto.ts
|
@ -10,28 +10,6 @@ import { S3Client } from "@aws-sdk/client-s3";
|
|||
import { Upload } from "@aws-sdk/lib-storage";
|
||||
import { BunFile } from "bun";
|
||||
|
||||
if (
|
||||
!process.env.S3_ACCESS_KEY_ID ||
|
||||
!process.env.S3_SECRET_ACCESS_KEY ||
|
||||
!process.env.S3_BUCKET_NAME
|
||||
)
|
||||
throw new Error("missing s3 creds");
|
||||
if (!process.env.TELEGRAM_BOT_TOKEN)
|
||||
console.warn("no hay TELEGRAM_BOT_TOKEN, no voy a loggear por allá");
|
||||
if (!process.env.TELEGRAM_BOT_CHAT_ID)
|
||||
console.warn("no hay TELEGRAM_BOT_CHAT_ID, no voy a loggear por allá");
|
||||
const { S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY } = process.env;
|
||||
|
||||
// https://www.backblaze.com/docs/cloud-storage-use-the-aws-sdk-for-javascript-v3-with-backblaze-b2
|
||||
const s3 = new S3Client({
|
||||
endpoint: "https://s3.us-west-004.backblazeb2.com",
|
||||
region: "us-west-004",
|
||||
credentials: {
|
||||
accessKeyId: S3_ACCESS_KEY_ID,
|
||||
secretAccessKey: S3_SECRET_ACCESS_KEY,
|
||||
},
|
||||
});
|
||||
|
||||
const supermercados: Supermercado[] = [
|
||||
Supermercado.Carrefour,
|
||||
Supermercado.Coto,
|
||||
|
@ -44,162 +22,227 @@ const compressionQueue = new PQueue({ concurrency: 1 });
|
|||
// hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU
|
||||
const scrapQueue = new PQueue({ concurrency: 1 });
|
||||
|
||||
supermercados.forEach(downloadList);
|
||||
// await recompress("sqlite.db.gz", "sqlite.db.zst");
|
||||
|
||||
async function downloadList(supermercado: Supermercado) {
|
||||
const listPath = resolve(
|
||||
join(process.env.LISTS_DIR ?? "../data", `${supermercado}.txt`)
|
||||
);
|
||||
const date = new Date();
|
||||
const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-wget-"));
|
||||
const zstdWarcName = `${supermercado}-${format(
|
||||
date,
|
||||
"yyyy-MM-dd-HH:mm"
|
||||
)}.warc.zst`;
|
||||
const zstdWarcPath = join(ctxPath, zstdWarcName);
|
||||
const subproc = Bun.spawn({
|
||||
cmd: [
|
||||
"wget",
|
||||
"--no-verbose",
|
||||
"--tries=3",
|
||||
"--delete-after",
|
||||
"--input-file",
|
||||
listPath,
|
||||
`--warc-file=temp`,
|
||||
],
|
||||
stderr: "ignore",
|
||||
stdout: "ignore",
|
||||
cwd: ctxPath,
|
||||
});
|
||||
const t0 = performance.now();
|
||||
await subproc.exited;
|
||||
inform(`wget para ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}`);
|
||||
|
||||
const gzippedWarcPath = join(ctxPath, "temp.warc.gz");
|
||||
if (!(await fileExists(gzippedWarcPath))) {
|
||||
const err = report(`no encontré el ${gzippedWarcPath}`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
await compressionQueue.add(() => recompress(gzippedWarcPath, zstdWarcPath));
|
||||
if (!(await fileExists(zstdWarcPath))) {
|
||||
const err = report(`no encontré el ${zstdWarcPath}`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
scrapAndInform({ zstdWarcPath, zstdWarcName });
|
||||
|
||||
try {
|
||||
await uploadToBucket({
|
||||
fileName: zstdWarcName,
|
||||
file: Bun.file(zstdWarcPath),
|
||||
});
|
||||
} catch (error) {
|
||||
inform(`Falló subir ${zstdWarcName} a S3; ${error}`);
|
||||
console.error(error);
|
||||
}
|
||||
|
||||
// TODO: borrar archivos temporales
|
||||
export async function auto() {
|
||||
const a = new Auto();
|
||||
await Promise.all(supermercados.map((supr) => a.downloadList(supr)));
|
||||
}
|
||||
|
||||
async function scrapAndInform({
|
||||
zstdWarcPath,
|
||||
zstdWarcName,
|
||||
}: {
|
||||
zstdWarcPath: string;
|
||||
zstdWarcName: string;
|
||||
}) {
|
||||
const res = await scrapQueue.add(async () => {
|
||||
class Auto {
|
||||
s3Config?: { s3: S3Client; bucketName: string };
|
||||
telegramConfig?: { token: string; chatId: string };
|
||||
|
||||
constructor() {
|
||||
if (
|
||||
!process.env.S3_ACCESS_KEY_ID ||
|
||||
!process.env.S3_SECRET_ACCESS_KEY ||
|
||||
!process.env.S3_BUCKET_NAME
|
||||
) {
|
||||
if (process.env.NODE_ENV === "development") {
|
||||
console.warn("faltan creds de s3, no voy a subir a s3");
|
||||
} else {
|
||||
throw new Error("faltan creds de s3");
|
||||
}
|
||||
} else {
|
||||
this.s3Config = {
|
||||
// https://www.backblaze.com/docs/cloud-storage-use-the-aws-sdk-for-javascript-v3-with-backblaze-b2
|
||||
s3: new S3Client({
|
||||
endpoint: "https://s3.us-west-004.backblazeb2.com",
|
||||
region: "us-west-004",
|
||||
credentials: {
|
||||
accessKeyId: process.env.S3_ACCESS_KEY_ID,
|
||||
secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
|
||||
},
|
||||
}),
|
||||
bucketName: process.env.S3_BUCKET_NAME,
|
||||
};
|
||||
}
|
||||
|
||||
if (!process.env.TELEGRAM_BOT_TOKEN)
|
||||
console.warn("no hay TELEGRAM_BOT_TOKEN, no voy a loggear por allá");
|
||||
else if (!process.env.TELEGRAM_BOT_CHAT_ID)
|
||||
console.warn("no hay TELEGRAM_BOT_CHAT_ID, no voy a loggear por allá");
|
||||
else
|
||||
this.telegramConfig = {
|
||||
token: process.env.TELEGRAM_BOT_TOKEN,
|
||||
chatId: process.env.TELEGRAM_BOT_CHAT_ID,
|
||||
};
|
||||
}
|
||||
|
||||
async downloadList(supermercado: Supermercado) {
|
||||
const listPath = resolve(
|
||||
join(process.env.LISTS_DIR ?? "../data", `${supermercado}.txt`)
|
||||
);
|
||||
const date = new Date();
|
||||
const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-wget-"));
|
||||
const zstdWarcName = `${supermercado}-${format(
|
||||
date,
|
||||
"yyyy-MM-dd-HH:mm"
|
||||
)}.warc.zst`;
|
||||
const zstdWarcPath = join(ctxPath, zstdWarcName);
|
||||
const subproc = Bun.spawn({
|
||||
cmd: [
|
||||
"wget",
|
||||
"--no-verbose",
|
||||
"--tries=3",
|
||||
"--delete-after",
|
||||
"--input-file",
|
||||
listPath,
|
||||
`--warc-file=temp`,
|
||||
],
|
||||
stderr: "ignore",
|
||||
stdout: "ignore",
|
||||
cwd: ctxPath,
|
||||
});
|
||||
const t0 = performance.now();
|
||||
const progress = await parseWarc(zstdWarcPath);
|
||||
return { took: performance.now() - t0, progress };
|
||||
});
|
||||
|
||||
if (res) {
|
||||
const { took, progress } = res;
|
||||
inform(
|
||||
`Procesado ${zstdWarcName} (${progress.done} ok, ${
|
||||
progress.errors.length
|
||||
} errores) (tardó ${formatMs(took)})`
|
||||
await subproc.exited;
|
||||
this.inform(
|
||||
`wget para ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}`
|
||||
);
|
||||
} else {
|
||||
inform(`Algo falló en ${zstdWarcName}`);
|
||||
|
||||
const gzippedWarcPath = join(ctxPath, "temp.warc.gz");
|
||||
if (!(await fileExists(gzippedWarcPath))) {
|
||||
const err = this.report(`no encontré el ${gzippedWarcPath}`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
await compressionQueue.add(() =>
|
||||
this.recompress(gzippedWarcPath, zstdWarcPath)
|
||||
);
|
||||
if (!(await fileExists(zstdWarcPath))) {
|
||||
const err = this.report(`no encontré el ${zstdWarcPath}`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
this.scrapAndInform({ zstdWarcPath, zstdWarcName });
|
||||
|
||||
try {
|
||||
await this.uploadToBucket({
|
||||
fileName: zstdWarcName,
|
||||
file: Bun.file(zstdWarcPath),
|
||||
});
|
||||
} catch (error) {
|
||||
this.inform(`Falló subir ${zstdWarcName} a S3; ${error}`);
|
||||
console.error(error);
|
||||
}
|
||||
|
||||
// TODO: borrar archivos temporales
|
||||
}
|
||||
|
||||
async scrapAndInform({
|
||||
zstdWarcPath,
|
||||
zstdWarcName,
|
||||
}: {
|
||||
zstdWarcPath: string;
|
||||
zstdWarcName: string;
|
||||
}) {
|
||||
const res = await scrapQueue.add(async () => {
|
||||
const t0 = performance.now();
|
||||
const progress = await parseWarc(zstdWarcPath);
|
||||
return { took: performance.now() - t0, progress };
|
||||
});
|
||||
|
||||
if (res) {
|
||||
const { took, progress } = res;
|
||||
this.inform(
|
||||
`Procesado ${zstdWarcName} (${progress.done} ok, ${
|
||||
progress.errors.length
|
||||
} errores) (tardó ${formatMs(took)})`
|
||||
);
|
||||
} else {
|
||||
this.inform(`Algo falló en ${zstdWarcName}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* toma un archivo gzippeado y lo recomprime con zstd.
|
||||
* borra el archivo original.
|
||||
*/
|
||||
recompress(inputPath: string, outputPath: string) {
|
||||
// XXX: por alguna razón no funciona en Bun 1.0.20
|
||||
// const decompressor = Bun.spawn({
|
||||
// cmd: ["gzip", "-dc", inputPath],
|
||||
// stderr: "inherit",
|
||||
// });
|
||||
// const compressor = Bun.spawn({
|
||||
// cmd: ["zstd", "-T0", "-15", "--long", "-o", outputPath],
|
||||
// stdin: decompressor.stdout,
|
||||
// // stderr: "inherit",
|
||||
// });
|
||||
// const errorCode = await compressor.exited;
|
||||
// if (errorCode !== 0) {
|
||||
// const err = report(`zstd threw error code ${errorCode}`);
|
||||
// throw err;
|
||||
// }
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const decompressor = spawn("gzip", ["-dc", inputPath], {
|
||||
stdio: [null, "pipe", null],
|
||||
});
|
||||
const compressor = spawn(
|
||||
"zstd",
|
||||
["-T0", "-15", "--long", "-o", outputPath],
|
||||
{
|
||||
stdio: ["pipe", null, null],
|
||||
}
|
||||
);
|
||||
// @ts-expect-error a los types de bun no le gusta????
|
||||
decompressor.stdout.pipe(compressor.stdin);
|
||||
compressor.on("close", (code) => {
|
||||
if (code !== 0) {
|
||||
const err = this.report(`zstd threw error code ${code}`);
|
||||
reject(err);
|
||||
}
|
||||
resolve(void 0);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async uploadToBucket({
|
||||
fileName,
|
||||
file,
|
||||
}: {
|
||||
fileName: string;
|
||||
file: BunFile;
|
||||
}) {
|
||||
if (!this.s3Config) {
|
||||
this.inform(
|
||||
`[s3] Se intentó subir ${fileName} pero no tenemos creds de S3`
|
||||
);
|
||||
return;
|
||||
}
|
||||
const parallelUploads3 = new Upload({
|
||||
client: this.s3Config.s3,
|
||||
params: {
|
||||
Bucket: this.s3Config.bucketName,
|
||||
Key: fileName,
|
||||
Body: file,
|
||||
},
|
||||
});
|
||||
await parallelUploads3.done();
|
||||
}
|
||||
|
||||
inform(msg: string) {
|
||||
this.sendTelegramMsg(msg);
|
||||
console.info(msg);
|
||||
}
|
||||
report(msg: string) {
|
||||
this.inform(msg);
|
||||
const error = new Error(msg);
|
||||
|
||||
return error;
|
||||
}
|
||||
|
||||
async sendTelegramMsg(text: string) {
|
||||
if (!this.telegramConfig) return;
|
||||
const url = new URL(
|
||||
`https://api.telegram.org/bot${this.telegramConfig.token}/sendMessage`
|
||||
);
|
||||
url.searchParams.set("chat_id", this.telegramConfig.chatId);
|
||||
url.searchParams.set("text", text);
|
||||
await fetch(url);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* toma un archivo gzippeado y lo recomprime con zstd.
|
||||
* borra el archivo original.
|
||||
*/
|
||||
function recompress(inputPath: string, outputPath: string) {
|
||||
// XXX: por alguna razón no funciona en Bun 1.0.20
|
||||
// const decompressor = Bun.spawn({
|
||||
// cmd: ["gzip", "-dc", inputPath],
|
||||
// stderr: "inherit",
|
||||
// });
|
||||
// const compressor = Bun.spawn({
|
||||
// cmd: ["zstd", "-T0", "-15", "--long", "-o", outputPath],
|
||||
// stdin: decompressor.stdout,
|
||||
// // stderr: "inherit",
|
||||
// });
|
||||
// const errorCode = await compressor.exited;
|
||||
// if (errorCode !== 0) {
|
||||
// const err = report(`zstd threw error code ${errorCode}`);
|
||||
// throw err;
|
||||
// }
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const decompressor = spawn("gzip", ["-dc", inputPath], {
|
||||
stdio: [null, "pipe", null],
|
||||
});
|
||||
const compressor = spawn(
|
||||
"zstd",
|
||||
["-T0", "-15", "--long", "-o", outputPath],
|
||||
{
|
||||
stdio: ["pipe", null, null],
|
||||
}
|
||||
);
|
||||
// @ts-expect-error a los types de bun no le gusta????
|
||||
decompressor.stdout.pipe(compressor.stdin);
|
||||
compressor.on("close", (code) => {
|
||||
if (code !== 0) {
|
||||
const err = report(`zstd threw error code ${code}`);
|
||||
reject(err);
|
||||
}
|
||||
resolve(void 0);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function uploadToBucket({
|
||||
fileName,
|
||||
file,
|
||||
}: {
|
||||
fileName: string;
|
||||
file: BunFile;
|
||||
}) {
|
||||
const parallelUploads3 = new Upload({
|
||||
client: s3,
|
||||
params: {
|
||||
Bucket: S3_BUCKET_NAME,
|
||||
Key: fileName,
|
||||
Body: file,
|
||||
},
|
||||
});
|
||||
await parallelUploads3.done();
|
||||
}
|
||||
|
||||
function inform(msg: string) {
|
||||
sendTelegramMsg(msg);
|
||||
console.info(msg);
|
||||
}
|
||||
function report(msg: string) {
|
||||
inform(msg);
|
||||
const error = new Error(msg);
|
||||
|
||||
return error;
|
||||
}
|
||||
// await recompress("sqlite.db.gz", "sqlite.db.zst");
|
||||
|
||||
// no se llama exists porque bun tiene un bug en el que usa fs.exists por mas que exista una funcion llamada exists
|
||||
async function fileExists(path: string) {
|
||||
|
@ -211,17 +254,6 @@ async function fileExists(path: string) {
|
|||
}
|
||||
}
|
||||
|
||||
async function sendTelegramMsg(text: string) {
|
||||
if (!process.env.TELEGRAM_BOT_TOKEN || !process.env.TELEGRAM_BOT_CHAT_ID)
|
||||
return;
|
||||
const url = new URL(
|
||||
`https://api.telegram.org/bot${process.env.TELEGRAM_BOT_TOKEN}/sendMessage`
|
||||
);
|
||||
url.searchParams.set("chat_id", process.env.TELEGRAM_BOT_CHAT_ID);
|
||||
url.searchParams.set("text", text);
|
||||
await fetch(url);
|
||||
}
|
||||
|
||||
function formatMs(ms: number) {
|
||||
return formatDuration(intervalToDuration({ start: 0, end: Math.round(ms) }));
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
import { auto } from "./auto.js";
|
||||
import { parseWarc } from "./scrap.js";
|
||||
|
||||
if (process.argv[2] === "auto") {
|
||||
await auto();
|
||||
} else if (process.argv[2] === "scrap") {
|
||||
const warcPaths = process.argv.slice(3);
|
||||
if (warcPaths.length > 0) {
|
||||
for (const path of warcPaths) {
|
||||
await parseWarc(path);
|
||||
}
|
||||
} else {
|
||||
console.error("Especificá WARCs para scrapear.");
|
||||
process.exit(1);
|
||||
}
|
||||
} else {
|
||||
console.error("Especificá una acción (tipo `auto` o `scrap`) para hacer.");
|
||||
process.exit(1);
|
||||
}
|
|
@ -32,12 +32,6 @@ const getPrevPrecio = db
|
|||
.limit(1)
|
||||
.prepare();
|
||||
|
||||
if (process.argv[1].endsWith("/scrap.ts")) {
|
||||
for (const path of process.argv.slice(2)) {
|
||||
await parseWarc(path);
|
||||
}
|
||||
}
|
||||
|
||||
export type Precio = typeof schema.precios.$inferInsert;
|
||||
export type Precioish = Omit<
|
||||
Precio,
|
||||
|
|
Loading…
Reference in a new issue