mirror of
https://github.com/catdevnull/preciazo.git
synced 2024-11-29 21:16:19 +00:00
Compare commits
No commits in common. "33d416d921f66442131aee68cbfd1a49761ff426" and "cc0af3011a9a26874aa93a61dcffdce23483df74" have entirely different histories.
33d416d921
...
cc0af3011a
17 changed files with 186 additions and 968 deletions
|
@ -2,23 +2,14 @@ FROM cgr.dev/chainguard/wolfi-base AS base
|
||||||
WORKDIR /usr/src/app
|
WORKDIR /usr/src/app
|
||||||
RUN apk add --no-cache libgcc
|
RUN apk add --no-cache libgcc
|
||||||
|
|
||||||
# tenemos que generar una DB con las migraciones aplicadas para compilar el codigo por sqlx::query!()
|
|
||||||
FROM base as db-build
|
|
||||||
RUN apk add --no-cache nodejs npm
|
|
||||||
RUN npm install --global pnpm
|
|
||||||
COPY db-datos/ .
|
|
||||||
RUN pnpm install
|
|
||||||
RUN DB_PATH=db.db pnpm migrate
|
|
||||||
|
|
||||||
FROM base as rs-build
|
FROM base as rs-build
|
||||||
RUN apk add --no-cache rust build-base sqlite-dev
|
RUN apk add --no-cache rust build-base sqlite-dev
|
||||||
|
|
||||||
COPY scraper-rs/ .
|
COPY scraper-rs/ .
|
||||||
COPY --from=db-build /usr/src/app/db.db .
|
|
||||||
RUN --mount=type=cache,sharing=locked,target=/root/.cargo/git \
|
RUN --mount=type=cache,sharing=locked,target=/root/.cargo/git \
|
||||||
--mount=type=cache,sharing=locked,target=/root/.cargo/registry \
|
--mount=type=cache,sharing=locked,target=/root/.cargo/registry \
|
||||||
--mount=type=cache,sharing=locked,target=/usr/src/app/target \
|
--mount=type=cache,sharing=locked,target=/usr/src/app/target \
|
||||||
DATABASE_URL=sqlite:db.db cargo install --locked --path .
|
cargo install --locked --path .
|
||||||
|
|
||||||
FROM base
|
FROM base
|
||||||
RUN apk add --no-cache sqlite sqlite-libs
|
RUN apk add --no-cache sqlite sqlite-libs
|
||||||
|
|
|
@ -1,3 +0,0 @@
|
||||||
import { getDb } from "./db.js";
|
|
||||||
|
|
||||||
getDb();
|
|
|
@ -2,17 +2,12 @@
|
||||||
import { migrate } from "drizzle-orm/better-sqlite3/migrator";
|
import { migrate } from "drizzle-orm/better-sqlite3/migrator";
|
||||||
import * as schema from "./schema.js";
|
import * as schema from "./schema.js";
|
||||||
import { sql } from "drizzle-orm";
|
import { sql } from "drizzle-orm";
|
||||||
import { existsSync } from "node:fs";
|
|
||||||
import { join } from "node:path";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {import("drizzle-orm/better-sqlite3").BetterSQLite3Database<schema>} db
|
* @param {import("drizzle-orm/better-sqlite3").BetterSQLite3Database<schema>} db
|
||||||
*/
|
*/
|
||||||
export function migrateDb(db) {
|
export function migrateDb(db) {
|
||||||
let path = "drizzle/";
|
migrate(db, { migrationsFolder: "node_modules/db-datos/drizzle" });
|
||||||
if (!existsSync(join(path, "meta/_journal.json")))
|
|
||||||
path = "node_modules/db-datos/drizzle";
|
|
||||||
migrate(db, { migrationsFolder: path });
|
|
||||||
db.run(sql`pragma journal_mode = WAL;`);
|
db.run(sql`pragma journal_mode = WAL;`);
|
||||||
db.run(sql`PRAGMA synchronous = NORMAL;`);
|
db.run(sql`PRAGMA synchronous = NORMAL;`);
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"generate": "drizzle-kit generate:sqlite",
|
"generate": "drizzle-kit generate:sqlite",
|
||||||
"migrate": "node migrate-cli.js"
|
"migrate": "node db.js"
|
||||||
},
|
},
|
||||||
"keywords": [],
|
"keywords": [],
|
||||||
"author": "",
|
"author": "",
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
DATABASE_URL=sqlite:../sqlite.db
|
|
|
@ -1,12 +0,0 @@
|
||||||
{
|
|
||||||
"db_name": "SQLite",
|
|
||||||
"query": "INSERT INTO producto_urls(url, first_seen, last_seen)\n VALUES (?1, ?2, ?2)\n ON CONFLICT(url) DO UPDATE SET last_seen=?2;",
|
|
||||||
"describe": {
|
|
||||||
"columns": [],
|
|
||||||
"parameters": {
|
|
||||||
"Right": 2
|
|
||||||
},
|
|
||||||
"nullable": []
|
|
||||||
},
|
|
||||||
"hash": "08d55fc80c8a6ad73d311e8b1cd535425e5c2f39cf98735b5f67cb91d01937ce"
|
|
||||||
}
|
|
|
@ -1,12 +0,0 @@
|
||||||
{
|
|
||||||
"db_name": "SQLite",
|
|
||||||
"query": "INSERT INTO db_best_selling(fetched_at, category, eans_json)\n VALUES (?1, ?2, ?3);",
|
|
||||||
"describe": {
|
|
||||||
"columns": [],
|
|
||||||
"parameters": {
|
|
||||||
"Right": 3
|
|
||||||
},
|
|
||||||
"nullable": []
|
|
||||||
},
|
|
||||||
"hash": "144f4622ac9a937aa4885ceb5a67f6c0a78e7e025cf152a8c176b9fd1de241da"
|
|
||||||
}
|
|
|
@ -1,20 +0,0 @@
|
||||||
{
|
|
||||||
"db_name": "SQLite",
|
|
||||||
"query": "SELECT url FROM producto_urls WHERE url LIKE ?1;",
|
|
||||||
"describe": {
|
|
||||||
"columns": [
|
|
||||||
{
|
|
||||||
"name": "url",
|
|
||||||
"ordinal": 0,
|
|
||||||
"type_info": "Text"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"parameters": {
|
|
||||||
"Right": 1
|
|
||||||
},
|
|
||||||
"nullable": [
|
|
||||||
false
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"hash": "aa5c2a04aec149d88f6e25a9bd7df4e257f3c9b0efa62c8342d077d69d826a69"
|
|
||||||
}
|
|
|
@ -1,12 +0,0 @@
|
||||||
{
|
|
||||||
"db_name": "SQLite",
|
|
||||||
"query": "INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
|
|
||||||
"describe": {
|
|
||||||
"columns": [],
|
|
||||||
"parameters": {
|
|
||||||
"Right": 9
|
|
||||||
},
|
|
||||||
"nullable": []
|
|
||||||
},
|
|
||||||
"hash": "d0c3a557a81f6685b242ed0be8e8c67b47ec8a575d2a14a487b3294e0faec438"
|
|
||||||
}
|
|
|
@ -1,20 +0,0 @@
|
||||||
{
|
|
||||||
"db_name": "SQLite",
|
|
||||||
"query": "SELECT ean FROM precios WHERE url = ?1;",
|
|
||||||
"describe": {
|
|
||||||
"columns": [
|
|
||||||
{
|
|
||||||
"name": "ean",
|
|
||||||
"ordinal": 0,
|
|
||||||
"type_info": "Text"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"parameters": {
|
|
||||||
"Right": 1
|
|
||||||
},
|
|
||||||
"nullable": [
|
|
||||||
false
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"hash": "f249765f2fb013a81a4157a6ce19744a8d5f70c83ed9ddddfd55009136088a52"
|
|
||||||
}
|
|
755
scraper-rs/Cargo.lock
generated
755
scraper-rs/Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -12,7 +12,8 @@ base64 = "0.21.7"
|
||||||
chrono = "0.4.32"
|
chrono = "0.4.32"
|
||||||
clap = { version = "4.4.15", features = ["derive"] }
|
clap = { version = "4.4.15", features = ["derive"] }
|
||||||
cron = "0.12.0"
|
cron = "0.12.0"
|
||||||
sqlx = { version = "0.7", features = [ "runtime-tokio", "sqlite" ] }
|
deadpool = "0.10.0"
|
||||||
|
deadpool-sqlite = "0.7.0"
|
||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
html-escape = "0.2.13"
|
html-escape = "0.2.13"
|
||||||
itertools = "0.12.0"
|
itertools = "0.12.0"
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::{build_client, db::Db, sites::vtex, supermercado::Supermercado};
|
use crate::{build_client, sites::vtex, supermercado::Supermercado};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use clap::ValueEnum;
|
use clap::ValueEnum;
|
||||||
|
use deadpool_sqlite::Pool;
|
||||||
use futures::{stream, FutureExt, StreamExt, TryStreamExt};
|
use futures::{stream, FutureExt, StreamExt, TryStreamExt};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
@ -48,11 +49,21 @@ pub struct BestSellingRecord {
|
||||||
pub eans: Vec<String>,
|
pub eans: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_best_selling_eans(db: &Db, urls: Vec<String>) -> anyhow::Result<Vec<String>> {
|
async fn get_best_selling_eans(pool: &Pool, urls: Vec<String>) -> anyhow::Result<Vec<String>> {
|
||||||
let mut eans: Vec<String> = Vec::new();
|
let mut eans: Vec<String> = Vec::new();
|
||||||
|
|
||||||
for url in urls {
|
for url in urls {
|
||||||
let ean = db.get_ean_by_url(&url).await?;
|
let q = url.clone();
|
||||||
|
let ean = pool
|
||||||
|
.get()
|
||||||
|
.await?
|
||||||
|
.interact(move |conn| {
|
||||||
|
conn.prepare(r#"SELECT ean FROM precios WHERE url = ?1;"#)?
|
||||||
|
.query_map(rusqlite::params![q], |r| r.get::<_, String>(0))
|
||||||
|
.map(|r| r.map(|r| r.unwrap()).next())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?;
|
||||||
match ean {
|
match ean {
|
||||||
Some(e) => eans.push(e),
|
Some(e) => eans.push(e),
|
||||||
None => warn!("No encontré EAN para {}", url),
|
None => warn!("No encontré EAN para {}", url),
|
||||||
|
@ -64,13 +75,13 @@ async fn get_best_selling_eans(db: &Db, urls: Vec<String>) -> anyhow::Result<Vec
|
||||||
|
|
||||||
async fn try_get_best_selling_eans(
|
async fn try_get_best_selling_eans(
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
db: Db,
|
pool: Pool,
|
||||||
supermercado: &Supermercado,
|
supermercado: &Supermercado,
|
||||||
category: &Category,
|
category: &Category,
|
||||||
) -> anyhow::Result<Option<Vec<String>>> {
|
) -> anyhow::Result<Option<Vec<String>>> {
|
||||||
if let Some(query) = category.query(supermercado) {
|
if let Some(query) = category.query(supermercado) {
|
||||||
let urls = vtex::get_best_selling_by_category(&client, supermercado.host(), query).await?;
|
let urls = vtex::get_best_selling_by_category(&client, supermercado.host(), query).await?;
|
||||||
let eans = get_best_selling_eans(&db, urls).await?;
|
let eans = get_best_selling_eans(&pool, urls).await?;
|
||||||
Ok(Some(eans))
|
Ok(Some(eans))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
|
@ -96,18 +107,18 @@ fn rank_eans(eans: Vec<Vec<String>>) -> Vec<String> {
|
||||||
.collect_vec()
|
.collect_vec()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_all_best_selling(db: &Db) -> anyhow::Result<Vec<BestSellingRecord>> {
|
pub async fn get_all_best_selling(pool: &Pool) -> anyhow::Result<Vec<BestSellingRecord>> {
|
||||||
let client = &build_client();
|
let client = &build_client();
|
||||||
|
|
||||||
stream::iter(Category::value_variants())
|
stream::iter(Category::value_variants())
|
||||||
.map(|category| {
|
.map(|category| {
|
||||||
stream::iter(Supermercado::value_variants())
|
stream::iter(Supermercado::value_variants())
|
||||||
.map(|supermercado| {
|
.map(|supermercado| {
|
||||||
let db = db.clone();
|
let pool = pool.clone();
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
tokio::spawn(try_get_best_selling_eans(
|
tokio::spawn(try_get_best_selling_eans(
|
||||||
client,
|
client,
|
||||||
db,
|
pool,
|
||||||
supermercado,
|
supermercado,
|
||||||
category,
|
category,
|
||||||
))
|
))
|
||||||
|
|
|
@ -1,109 +0,0 @@
|
||||||
use std::{
|
|
||||||
env,
|
|
||||||
str::FromStr,
|
|
||||||
time::{SystemTime, UNIX_EPOCH},
|
|
||||||
};
|
|
||||||
|
|
||||||
use sqlx::{sqlite::SqliteConnectOptions, SqlitePool};
|
|
||||||
|
|
||||||
use crate::{best_selling::BestSellingRecord, PrecioPoint};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Db {
|
|
||||||
pool: SqlitePool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Db {
|
|
||||||
pub async fn connect() -> anyhow::Result<Self> {
|
|
||||||
let db_path = env::var("DB_PATH").unwrap_or("../sqlite.db".to_string());
|
|
||||||
let pool = sqlx::pool::PoolOptions::new()
|
|
||||||
.max_connections(1)
|
|
||||||
.connect_with(
|
|
||||||
SqliteConnectOptions::from_str(&db_path)?
|
|
||||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
|
||||||
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
|
|
||||||
.optimize_on_close(true, None),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
Ok(Self { pool })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn insert_precio(&self, point: PrecioPoint) -> anyhow::Result<()> {
|
|
||||||
sqlx::query!("INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
|
|
||||||
point.ean,
|
|
||||||
point.fetched_at,
|
|
||||||
point.precio_centavos,
|
|
||||||
point.in_stock,
|
|
||||||
point.url,
|
|
||||||
None::<String>,
|
|
||||||
point.parser_version,
|
|
||||||
point.name,
|
|
||||||
point.image_url,
|
|
||||||
).execute(&self.pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_ean_by_url(&self, url: &str) -> anyhow::Result<Option<String>> {
|
|
||||||
Ok(sqlx::query!("SELECT ean FROM precios WHERE url = ?1;", url)
|
|
||||||
.fetch_optional(&self.pool)
|
|
||||||
.await?
|
|
||||||
.map(|r| r.ean))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_urls_by_domain(&self, domain: &str) -> anyhow::Result<Vec<String>> {
|
|
||||||
let query = format!("%{}%", domain);
|
|
||||||
Ok(
|
|
||||||
sqlx::query!("SELECT url FROM producto_urls WHERE url LIKE ?1;", query)
|
|
||||||
.fetch_all(&self.pool)
|
|
||||||
.await?
|
|
||||||
.into_iter()
|
|
||||||
.map(|r| r.url)
|
|
||||||
.collect(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn save_producto_urls(&self, urls: Vec<String>) -> anyhow::Result<()> {
|
|
||||||
let now: i64 = now_ms().try_into()?;
|
|
||||||
let mut tx = self.pool.begin().await?;
|
|
||||||
for url in urls {
|
|
||||||
sqlx::query!(
|
|
||||||
r#"INSERT INTO producto_urls(url, first_seen, last_seen)
|
|
||||||
VALUES (?1, ?2, ?2)
|
|
||||||
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
|
|
||||||
url,
|
|
||||||
now
|
|
||||||
)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
tx.commit().await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn save_best_selling(&self, records: Vec<BestSellingRecord>) -> anyhow::Result<()> {
|
|
||||||
let mut tx = self.pool.begin().await?;
|
|
||||||
for record in records {
|
|
||||||
let fetched_at = record.fetched_at.timestamp_millis();
|
|
||||||
let category = record.category.id();
|
|
||||||
let eans_json = serde_json::Value::from(record.eans).to_string();
|
|
||||||
sqlx::query!(
|
|
||||||
r#"INSERT INTO db_best_selling(fetched_at, category, eans_json)
|
|
||||||
VALUES (?1, ?2, ?3);"#,
|
|
||||||
fetched_at,
|
|
||||||
category,
|
|
||||||
eans_json
|
|
||||||
)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
tx.commit().await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn now_ms() -> u128 {
|
|
||||||
SystemTime::now()
|
|
||||||
.duration_since(UNIX_EPOCH)
|
|
||||||
.expect("Time went backwards")
|
|
||||||
.as_millis()
|
|
||||||
}
|
|
|
@ -1,7 +1,8 @@
|
||||||
use again::RetryPolicy;
|
use again::RetryPolicy;
|
||||||
|
use best_selling::BestSellingRecord;
|
||||||
use clap::{Parser, ValueEnum};
|
use clap::{Parser, ValueEnum};
|
||||||
use cron::Schedule;
|
use cron::Schedule;
|
||||||
use db::Db;
|
use deadpool_sqlite::Pool;
|
||||||
use futures::{future, stream, Future, StreamExt};
|
use futures::{future, stream, Future, StreamExt};
|
||||||
use nanoid::nanoid;
|
use nanoid::nanoid;
|
||||||
use reqwest::{header::HeaderMap, StatusCode, Url};
|
use reqwest::{header::HeaderMap, StatusCode, Url};
|
||||||
|
@ -72,7 +73,7 @@ async fn scrap_url_cli(url: String) -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
mod best_selling;
|
mod best_selling;
|
||||||
async fn scrap_best_selling_cli() -> anyhow::Result<()> {
|
async fn scrap_best_selling_cli() -> anyhow::Result<()> {
|
||||||
let db = Db::connect().await?;
|
let db = connect_db();
|
||||||
let res = best_selling::get_all_best_selling(&db).await;
|
let res = best_selling::get_all_best_selling(&db).await;
|
||||||
|
|
||||||
println!("Result: {:#?}", res);
|
println!("Result: {:#?}", res);
|
||||||
|
@ -88,14 +89,14 @@ async fn fetch_list_cli(links_list_path: String) -> anyhow::Result<()> {
|
||||||
.map(|s| s.to_owned())
|
.map(|s| s.to_owned())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let db = Db::connect().await?;
|
let pool = connect_db();
|
||||||
let counters = fetch_list(&db, links).await;
|
let counters = fetch_list(&pool, links).await;
|
||||||
|
|
||||||
println!("Finished: {:?}", counters);
|
println!("Finished: {:?}", counters);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_list(db: &Db, links: Vec<String>) -> Counters {
|
async fn fetch_list(pool: &Pool, links: Vec<String>) -> Counters {
|
||||||
let n_coroutines = env::var("N_COROUTINES")
|
let n_coroutines = env::var("N_COROUTINES")
|
||||||
.map_or(Ok(24), |s| s.parse::<usize>())
|
.map_or(Ok(24), |s| s.parse::<usize>())
|
||||||
.expect("N_COROUTINES no es un número");
|
.expect("N_COROUTINES no es un número");
|
||||||
|
@ -104,9 +105,9 @@ async fn fetch_list(db: &Db, links: Vec<String>) -> Counters {
|
||||||
|
|
||||||
stream::iter(links)
|
stream::iter(links)
|
||||||
.map(|url| {
|
.map(|url| {
|
||||||
let db = db.clone();
|
let pool = pool.clone();
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
tokio::spawn(fetch_and_save(client, url, db))
|
tokio::spawn(fetch_and_save(client, url, pool))
|
||||||
})
|
})
|
||||||
.buffer_unordered(n_coroutines)
|
.buffer_unordered(n_coroutines)
|
||||||
.fold(Counters::default(), move |x, y| {
|
.fold(Counters::default(), move |x, y| {
|
||||||
|
@ -120,7 +121,11 @@ async fn fetch_list(db: &Db, links: Vec<String>) -> Counters {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
mod db;
|
fn connect_db() -> Pool {
|
||||||
|
let db_path = env::var("DB_PATH").unwrap_or("../sqlite.db".to_string());
|
||||||
|
let cfg = deadpool_sqlite::Config::new(db_path);
|
||||||
|
cfg.create_pool(deadpool_sqlite::Runtime::Tokio1).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
struct Counters {
|
struct Counters {
|
||||||
|
@ -129,13 +134,26 @@ struct Counters {
|
||||||
skipped: u64,
|
skipped: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_and_save(client: reqwest::Client, url: String, db: Db) -> Counters {
|
async fn fetch_and_save(client: reqwest::Client, url: String, pool: Pool) -> Counters {
|
||||||
let res = fetch_and_parse(&client, url.clone()).await;
|
let res = fetch_and_parse(&client, url.clone()).await;
|
||||||
let mut counters = Counters::default();
|
let mut counters = Counters::default();
|
||||||
match res {
|
match res {
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
counters.success += 1;
|
counters.success += 1;
|
||||||
db.insert_precio(res).await.unwrap();
|
pool.get().await.unwrap().interact(move |conn| conn.execute(
|
||||||
|
"INSERT INTO precios(ean, fetched_at, precio_centavos, in_stock, url, warc_record_id, parser_version, name, image_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9);",
|
||||||
|
rusqlite::params![
|
||||||
|
res.ean,
|
||||||
|
res.fetched_at,
|
||||||
|
res.precio_centavos,
|
||||||
|
res.in_stock,
|
||||||
|
res.url,
|
||||||
|
None::<String>,
|
||||||
|
res.parser_version,
|
||||||
|
res.name,
|
||||||
|
res.image_url,
|
||||||
|
]
|
||||||
|
)).await.unwrap().unwrap();
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
match err.downcast_ref::<reqwest::Error>() {
|
match err.downcast_ref::<reqwest::Error>() {
|
||||||
|
@ -283,7 +301,7 @@ struct AutoTelegram {
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct Auto {
|
struct Auto {
|
||||||
db: Db,
|
pool: Pool,
|
||||||
telegram: Option<AutoTelegram>,
|
telegram: Option<AutoTelegram>,
|
||||||
}
|
}
|
||||||
impl Auto {
|
impl Auto {
|
||||||
|
@ -298,7 +316,24 @@ impl Auto {
|
||||||
))
|
))
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
let links: Vec<String> = self.db.get_urls_by_domain(supermercado.host()).await?;
|
let links: Vec<String> = {
|
||||||
|
let search = format!("%{}%", supermercado.host());
|
||||||
|
self.pool
|
||||||
|
.get()
|
||||||
|
.await?
|
||||||
|
.interact(move |conn| -> anyhow::Result<Vec<String>> {
|
||||||
|
Ok(conn
|
||||||
|
.prepare(
|
||||||
|
r#"SELECT url FROM producto_urls
|
||||||
|
WHERE url LIKE ?1;"#,
|
||||||
|
)?
|
||||||
|
.query_map(rusqlite::params![search], |r| r.get::<_, String>(0))?
|
||||||
|
.map(|r| r.unwrap())
|
||||||
|
.collect())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?
|
||||||
|
};
|
||||||
// {
|
// {
|
||||||
// let debug_path = PathBuf::from("debug/");
|
// let debug_path = PathBuf::from("debug/");
|
||||||
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
|
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
|
||||||
|
@ -310,7 +345,7 @@ impl Auto {
|
||||||
// }
|
// }
|
||||||
{
|
{
|
||||||
let t0 = now_sec();
|
let t0 = now_sec();
|
||||||
let counters = fetch_list(&self.db, links).await;
|
let counters = fetch_list(&self.pool, links).await;
|
||||||
self.inform(&format!(
|
self.inform(&format!(
|
||||||
"Downloaded {:?}: {:?} (took {})",
|
"Downloaded {:?}: {:?} (took {})",
|
||||||
&supermercado,
|
&supermercado,
|
||||||
|
@ -333,7 +368,56 @@ impl Auto {
|
||||||
|
|
||||||
async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> {
|
async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> {
|
||||||
let urls = get_urls(supermercado).await?;
|
let urls = get_urls(supermercado).await?;
|
||||||
self.db.save_producto_urls(urls).await?;
|
self.pool
|
||||||
|
.get()
|
||||||
|
.await?
|
||||||
|
.interact(|conn| -> Result<(), anyhow::Error> {
|
||||||
|
let tx = conn.transaction()?;
|
||||||
|
{
|
||||||
|
let mut stmt = tx.prepare(
|
||||||
|
r#"INSERT INTO producto_urls(url, first_seen, last_seen)
|
||||||
|
VALUES (?1, ?2, ?2)
|
||||||
|
ON CONFLICT(url) DO UPDATE SET last_seen=?2;"#,
|
||||||
|
)?;
|
||||||
|
let now: u64 = now_ms().try_into()?;
|
||||||
|
for url in urls {
|
||||||
|
stmt.execute(rusqlite::params![url, now])?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tx.commit()?;
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_best_selling(&self, best_selling: Vec<BestSellingRecord>) -> anyhow::Result<()> {
|
||||||
|
self.pool
|
||||||
|
.get()
|
||||||
|
.await?
|
||||||
|
.interact(move |conn| -> Result<(), anyhow::Error> {
|
||||||
|
let tx = conn.transaction()?;
|
||||||
|
{
|
||||||
|
let mut stmt = tx.prepare(
|
||||||
|
r#"INSERT INTO db_best_selling(fetched_at, category, eans_json)
|
||||||
|
VALUES (?1, ?2, ?3);"#,
|
||||||
|
)?;
|
||||||
|
for record in best_selling {
|
||||||
|
let eans_json = serde_json::Value::from(record.eans).to_string();
|
||||||
|
let fetched_at = record.fetched_at.timestamp_millis();
|
||||||
|
stmt.execute(rusqlite::params![
|
||||||
|
fetched_at,
|
||||||
|
record.category.id(),
|
||||||
|
eans_json
|
||||||
|
])?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tx.commit()?;
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -354,22 +438,20 @@ impl Auto {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn auto_cli() -> anyhow::Result<()> {
|
async fn auto_cli() -> anyhow::Result<()> {
|
||||||
let auto = {
|
let db = connect_db();
|
||||||
let db = Db::connect().await?;
|
let telegram = {
|
||||||
let telegram = {
|
match (
|
||||||
match (
|
env::var("TELEGRAM_BOT_TOKEN"),
|
||||||
env::var("TELEGRAM_BOT_TOKEN"),
|
env::var("TELEGRAM_BOT_CHAT_ID"),
|
||||||
env::var("TELEGRAM_BOT_CHAT_ID"),
|
) {
|
||||||
) {
|
(Ok(token), Ok(chat_id)) => Some(AutoTelegram { token, chat_id }),
|
||||||
(Ok(token), Ok(chat_id)) => Some(AutoTelegram { token, chat_id }),
|
_ => {
|
||||||
_ => {
|
tracing::warn!("No token or chat_id for telegram");
|
||||||
tracing::warn!("No token or chat_id for telegram");
|
None
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
Auto { db, telegram }
|
|
||||||
};
|
};
|
||||||
|
let auto = Auto { pool: db, telegram };
|
||||||
auto.inform("[auto] Empezando scrap").await;
|
auto.inform("[auto] Empezando scrap").await;
|
||||||
let handles: Vec<_> = Supermercado::value_variants()
|
let handles: Vec<_> = Supermercado::value_variants()
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -380,10 +462,10 @@ async fn auto_cli() -> anyhow::Result<()> {
|
||||||
let best_selling = auto
|
let best_selling = auto
|
||||||
.inform_time(
|
.inform_time(
|
||||||
"Downloaded best selling",
|
"Downloaded best selling",
|
||||||
best_selling::get_all_best_selling(&auto.db),
|
best_selling::get_all_best_selling(&auto.pool),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
auto.db.save_best_selling(best_selling).await?;
|
auto.save_best_selling(best_selling).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -412,8 +494,8 @@ mod sites;
|
||||||
struct PrecioPoint {
|
struct PrecioPoint {
|
||||||
ean: String,
|
ean: String,
|
||||||
// unix
|
// unix
|
||||||
fetched_at: i64,
|
fetched_at: u64,
|
||||||
precio_centavos: Option<i64>,
|
precio_centavos: Option<u64>,
|
||||||
in_stock: Option<bool>,
|
in_stock: Option<bool>,
|
||||||
url: String,
|
url: String,
|
||||||
parser_version: u16,
|
parser_version: u16,
|
||||||
|
@ -421,9 +503,13 @@ struct PrecioPoint {
|
||||||
image_url: Option<String>,
|
image_url: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn now_sec() -> i64 {
|
fn now_sec() -> u64 {
|
||||||
since_the_epoch().as_secs().try_into().unwrap()
|
since_the_epoch().as_secs()
|
||||||
}
|
}
|
||||||
|
fn now_ms() -> u128 {
|
||||||
|
since_the_epoch().as_millis()
|
||||||
|
}
|
||||||
|
|
||||||
fn since_the_epoch() -> Duration {
|
fn since_the_epoch() -> Duration {
|
||||||
SystemTime::now()
|
SystemTime::now()
|
||||||
.duration_since(UNIX_EPOCH)
|
.duration_since(UNIX_EPOCH)
|
||||||
|
|
|
@ -11,9 +11,9 @@ pub fn get_meta_content<'a>(dom: &'a VDom<'a>, prop: &str) -> Option<Cow<'a, str
|
||||||
.map(|s| s.as_utf8_str())
|
.map(|s| s.as_utf8_str())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn price_from_meta(dom: &tl::VDom<'_>) -> Result<Option<i64>, anyhow::Error> {
|
pub fn price_from_meta(dom: &tl::VDom<'_>) -> Result<Option<u64>, anyhow::Error> {
|
||||||
let precio_centavos = get_meta_content(dom, "product:price:amount")
|
let precio_centavos = get_meta_content(dom, "product:price:amount")
|
||||||
.map(|s| s.parse::<f64>().map(|f| (f * 100.0) as i64))
|
.map(|s| s.parse::<f64>().map(|f| (f * 100.0) as u64))
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
Ok(precio_centavos)
|
Ok(precio_centavos)
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
||||||
})
|
})
|
||||||
.transpose()
|
.transpose()
|
||||||
.context("Parseando precio")?
|
.context("Parseando precio")?
|
||||||
.map(|f| (f * 100.0) as i64);
|
.map(|f| (f * 100.0) as u64);
|
||||||
|
|
||||||
let in_stock = Some(
|
let in_stock = Some(
|
||||||
dom.query_selector(".product_not_available")
|
dom.query_selector(".product_not_available")
|
||||||
|
|
Loading…
Reference in a new issue