conectar auto con scraper-rs!

This commit is contained in:
Cat /dev/Nulo 2024-01-11 17:20:12 -03:00
parent cfae80cb9a
commit 4b211c89af
4 changed files with 150 additions and 72 deletions

43
scraper-rs/Cargo.lock generated
View file

@ -908,6 +908,28 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_sqlite"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dc290b669d30e20751e813517bbe13662d020419c5c8818ff10b6e8bb7777f6"
dependencies = [
"r2d2",
"rusqlite",
"uuid",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.7.3" version = "0.7.3"
@ -1111,6 +1133,15 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
@ -1126,6 +1157,8 @@ dependencies = [
"async-channel", "async-channel",
"clap", "clap",
"nanoid", "nanoid",
"r2d2",
"r2d2_sqlite",
"rand 0.8.5", "rand 0.8.5",
"reqwest", "reqwest",
"rusqlite", "rusqlite",
@ -1514,6 +1547,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"getrandom 0.2.11",
"rand 0.8.5",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.0" version = "0.1.0"

View file

@ -11,6 +11,8 @@ anyhow = "1.0.79"
async-channel = "2.1.1" async-channel = "2.1.1"
clap = { version = "4.4.15", features = ["derive"] } clap = { version = "4.4.15", features = ["derive"] }
nanoid = "0.4.0" nanoid = "0.4.0"
r2d2 = "0.8.10"
r2d2_sqlite = "0.23.0"
rand = "0.8.5" rand = "0.8.5"
# lol_html = "1.2.0" # lol_html = "1.2.0"
reqwest = { version = "0.11.23", default-features = false, features = [ reqwest = { version = "0.11.23", default-features = false, features = [

View file

@ -1,9 +1,10 @@
use again::RetryPolicy; use again::RetryPolicy;
use async_channel::{Receiver, Sender}; use async_channel::Receiver;
use clap::Parser; use clap::Parser;
use nanoid::nanoid; use nanoid::nanoid;
use reqwest::Url; use r2d2::Pool;
use rusqlite::Connection; use r2d2_sqlite::SqliteConnectionManager;
use reqwest::{StatusCode, Url};
use simple_error::{bail, SimpleError}; use simple_error::{bail, SimpleError};
use std::{ use std::{
env::{self}, env::{self},
@ -12,7 +13,6 @@ use std::{
time::Duration, time::Duration,
}; };
use thiserror::Error; use thiserror::Error;
use tl::VDom;
#[derive(Parser)] // requires `derive` feature #[derive(Parser)] // requires `derive` feature
enum Args { enum Args {
@ -47,34 +47,37 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let handle = {
let (sender, receiver) = async_channel::bounded::<String>(1); let (sender, receiver) = async_channel::bounded::<String>(1);
let (res_sender, res_receiver) = async_channel::unbounded::<PrecioPoint>();
let mut handles = Vec::new(); let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
for _ in 1..env::var("N_COROUTINES") let manager = SqliteConnectionManager::file(db_path);
let pool = Pool::new(manager).unwrap();
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(128), |s| s.parse::<usize>()) .map_or(Ok(128), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número") .expect("N_COROUTINES no es un número");
{ let handles = (1..n_coroutines)
.map(|_| {
let rx = receiver.clone(); let rx = receiver.clone();
let tx = res_sender.clone(); let pool = pool.clone();
handles.push(tokio::spawn(worker(rx, tx))); tokio::spawn(worker(rx, pool))
} })
.collect::<Vec<_>>();
let db_writer_handle = tokio::spawn(db_writer(res_receiver));
for link in links { for link in links {
sender.send_blocking(link).unwrap(); sender.send_blocking(link).unwrap();
} }
sender.close(); sender.close();
let mut counters = Counters::default();
for handle in handles { for handle in handles {
handle.await.unwrap(); let c = handle.await.unwrap();
counters.success += c.success;
counters.errored += c.errored;
counters.skipped += c.skipped;
} }
db_writer_handle println!("Finished: {:?}", counters);
};
handle.await.unwrap();
Ok(()) Ok(())
} }
@ -82,19 +85,46 @@ fn build_client() -> reqwest::Client {
reqwest::ClientBuilder::default().build().unwrap() reqwest::ClientBuilder::default().build().unwrap()
} }
async fn worker(rx: Receiver<String>, tx: Sender<PrecioPoint>) { #[derive(Default, Debug)]
/// Per-worker tally of URL outcomes. `worker` returns one of these, and the
/// caller adds the fields of every worker together before printing the totals.
struct Counters {
/// URLs for which `fetch_and_parse` returned `Ok`.
success: u64,
/// URLs that failed with any error other than an HTTP "not found" status.
errored: u64,
/// URLs that failed with `FetchError::HttpStatus(StatusCode::NOT_FOUND)`.
skipped: u64,
}
/// Receives URLs from `rx` until `recv` returns an error, fetches and parses
/// each one, and inserts every successful result into the `precios` table
/// using a connection taken from `pool`.
///
/// Returns the `Counters` accumulated by this worker.
///
/// # Panics
/// Panics if a pooled connection cannot be obtained or if the `INSERT` fails,
/// because both results are unwrapped.
///
/// NOTE(review): this text comes from a side-by-side diff rendering; a line
/// that shows two statements carries the removed and the added version.
async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
let client = build_client(); let client = build_client();
let mut counters = Counters::default();
while let Ok(url) = rx.recv().await { while let Ok(url) = rx.recv().await {
let res = fetch_and_parse(&client, url.clone()).await; let res = fetch_and_parse(&client, url.clone()).await;
match res { match res {
Ok(ex) => { Ok(res) => {
// The success counter is bumped before the row is written, so it counts
// parsed pages rather than stored rows.
tx.send(ex).await.unwrap(); counters.success += 1;
// NOTE(review): `pool.get()` and `execute()` are called without `.await`
// inside an async task; presumably they block the executor thread — confirm.
pool.get().unwrap().execute("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",rusqlite::params![
res.ean,
res.fetched_at,
res.precio_centavos,
res.in_stock,
res.url,
// Sixth placeholder (`warc_record_id`): no value is ever supplied here.
None::<String>,
res.parser_version,
res.name,
res.image_url,
]).unwrap();
} }
Err(err) => { Err(err) => {
// A "not found" HTTP status counts as skipped; every other error,
// including errors that are not a `FetchError`, counts as errored.
match err.downcast_ref::<FetchError>() {
Some(FetchError::HttpStatus(StatusCode::NOT_FOUND)) => counters.skipped += 1,
_ => counters.errored += 1,
}
// Logged at error level for every failure, skipped ones included.
tracing::error!(error=%err, url=url); tracing::error!(error=%err, url=url);
} }
} }
} }
counters
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -191,17 +221,6 @@ async fn scrap_url(
} }
} }
async fn db_writer(rx: Receiver<PrecioPoint>) {
// let conn = Connection::open("../scraper/sqlite.db").unwrap();
// let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
let mut n = 0;
while let Ok(res) = rx.recv().await {
n += 1;
println!("{}", n);
// println!("{:?}", res)
}
}
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
mod sites; mod sites;

View file

@ -4,7 +4,6 @@ import { join } from "node:path";
import { Supermercado, hosts, supermercados } from "db-datos/supermercado.js"; import { Supermercado, hosts, supermercados } from "db-datos/supermercado.js";
import PQueue from "p-queue"; import PQueue from "p-queue";
import { formatDuration, intervalToDuration } from "date-fns"; import { formatDuration, intervalToDuration } from "date-fns";
import { downloadList } from "./scrap.js";
import { db } from "db-datos/db.js"; import { db } from "db-datos/db.js";
import { like } from "drizzle-orm"; import { like } from "drizzle-orm";
import { productoUrls } from "db-datos/schema.js"; import { productoUrls } from "db-datos/schema.js";
@ -12,13 +11,16 @@ import { scrapDiaProducts } from "../link-scrapers/dia.js";
import { scrapCotoProducts } from "../link-scrapers/coto.js"; import { scrapCotoProducts } from "../link-scrapers/coto.js";
import { scrapCarrefourProducts } from "../link-scrapers/carrefour.js"; import { scrapCarrefourProducts } from "../link-scrapers/carrefour.js";
import { scrapJumboProducts } from "../link-scrapers/jumbo.js"; import { scrapJumboProducts } from "../link-scrapers/jumbo.js";
import { readableStreamToText } from "bun";
// hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU // hacemos una cola para el scrapeo para no tener varios writers a la BD y no sobrecargar la CPU
const scrapQueue = new PQueue({ concurrency: 4 }); const scrapQueue = new PQueue({ concurrency: 1 });
/**
 * Creates an `Auto` instance and runs `downloadList` for the selected
 * supermarkets, awaiting all of them together through `Promise.all`.
 *
 * NOTE(review): the added version keeps only the entries equal to "Dia", so a
 * single supermarket is processed; this looks like a debugging leftover —
 * confirm before relying on it.
 */
export async function auto() { export async function auto() {
const a = new Auto(); const a = new Auto();
await Promise.all(supermercados.map((supr) => a.downloadList(supr))); await Promise.all(
supermercados.filter((x) => x === "Dia").map((supr) => a.downloadList(supr))
);
} }
class Auto { class Auto {
@ -38,11 +40,7 @@ class Auto {
this.inform("[auto] Empezando scrap"); this.inform("[auto] Empezando scrap");
} }
async downloadList(supermercado: Supermercado) { async scrapUrls(supermercado: Supermercado) {
const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-download-"));
let listPath: string;
{
const t0 = performance.now(); const t0 = performance.now();
switch (supermercado) { switch (supermercado) {
case "Dia": case "Dia":
@ -63,7 +61,14 @@ class Auto {
); );
} }
listPath = join(ctxPath, `lista-${supermercado}.txt`); async downloadList(supermercado: Supermercado) {
const ctxPath = await mkdtemp(join(tmpdir(), "preciazo-scraper-download-"));
await scrapQueue.add(async () => {
await this.scrapUrls(supermercado);
});
const listPath = join(ctxPath, `lista-${supermercado}.txt`);
const host = Object.entries(hosts).find( const host = Object.entries(hosts).find(
([host, supe]) => supe === supermercado ([host, supe]) => supe === supermercado
)![0]; )![0];
@ -82,16 +87,25 @@ class Auto {
async scrapAndInform({ listPath }: { listPath: string }) { async scrapAndInform({ listPath }: { listPath: string }) {
const res = await scrapQueue.add(async () => { const res = await scrapQueue.add(async () => {
const t0 = performance.now(); const t0 = performance.now();
const progress = await downloadList(listPath);
return { took: performance.now() - t0, progress }; const sub = Bun.spawn({
cmd: ["scraper-rs", "fetch-list", listPath],
stdio: ["ignore", "pipe", "inherit"],
});
const text = await readableStreamToText(sub.stdout);
const code = await sub.exited;
if (code !== 0) throw new Error(`scraper-rs threw ${code}`);
return { took: performance.now() - t0, text };
}); });
if (res) { if (res) {
const { took, progress } = res; const { took, text } = res;
this.inform( this.inform(
`Procesado ${listPath} (${progress.done} ok, ${ `Procesado ${listPath} (${text}) (tardó ${formatMs(took)})`
progress.skipped //(${progress.done} ok, ${
} skipped, ${progress.errors.length} errores) (tardó ${formatMs(took)})` // progress.skipped
// } skipped, ${progress.errors.length} errores)
); );
} else { } else {
this.inform(`Algo falló en ${listPath}`); this.inform(`Algo falló en ${listPath}`);