rustify everything

Cat /dev/Nulo 2024-01-12 13:01:27 -03:00
parent d233dbd259
commit 0144a56158
3 changed files with 216 additions and 41 deletions

View file

@@ -1,14 +1,7 @@
FROM cgr.dev/chainguard/wolfi-base AS base
WORKDIR /usr/src/app
RUN apk add --no-cache bun libgcc
RUN apk add --no-cache libgcc
FROM base as build
ENV NODE_ENV=production
COPY . .
RUN bun install --frozen-lockfile
RUN bun build scraper/cli.ts --target=bun --outfile=/tmp/cli.build.js
# nightly because we use tl with `simd` enabled
FROM base as rs-build
RUN apk add --no-cache rust build-base sqlite-dev
@@ -19,11 +12,8 @@ FROM base
RUN apk add --no-cache sqlite sqlite-libs
# Scraper
COPY --from=build /tmp/cli.build.js /bin/scraper
COPY --from=build /usr/src/app/db-datos/drizzle /bin/drizzle
COPY --from=rs-build /root/.cargo/bin/scraper-rs /usr/local/bin/scraper-rs
ENV NODE_ENV=production
ENV DB_PATH=/db/db.db
CMD ["bun", "/bin/scraper", "cron"]
CMD ["scraper-rs", "cron"]

View file

@@ -1,6 +1,7 @@
use again::RetryPolicy;
use async_channel::Receiver;
use clap::{Parser, ValueEnum};
use futures::{stream, StreamExt, TryStreamExt};
use nanoid::nanoid;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
@@ -13,8 +14,9 @@ use std::{
time::Duration,
};
use thiserror::Error;
use tokio::time;
#[derive(ValueEnum, Clone)]
#[derive(ValueEnum, Clone, Debug)]
enum Supermercado {
Dia,
Jumbo,
@@ -27,6 +29,8 @@ enum Args {
FetchList(FetchListArgs),
ParseFile(ParseFileArgs),
GetUrlList(GetUrlListArgs),
Auto(AutoArgs),
Cron(AutoArgs),
}
#[derive(clap::Args)]
struct FetchListArgs {
@@ -41,6 +45,8 @@ struct GetUrlListArgs {
#[arg(value_enum)]
supermercado: Supermercado,
}
#[derive(clap::Args)]
struct AutoArgs {}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -50,6 +56,8 @@ async fn main() -> anyhow::Result<()> {
Args::FetchList(a) => fetch_list_cli(a.list_path).await,
Args::ParseFile(a) => parse_file_cli(a.file_path).await,
Args::GetUrlList(a) => get_url_list_cli(a.supermercado).await,
Args::Auto(_) => auto_cli().await,
Args::Cron(_) => cron_cli().await,
}
}
@@ -62,14 +70,18 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
.map(|s| s.to_owned())
.collect::<Vec<_>>();
let pool = connect_db();
let counters = fetch_list(&pool, links).await;
println!("Finished: {:?}", counters);
Ok(())
}
async fn fetch_list(pool: &Pool<SqliteConnectionManager>, links: Vec<String>) -> Counters {
let (sender, receiver) = async_channel::bounded::<String>(1);
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
let manager = SqliteConnectionManager::file(db_path);
let pool = Pool::new(manager).unwrap();
let n_coroutines = env::var("N_COROUTINES")
.map_or(Ok(128), |s| s.parse::<usize>())
.map_or(Ok(24), |s| s.parse::<usize>())
.expect("N_COROUTINES no es un número");
let handles = (1..n_coroutines)
.map(|_| {
@@ -91,9 +103,14 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
counters.errored += c.errored;
counters.skipped += c.skipped;
}
counters
}
println!("Finished: {:?}", counters);
Ok(())
fn connect_db() -> Pool<SqliteConnectionManager> {
let db_path = env::var("DB_PATH").unwrap_or("../scraper/sqlite.db".to_string());
let manager = SqliteConnectionManager::file(db_path);
let pool = Pool::new(manager).unwrap();
pool
}
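The Dockerfile above sets DB_PATH=/db/db.db, so the ../scraper/sqlite.db fallback in connect_db only applies when running outside the container. A minimal sketch of redirecting it in-process (the test path is an assumption, not from the commit):

    // Sketch (assumed): connect_db() reads DB_PATH at call time, so setting
    // the variable first points the pool at another database file.
    fn main() {
        std::env::set_var("DB_PATH", "/tmp/test.db");
        let pool = connect_db();
        assert!(pool.get().is_ok()); // a connection can be checked out
    }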
fn build_client() -> reqwest::Client {
@@ -130,7 +147,10 @@ async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
}
Err(err) => {
match err.downcast_ref::<FetchError>() {
Some(FetchError::HttpStatus(StatusCode::NOT_FOUND)) => counters.skipped += 1,
Some(FetchError::Http(e)) => match e.status() {
Some(StatusCode::NOT_FOUND) => counters.skipped += 1,
_ => counters.errored += 1,
},
_ => counters.errored += 1,
}
@@ -146,20 +166,15 @@ async fn worker(rx: Receiver<String>, pool: Pool<SqliteConnectionManager>) -> Counters {
enum FetchError {
#[error("reqwest error")]
Http(#[from] reqwest::Error),
#[error("http status: {0}")]
HttpStatus(reqwest::StatusCode),
#[error("parse error")]
Parse(#[from] SimpleError),
#[error("tl error")]
Tl(#[from] tl::ParseError),
}
pub async fn do_request(client: &reqwest::Client, url: &str) -> anyhow::Result<reqwest::Response> {
pub async fn do_request(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
let request = client.get(url).build()?;
let response = client.execute(request).await?;
if !response.status().is_success() {
bail!(FetchError::HttpStatus(response.status()));
}
let response = client.execute(request).await?.error_for_status()?;
Ok(response)
}
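With error_for_status, a non-2xx response surfaces as a reqwest::Error that still carries the status code, which is exactly what the worker's new e.status() match relies on. A minimal sketch of a caller, under the same signatures as above:

    // Sketch (assumed): recovering the HTTP status from the error that
    // do_request returns once error_for_status() has been applied.
    use reqwest::StatusCode;

    async fn classify(client: &reqwest::Client, url: &str) -> &'static str {
        match do_request(client, url).await {
            Ok(_) => "fetched",
            Err(e) if e.status() == Some(StatusCode::NOT_FOUND) => "skipped",
            Err(_) => "errored",
        }
    }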
@@ -221,12 +236,7 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
}
async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
let urls = match supermercado {
Supermercado::Dia => sites::dia::get_urls().await?,
Supermercado::Jumbo => sites::jumbo::get_urls().await?,
Supermercado::Carrefour => sites::carrefour::get_urls().await?,
_ => todo!(),
};
let urls = get_urls(&supermercado).await?;
urls.iter().for_each(|s| {
println!("{}", s);
});
@@ -234,6 +244,15 @@ async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
Ok(())
}
async fn get_urls(supermercado: &Supermercado) -> Result<Vec<String>, anyhow::Error> {
Ok(match supermercado {
Supermercado::Dia => sites::dia::get_urls().await?,
Supermercado::Jumbo => sites::jumbo::get_urls().await?,
Supermercado::Carrefour => sites::carrefour::get_urls().await?,
Supermercado::Coto => sites::coto::get_urls().await?,
})
}
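Folding the dispatch into get_urls also drops the old `_ => todo!()` wildcard, so the match is now exhaustive: adding a Supermercado variant fails to compile until every site is wired up. A standalone sketch of that property, with a hypothetical new variant:

    // Sketch (assumed): without a wildcard arm, uncommenting the new
    // variant makes this match a compile error (non-exhaustive patterns).
    enum Supermercado { Dia, Jumbo, Carrefour, Coto /*, Nuevo */ }

    fn site_name(s: &Supermercado) -> &'static str {
        match s {
            Supermercado::Dia => "dia",
            Supermercado::Jumbo => "jumbo",
            Supermercado::Carrefour => "carrefour",
            Supermercado::Coto => "coto",
        }
    }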
async fn scrap_url(
client: &reqwest::Client,
url: String,
@@ -255,6 +274,105 @@ async fn scrap_url(
}
}
struct Auto {
pool: Pool<SqliteConnectionManager>,
telegram_token: String,
telegram_chat_id: String,
}
impl Auto {
async fn download_supermercado(self: &Self, supermercado: Supermercado) -> anyhow::Result<()> {
{
let t0 = now_sec();
self.get_and_save_urls(&supermercado).await?;
self.inform(&format!(
"Downloaded url list {:?} (took {})",
&supermercado,
now_sec() - t0
))
.await;
}
let links: Vec<String> = self
.pool
.get()?
.prepare(r#"SELECT url FROM producto_urls;"#)?
.query_map([], |r| r.get::<_, String>(0))?
.map(|r| r.unwrap())
.collect();
{
let t0 = now_sec();
let counters = fetch_list(&self.pool, links).await;
self.inform(&format!(
"Downloaded {:?}: {:?} (took {})",
&supermercado,
counters,
now_sec() - t0
))
.await;
}
Ok(())
}
async fn get_and_save_urls(self: &Self, supermercado: &Supermercado) -> anyhow::Result<()> {
let urls = get_urls(supermercado).await?;
let connection = &mut self.pool.get()?;
let tx = connection.transaction()?;
{
let mut stmt = tx.prepare(
r#"INSERT INTO producto_urls(url, first_seen, last_seen)
VALUES (?1, ?2, ?2)
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
)?;
let now: u64 = now_ms().try_into()?;
for url in urls {
stmt.execute(rusqlite::params![url, now])?;
}
}
tx.commit()?;
Ok(())
}
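The statement in get_and_save_urls is a standard SQLite upsert: the first sighting of a URL sets both timestamps, and every later sighting only advances last_seen. A standalone sketch of that behavior (the table schema is an assumption; the commit does not show it, but ON CONFLICT(url) implies a unique constraint on url):

    // Sketch (assumed schema): first insert sets both timestamps,
    // the conflict path only bumps last_seen.
    use rusqlite::{params, Connection};

    fn main() -> rusqlite::Result<()> {
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(
            "CREATE TABLE producto_urls(
                 url TEXT PRIMARY KEY,
                 first_seen INTEGER NOT NULL,
                 last_seen INTEGER NOT NULL
             );",
        )?;
        let upsert = "INSERT INTO producto_urls(url, first_seen, last_seen)
                      VALUES (?1, ?2, ?2)
                      ON CONFLICT(url) DO UPDATE SET last_seen=?2;";
        conn.execute(upsert, params!["https://example.com/p/1", 1000_u64])?;
        conn.execute(upsert, params!["https://example.com/p/1", 2000_u64])?;
        let row: (u64, u64) = conn.query_row(
            "SELECT first_seen, last_seen FROM producto_urls;",
            [],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )?;
        assert_eq!(row, (1000, 2000)); // first_seen kept, last_seen bumped
        Ok(())
    }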
async fn inform(self: &Self, msg: &str) {
println!("{}", msg);
let u = Url::parse_with_params(
&format!(
"https://api.telegram.org/bot{}/sendMessage",
self.telegram_token
),
&[
("chat_id", self.telegram_chat_id.clone()),
("text", msg.to_string()),
],
)
.unwrap();
reqwest::get(u).await.unwrap();
}
}
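Note that inform sends the message with reqwest::get(u).await.unwrap(), so a transient Telegram or network failure aborts the whole run. A hedged alternative (not in the commit) that degrades to a log line instead:

    // Sketch (assumed): same send step as inform(), but a failed delivery
    // is logged rather than panicking the scraper mid-run.
    async fn inform_best_effort(u: reqwest::Url) {
        if let Err(err) = reqwest::get(u).await {
            eprintln!("inform: telegram send failed: {}", err);
        }
    }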
async fn auto_cli() -> anyhow::Result<()> {
let db = connect_db();
let auto = Auto {
pool: db,
telegram_token: env::var("TELEGRAM_BOT_TOKEN")?,
telegram_chat_id: env::var("TELEGRAM_BOT_CHAT_ID")?,
};
auto.inform("[auto] Starting scrape").await;
stream::iter(Supermercado::value_variants().iter())
.map(|s| auto.download_supermercado(s.to_owned()))
.buffer_unordered(64)
.try_collect::<Vec<_>>()
.await?;
Ok(())
}
async fn cron_cli() -> anyhow::Result<()> {
let mut interval = time::interval(std::time::Duration::from_secs(60 * 60 * 24));
loop {
interval.tick().await;
auto_cli().await.unwrap();
}
}
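One subtlety: the first tick of a tokio::time::interval completes immediately, so the cron subcommand runs a full scrape at startup and then once every 24 hours. A minimal sketch of that behavior:

    // Sketch (assumed): the first interval tick resolves at once, so the
    // loop body in cron_cli executes immediately on startup.
    use tokio::time::{interval, Duration, Instant};

    #[tokio::main]
    async fn main() {
        let start = Instant::now();
        let mut every_day = interval(Duration::from_secs(60 * 60 * 24));
        every_day.tick().await; // returns without waiting a day
        assert!(start.elapsed() < Duration::from_secs(1));
    }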
use std::time::{SystemTime, UNIX_EPOCH};
mod sites;
@@ -273,9 +391,14 @@ struct PrecioPoint {
}
fn now_sec() -> u64 {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
since_the_epoch.as_secs()
since_the_epoch().as_secs()
}
fn now_ms() -> u128 {
since_the_epoch().as_millis()
}
fn since_the_epoch() -> Duration {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
}

View file

@@ -1,6 +1,9 @@
use anyhow::Context;
use anyhow::{anyhow, Context};
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use reqwest::Url;
use crate::PrecioPoint;
use crate::{build_client, do_request, get_retry_policy, PrecioPoint};
pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> {
let ean = dom
@@ -71,3 +74,62 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
url,
})
}
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
// let (sender, recv) = async_channel::unbounded();
let client = build_client();
let initial = Url::parse("https://www.cotodigital3.com.ar/sitios/cdigi/browse?Nf=product.endDate%7CGTEQ+1.7032032E12%7C%7Cproduct.startDate%7CLTEQ+1.7032032E12&Nr=AND%28product.sDisp_200%3A1004%2Cproduct.language%3Aespa%C3%B1ol%2COR%28product.siteId%3ACotoDigital%29%29")?;
let page_size = 100;
let handles: Vec<Vec<String>> = stream::iter(0..29000 / page_size)
.map(|i| {
let mut u = initial.clone();
u.query_pairs_mut()
.append_pair("No", &(i * page_size).to_string())
.append_pair("Nrpp", &(page_size).to_string())
.finish();
let client = &client;
async move {
let text = get_retry_policy()
.retry(|| do_request(client, u.as_str()).and_then(|r| r.text()))
.await?;
let dom = tl::parse(&text, tl::ParserOptions::default())?;
let list: Vec<String> = dom
.query_selector(".product_info_container")
.unwrap()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.filter_map(|t| -> Option<anyhow::Result<String>> {
t.children()
.top()
.iter()
.filter_map(|h| h.get(dom.parser()))
.filter_map(|n| n.as_tag())
.find(|t| t.name() == "a")
.map(|t| {
t.attributes()
.get("href")
.flatten()
.ok_or(anyhow!("has no href="))
})
.map(|s| {
Ok(Url::options()
.base_url(Some(&Url::parse("https://www.cotodigital3.com.ar")?))
.parse(s?.as_utf8_str().as_ref())?
.to_string())
})
})
.try_collect()?;
Ok::<Vec<String>, anyhow::Error>(list)
}
})
.buffer_unordered(8)
.try_collect()
.await?;
let mut total: Vec<String> = vec![];
for mut urls in handles {
total.append(&mut urls);
}
Ok(total.into_iter().unique().collect())
}
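get_urls pages through the Coto listing by appending No (offset) and Nrpp (page size) to the base URL, fetches up to 8 pages concurrently with buffer_unordered, and deduplicates with Itertools::unique since products can repeat across pages. A standalone sketch of the same concurrency pattern with the HTTP fetch stubbed out:

    // Sketch (assumed): the buffer_unordered pagination skeleton used above,
    // with the fetch-and-parse step replaced by a stub so it runs anywhere.
    use futures::{stream, StreamExt, TryStreamExt};
    use itertools::Itertools;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let page_size = 100;
        let pages: Vec<Vec<String>> = stream::iter(0..29000 / page_size)
            .map(|i| async move {
                // stand-in for do_request + tl parsing of one listing page
                Ok::<_, anyhow::Error>(vec![format!("url-at-offset-{}", i * page_size)])
            })
            .buffer_unordered(8) // at most 8 pages in flight
            .try_collect()
            .await?;
        let urls: Vec<String> = pages.into_iter().flatten().unique().collect();
        println!("{} unique urls", urls.len());
        Ok(())
    }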