diff --git a/bun.lockb b/bun.lockb index 8dcbab0..55f034f 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/db-datos/migrate.ts b/db-datos/migrate.ts index ef793f5..cb47034 100644 --- a/db-datos/migrate.ts +++ b/db-datos/migrate.ts @@ -1,4 +1,5 @@ import Database from "bun:sqlite"; +import { join } from "node:path"; import { drizzle } from "drizzle-orm/bun-sqlite"; import { migrate } from "drizzle-orm/bun-sqlite/migrator"; import * as schema from "./schema.js"; @@ -8,7 +9,7 @@ export function migrateDb() { const sqlite = new Database(DB_PATH); const db = drizzle(sqlite, { schema }); - migrate(db, { migrationsFolder: "./drizzle" }); + migrate(db, { migrationsFolder: join(import.meta.dir, "drizzle") }); sqlite.run(` pragma journal_mode = WAL; PRAGMA synchronous = NORMAL; diff --git a/scraper/Containerfile b/scraper/Containerfile index d2aa951..da2a0ae 100644 --- a/scraper/Containerfile +++ b/scraper/Containerfile @@ -5,22 +5,23 @@ FROM base AS builder ENV NODE_ENV=production COPY . . RUN bun install --frozen-lockfile \ - && bun build scraper/auto.ts --target=bun --outfile=/tmp/auto.build.js \ + && bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js \ && rm -rf node_modules/ FROM base RUN apk add --no-cache wget zstd cronie tini -RUN printf "#!/bin/sh\nexec bun /app/built.js" > /etc/periodic/daily/scraper \ +RUN printf "#!/bin/sh\nexec bun /bin/scraper auto" > /etc/periodic/daily/scraper \ && chmod +x /etc/periodic/daily/scraper -COPY --from=builder /tmp/auto.build.js /app/built.js -COPY --from=builder /usr/src/app/db-datos/drizzle /app/drizzle +COPY --from=builder /tmp/cli.build.js /bin/scraper +COPY --from=builder /usr/src/app/db-datos/drizzle /bin/drizzle COPY --from=builder /usr/src/app/data /listas WORKDIR /app VOLUME /db +ENV NODE_ENV=production ENV DB_PATH=/db/db.db ENV LISTS_DIR=/listas/ CMD ["tini", "/usr/sbin/crond", "-n"] -# CMD ["bun", "/app/built.js"] \ No newline at end of file +# CMD ["bun", "/bin/scraper"] \ No newline at end of file diff --git a/scraper/auto.ts b/scraper/auto.ts index c7359e2..ee97627 100644 --- a/scraper/auto.ts +++ b/scraper/auto.ts @@ -10,28 +10,6 @@ import { S3Client } from "@aws-sdk/client-s3"; import { Upload } from "@aws-sdk/lib-storage"; import { BunFile } from "bun"; -if ( - !process.env.S3_ACCESS_KEY_ID || - !process.env.S3_SECRET_ACCESS_KEY || - !process.env.S3_BUCKET_NAME -) - throw new Error("missing s3 creds"); -if (!process.env.TELEGRAM_BOT_TOKEN) - console.warn("no hay TELEGRAM_BOT_TOKEN, no voy a loggear por allá"); -if (!process.env.TELEGRAM_BOT_CHAT_ID) - console.warn("no hay TELEGRAM_BOT_CHAT_ID, no voy a loggear por allá"); -const { S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY } = process.env; - -// https://www.backblaze.com/docs/cloud-storage-use-the-aws-sdk-for-javascript-v3-with-backblaze-b2 -const s3 = new S3Client({ - endpoint: "https://s3.us-west-004.backblazeb2.com", - region: "us-west-004", - credentials: { - accessKeyId: S3_ACCESS_KEY_ID, - secretAccessKey: S3_SECRET_ACCESS_KEY, - }, -}); - const supermercados: Supermercado[] = [ Supermercado.Carrefour, Supermercado.Coto, @@ -44,162 +22,227 @@ const compressionQueue = new PQueue({ concurrency: 1 }); // hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU const scrapQueue = new PQueue({ concurrency: 1 }); -supermercados.forEach(downloadList); -// await recompress("sqlite.db.gz", "sqlite.db.zst"); - -async function downloadList(supermercado: Supermercado) { - const listPath = resolve( - 
join(process.env.LISTS_DIR ?? "../data", `${supermercado}.txt`) - ); - const date = new Date(); - const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-wget-")); - const zstdWarcName = `${supermercado}-${format( - date, - "yyyy-MM-dd-HH:mm" - )}.warc.zst`; - const zstdWarcPath = join(ctxPath, zstdWarcName); - const subproc = Bun.spawn({ - cmd: [ - "wget", - "--no-verbose", - "--tries=3", - "--delete-after", - "--input-file", - listPath, - `--warc-file=temp`, - ], - stderr: "ignore", - stdout: "ignore", - cwd: ctxPath, - }); - const t0 = performance.now(); - await subproc.exited; - inform(`wget para ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}`); - - const gzippedWarcPath = join(ctxPath, "temp.warc.gz"); - if (!(await fileExists(gzippedWarcPath))) { - const err = report(`no encontré el ${gzippedWarcPath}`); - throw err; - } - - await compressionQueue.add(() => recompress(gzippedWarcPath, zstdWarcPath)); - if (!(await fileExists(zstdWarcPath))) { - const err = report(`no encontré el ${zstdWarcPath}`); - throw err; - } - - scrapAndInform({ zstdWarcPath, zstdWarcName }); - - try { - await uploadToBucket({ - fileName: zstdWarcName, - file: Bun.file(zstdWarcPath), - }); - } catch (error) { - inform(`Falló subir ${zstdWarcName} a S3; ${error}`); - console.error(error); - } - - // TODO: borrar archivos temporales +export async function auto() { + const a = new Auto(); + await Promise.all(supermercados.map((supr) => a.downloadList(supr))); } -async function scrapAndInform({ - zstdWarcPath, - zstdWarcName, -}: { - zstdWarcPath: string; - zstdWarcName: string; -}) { - const res = await scrapQueue.add(async () => { +class Auto { + s3Config?: { s3: S3Client; bucketName: string }; + telegramConfig?: { token: string; chatId: string }; + + constructor() { + if ( + !process.env.S3_ACCESS_KEY_ID || + !process.env.S3_SECRET_ACCESS_KEY || + !process.env.S3_BUCKET_NAME + ) { + if (process.env.NODE_ENV === "development") { + console.warn("faltan creds de s3, no voy a subir a s3"); + } else { + throw new Error("faltan creds de s3"); + } + } else { + this.s3Config = { + // https://www.backblaze.com/docs/cloud-storage-use-the-aws-sdk-for-javascript-v3-with-backblaze-b2 + s3: new S3Client({ + endpoint: "https://s3.us-west-004.backblazeb2.com", + region: "us-west-004", + credentials: { + accessKeyId: process.env.S3_ACCESS_KEY_ID, + secretAccessKey: process.env.S3_SECRET_ACCESS_KEY, + }, + }), + bucketName: process.env.S3_BUCKET_NAME, + }; + } + + if (!process.env.TELEGRAM_BOT_TOKEN) + console.warn("no hay TELEGRAM_BOT_TOKEN, no voy a loggear por allá"); + else if (!process.env.TELEGRAM_BOT_CHAT_ID) + console.warn("no hay TELEGRAM_BOT_CHAT_ID, no voy a loggear por allá"); + else + this.telegramConfig = { + token: process.env.TELEGRAM_BOT_TOKEN, + chatId: process.env.TELEGRAM_BOT_CHAT_ID, + }; + } + + async downloadList(supermercado: Supermercado) { + const listPath = resolve( + join(process.env.LISTS_DIR ?? 
"../data", `${supermercado}.txt`) + ); + const date = new Date(); + const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-wget-")); + const zstdWarcName = `${supermercado}-${format( + date, + "yyyy-MM-dd-HH:mm" + )}.warc.zst`; + const zstdWarcPath = join(ctxPath, zstdWarcName); + const subproc = Bun.spawn({ + cmd: [ + "wget", + "--no-verbose", + "--tries=3", + "--delete-after", + "--input-file", + listPath, + `--warc-file=temp`, + ], + stderr: "ignore", + stdout: "ignore", + cwd: ctxPath, + }); const t0 = performance.now(); - const progress = await parseWarc(zstdWarcPath); - return { took: performance.now() - t0, progress }; - }); - - if (res) { - const { took, progress } = res; - inform( - `Procesado ${zstdWarcName} (${progress.done} ok, ${ - progress.errors.length - } errores) (tardó ${formatMs(took)})` + await subproc.exited; + this.inform( + `wget para ${zstdWarcName} tardó ${formatMs(performance.now() - t0)}` ); - } else { - inform(`Algo falló en ${zstdWarcName}`); + + const gzippedWarcPath = join(ctxPath, "temp.warc.gz"); + if (!(await fileExists(gzippedWarcPath))) { + const err = this.report(`no encontré el ${gzippedWarcPath}`); + throw err; + } + + await compressionQueue.add(() => + this.recompress(gzippedWarcPath, zstdWarcPath) + ); + if (!(await fileExists(zstdWarcPath))) { + const err = this.report(`no encontré el ${zstdWarcPath}`); + throw err; + } + + this.scrapAndInform({ zstdWarcPath, zstdWarcName }); + + try { + await this.uploadToBucket({ + fileName: zstdWarcName, + file: Bun.file(zstdWarcPath), + }); + } catch (error) { + this.inform(`Falló subir ${zstdWarcName} a S3; ${error}`); + console.error(error); + } + + // TODO: borrar archivos temporales + } + + async scrapAndInform({ + zstdWarcPath, + zstdWarcName, + }: { + zstdWarcPath: string; + zstdWarcName: string; + }) { + const res = await scrapQueue.add(async () => { + const t0 = performance.now(); + const progress = await parseWarc(zstdWarcPath); + return { took: performance.now() - t0, progress }; + }); + + if (res) { + const { took, progress } = res; + this.inform( + `Procesado ${zstdWarcName} (${progress.done} ok, ${ + progress.errors.length + } errores) (tardó ${formatMs(took)})` + ); + } else { + this.inform(`Algo falló en ${zstdWarcName}`); + } + } + + /** + * toma un archivo gzippeado y lo recomprime con zstd. + * borra el archivo original. + */ + recompress(inputPath: string, outputPath: string) { + // XXX: por alguna razón no funciona en Bun 1.0.20 + // const decompressor = Bun.spawn({ + // cmd: ["gzip", "-dc", inputPath], + // stderr: "inherit", + // }); + // const compressor = Bun.spawn({ + // cmd: ["zstd", "-T0", "-15", "--long", "-o", outputPath], + // stdin: decompressor.stdout, + // // stderr: "inherit", + // }); + // const errorCode = await compressor.exited; + // if (errorCode !== 0) { + // const err = report(`zstd threw error code ${errorCode}`); + // throw err; + // } + + return new Promise((resolve, reject) => { + const decompressor = spawn("gzip", ["-dc", inputPath], { + stdio: [null, "pipe", null], + }); + const compressor = spawn( + "zstd", + ["-T0", "-15", "--long", "-o", outputPath], + { + stdio: ["pipe", null, null], + } + ); + // @ts-expect-error a los types de bun no le gusta???? 
+ decompressor.stdout.pipe(compressor.stdin); + compressor.on("close", (code) => { + if (code !== 0) { + const err = this.report(`zstd threw error code ${code}`); + reject(err); + } + resolve(void 0); + }); + }); + } + + async uploadToBucket({ + fileName, + file, + }: { + fileName: string; + file: BunFile; + }) { + if (!this.s3Config) { + this.inform( + `[s3] Se intentó subir ${fileName} pero no tenemos creds de S3` + ); + return; + } + const parallelUploads3 = new Upload({ + client: this.s3Config.s3, + params: { + Bucket: this.s3Config.bucketName, + Key: fileName, + Body: file, + }, + }); + await parallelUploads3.done(); + } + + inform(msg: string) { + this.sendTelegramMsg(msg); + console.info(msg); + } + report(msg: string) { + this.inform(msg); + const error = new Error(msg); + + return error; + } + + async sendTelegramMsg(text: string) { + if (!this.telegramConfig) return; + const url = new URL( + `https://api.telegram.org/bot${this.telegramConfig.token}/sendMessage` + ); + url.searchParams.set("chat_id", this.telegramConfig.chatId); + url.searchParams.set("text", text); + await fetch(url); } } - -/** - * toma un archivo gzippeado y lo recomprime con zstd. - * borra el archivo original. - */ -function recompress(inputPath: string, outputPath: string) { - // XXX: por alguna razón no funciona en Bun 1.0.20 - // const decompressor = Bun.spawn({ - // cmd: ["gzip", "-dc", inputPath], - // stderr: "inherit", - // }); - // const compressor = Bun.spawn({ - // cmd: ["zstd", "-T0", "-15", "--long", "-o", outputPath], - // stdin: decompressor.stdout, - // // stderr: "inherit", - // }); - // const errorCode = await compressor.exited; - // if (errorCode !== 0) { - // const err = report(`zstd threw error code ${errorCode}`); - // throw err; - // } - - return new Promise((resolve, reject) => { - const decompressor = spawn("gzip", ["-dc", inputPath], { - stdio: [null, "pipe", null], - }); - const compressor = spawn( - "zstd", - ["-T0", "-15", "--long", "-o", outputPath], - { - stdio: ["pipe", null, null], - } - ); - // @ts-expect-error a los types de bun no le gusta???? 
- decompressor.stdout.pipe(compressor.stdin); - compressor.on("close", (code) => { - if (code !== 0) { - const err = report(`zstd threw error code ${code}`); - reject(err); - } - resolve(void 0); - }); - }); -} - -async function uploadToBucket({ - fileName, - file, -}: { - fileName: string; - file: BunFile; -}) { - const parallelUploads3 = new Upload({ - client: s3, - params: { - Bucket: S3_BUCKET_NAME, - Key: fileName, - Body: file, - }, - }); - await parallelUploads3.done(); -} - -function inform(msg: string) { - sendTelegramMsg(msg); - console.info(msg); -} -function report(msg: string) { - inform(msg); - const error = new Error(msg); - - return error; -} +// await recompress("sqlite.db.gz", "sqlite.db.zst"); // no se llama exists porque bun tiene un bug en el que usa fs.exists por mas que exista una funcion llamada exists async function fileExists(path: string) { @@ -211,17 +254,6 @@ async function fileExists(path: string) { } } -async function sendTelegramMsg(text: string) { - if (!process.env.TELEGRAM_BOT_TOKEN || !process.env.TELEGRAM_BOT_CHAT_ID) - return; - const url = new URL( - `https://api.telegram.org/bot${process.env.TELEGRAM_BOT_TOKEN}/sendMessage` - ); - url.searchParams.set("chat_id", process.env.TELEGRAM_BOT_CHAT_ID); - url.searchParams.set("text", text); - await fetch(url); -} - function formatMs(ms: number) { return formatDuration(intervalToDuration({ start: 0, end: Math.round(ms) })); } diff --git a/scraper/cli.ts b/scraper/cli.ts index e69de29..3f3fb39 100644 --- a/scraper/cli.ts +++ b/scraper/cli.ts @@ -0,0 +1,19 @@ +import { auto } from "./auto.js"; +import { parseWarc } from "./scrap.js"; + +if (process.argv[2] === "auto") { + await auto(); +} else if (process.argv[2] === "scrap") { + const warcPaths = process.argv.slice(3); + if (warcPaths.length > 0) { + for (const path of warcPaths) { + await parseWarc(path); + } + } else { + console.error("Especificá WARCs para scrapear."); + process.exit(1); + } +} else { + console.error("Especificá una acción (tipo `auto` o `scrap`) para hacer."); + process.exit(1); +} diff --git a/scraper/scrap.ts b/scraper/scrap.ts index 83b418a..bcf5aa9 100644 --- a/scraper/scrap.ts +++ b/scraper/scrap.ts @@ -32,12 +32,6 @@ const getPrevPrecio = db .limit(1) .prepare(); -if (process.argv[1].endsWith("/scrap.ts")) { - for (const path of process.argv.slice(2)) { - await parseWarc(path); - } -} - export type Precio = typeof schema.precios.$inferInsert; export type Precioish = Omit< Precio,
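
Usage sketch (not part of the patch; the WARC path below is illustrative). The new scraper/cli.ts dispatches on its first argument, which is exactly what the rewritten cron script in the Containerfile hunk invokes:

  # inside the image, as the daily cron job runs it:
  bun /bin/scraper auto
  # from a checkout, same dispatch in scraper/cli.ts:
  bun scraper/cli.ts auto
  bun scraper/cli.ts scrap ./ejemplo.warc.zst   # re-scrape an existing WARC (path is hypothetical)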