mirror of
https://github.com/catdevnull/preciazo.git
synced 2024-11-22 14:16:19 +00:00
refactor + proxies
This commit is contained in:
parent
8e04089843
commit
78ac22bd68
8 changed files with 1660 additions and 562 deletions
1459
scraper-rs/Cargo.lock
generated
1459
scraper-rs/Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -18,12 +18,13 @@ html-escape = "0.2.13"
|
|||
itertools = "0.12.0"
|
||||
nanoid = "0.4.0"
|
||||
quick-xml = "0.31.0"
|
||||
rand = "0.8.5"
|
||||
reqwest = { version = "0.11.23", default-features = false, features = [
|
||||
rand = "0.8"
|
||||
reqwest = { version = "0.12", default-features = false, features = [
|
||||
"rustls-tls",
|
||||
"gzip",
|
||||
"brotli",
|
||||
"socks",
|
||||
"json",
|
||||
] }
|
||||
rusqlite = "0.30.0"
|
||||
serde = { version = "1.0.193", features = ["derive"] }
|
||||
|
@ -34,3 +35,5 @@ tl = { git = "https://github.com/evertedsphere/tl", branch = "patch-1" }
|
|||
tokio = { version = "1.35.1", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
proxy-scraper-checker = { git = "https://github.com/catdevnull/Proxy-Scraper-Checker", rev = "429ca83d137abdf5377a1d22ee85d62bfb00437c" }
|
||||
indicatif = "0.17.8"
|
||||
|
|
102
scraper-rs/src/auto.rs
Normal file
102
scraper-rs/src/auto.rs
Normal file
|
@ -0,0 +1,102 @@
|
|||
use super::fetch_list;
|
||||
use super::now_sec;
|
||||
use super::supermercado::Supermercado;
|
||||
use super::AutoArgs;
|
||||
use super::AutoTelegram;
|
||||
use crate::db::Db;
|
||||
use crate::scraper::Scraper;
|
||||
use futures::Future;
|
||||
use reqwest::Url;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Auto {
|
||||
pub db: Db,
|
||||
pub telegram: Option<AutoTelegram>,
|
||||
pub args: AutoArgs,
|
||||
pub scraper: Scraper,
|
||||
}
|
||||
|
||||
impl Auto {
|
||||
pub async fn download_supermercado(self, supermercado: Supermercado) -> anyhow::Result<()> {
|
||||
{
|
||||
let t0 = now_sec();
|
||||
match self.get_and_save_urls(&supermercado).await {
|
||||
Ok(_) => {
|
||||
self.inform(&format!(
|
||||
"Downloaded url list {:?} (took {})",
|
||||
&supermercado,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await
|
||||
}
|
||||
Err(err) => {
|
||||
self.inform(&format!(
|
||||
"[{:?}] FAILED url list: {:?} (took {})",
|
||||
&supermercado,
|
||||
err,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
let links: Vec<String> = {
|
||||
let mut links = self.db.get_urls_by_domain(supermercado.host()).await?;
|
||||
if let Some(n) = self.args.n_products {
|
||||
links.truncate(n);
|
||||
}
|
||||
links
|
||||
};
|
||||
// {
|
||||
// let debug_path = PathBuf::from("debug/");
|
||||
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
|
||||
// let file_path = debug_path.join(format!("{}.txt", nanoid!()));
|
||||
// tokio::fs::write(&file_path, &links.join("\n"))
|
||||
// .await
|
||||
// .unwrap();
|
||||
// tracing::info!("Lista de {:?}: {:?}", &supermercado, file_path.display());
|
||||
// }
|
||||
{
|
||||
let t0 = now_sec();
|
||||
let counters = fetch_list(&self.db, links).await;
|
||||
self.inform(&format!(
|
||||
"Downloaded {:?}: {:?} (took {})",
|
||||
&supermercado,
|
||||
counters,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn inform_time<T: Future<Output = R>, R>(&self, msg: &str, action: T) -> R {
|
||||
let t0 = now_sec();
|
||||
let res = action.await;
|
||||
self.inform(&format!("{} (took {})", msg, now_sec() - t0))
|
||||
.await;
|
||||
res
|
||||
}
|
||||
|
||||
pub async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> {
|
||||
let urls = self.scraper.get_urls_for_supermercado(supermercado).await?;
|
||||
self.db.save_producto_urls(urls).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn inform(&self, msg: &str) {
|
||||
tracing::info!("{}", msg);
|
||||
if let Some(telegram) = &self.telegram {
|
||||
let u = Url::parse_with_params(
|
||||
&format!("https://api.telegram.org/bot{}/sendMessage", telegram.token),
|
||||
&[
|
||||
("chat_id", telegram.chat_id.clone()),
|
||||
("text", msg.to_string()),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
reqwest::get(u).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,12 +1,12 @@
|
|||
use again::RetryPolicy;
|
||||
use chrono::{DateTime, Utc};
|
||||
use clap::{Parser, ValueEnum};
|
||||
use cron::Schedule;
|
||||
use db::Db;
|
||||
use futures::{future, stream, Future, StreamExt, TryFutureExt};
|
||||
use futures::{future, stream, StreamExt, TryFutureExt};
|
||||
|
||||
use reqwest::{header::HeaderMap, StatusCode, Url};
|
||||
use simple_error::{bail, SimpleError};
|
||||
use reqwest::{header::HeaderMap, IntoUrl, StatusCode};
|
||||
use scraper::Scraper;
|
||||
use simple_error::SimpleError;
|
||||
use std::{
|
||||
env::{self},
|
||||
fs,
|
||||
|
@ -17,6 +17,10 @@ use thiserror::Error;
|
|||
|
||||
mod supermercado;
|
||||
use supermercado::Supermercado;
|
||||
mod auto;
|
||||
use auto::Auto;
|
||||
mod proxy_client;
|
||||
mod scraper;
|
||||
|
||||
#[derive(Parser)] // requires `derive` feature
|
||||
enum Args {
|
||||
|
@ -69,8 +73,8 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
async fn scrap_url_cli(url: String) -> anyhow::Result<()> {
|
||||
let client = build_client();
|
||||
let res = fetch_and_parse(&client, url.clone()).await;
|
||||
let scraper = Scraper::new();
|
||||
let res = scraper.fetch_and_parse(url.clone()).await;
|
||||
|
||||
println!("Result: {:#?}", res);
|
||||
res.map(|_| ())
|
||||
|
@ -105,13 +109,13 @@ async fn fetch_list(db: &Db, links: Vec<String>) -> Counters {
|
|||
.map_or(Ok(24), |s| s.parse::<usize>())
|
||||
.expect("N_COROUTINES no es un número");
|
||||
|
||||
let client = build_client();
|
||||
let scraper = Scraper::new();
|
||||
|
||||
stream::iter(links)
|
||||
.map(|url| {
|
||||
let db = db.clone();
|
||||
let client = client.clone();
|
||||
tokio::spawn(fetch_and_save(client, url, db))
|
||||
let scraper = scraper.clone();
|
||||
tokio::spawn(fetch_and_save(scraper, url, db))
|
||||
})
|
||||
.buffer_unordered(n_coroutines)
|
||||
.fold(Counters::default(), move |x, y| {
|
||||
|
@ -134,8 +138,8 @@ struct Counters {
|
|||
skipped: u64,
|
||||
}
|
||||
|
||||
async fn fetch_and_save(client: reqwest::Client, url: String, db: Db) -> Counters {
|
||||
let res = fetch_and_parse(&client, url.clone()).await;
|
||||
async fn fetch_and_save(scraper: Scraper, url: String, db: Db) -> Counters {
|
||||
let res = scraper.fetch_and_parse(url.clone()).await;
|
||||
let mut counters = Counters::default();
|
||||
match res {
|
||||
Ok(res) => {
|
||||
|
@ -182,28 +186,14 @@ fn build_client() -> reqwest::Client {
|
|||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
fn build_coto_client() -> reqwest::Client {
|
||||
reqwest::ClientBuilder::default()
|
||||
.timeout(Duration::from_secs(300))
|
||||
.connect_timeout(Duration::from_secs(150))
|
||||
.default_headers(build_header_map())
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
pub async fn do_request(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
|
||||
pub async fn do_request<U: IntoUrl>(
|
||||
client: &reqwest::Client,
|
||||
url: U,
|
||||
) -> reqwest::Result<reqwest::Response> {
|
||||
let request = client.get(url).build()?;
|
||||
let response = client.execute(request).await?.error_for_status()?;
|
||||
Ok(response)
|
||||
}
|
||||
async fn request_and_body(client: &reqwest::Client, url: &str) -> reqwest::Result<String> {
|
||||
let res = do_request(client, url).await?;
|
||||
res.text().await
|
||||
}
|
||||
pub async fn fetch_body(client: &reqwest::Client, url: &str) -> reqwest::Result<String> {
|
||||
get_fetch_retry_policy()
|
||||
.retry_if(|| request_and_body(client, url), retry_if_wasnt_not_found)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn get_fetch_retry_policy() -> again::RetryPolicy {
|
||||
RetryPolicy::exponential(Duration::from_millis(300))
|
||||
|
@ -222,51 +212,17 @@ pub fn get_parse_retry_policy() -> again::RetryPolicy {
|
|||
pub fn retry_if_wasnt_not_found(err: &reqwest::Error) -> bool {
|
||||
!err.status().is_some_and(|s| s == StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(client))]
|
||||
async fn fetch_and_parse(
|
||||
client: &reqwest::Client,
|
||||
url: String,
|
||||
) -> Result<PrecioPoint, anyhow::Error> {
|
||||
async fn fetch_and_scrap(
|
||||
client: &reqwest::Client,
|
||||
url: String,
|
||||
) -> Result<PrecioPoint, anyhow::Error> {
|
||||
let body = fetch_body(client, &url).await?;
|
||||
let maybe_point = { scrap_url(client, url, &body).await };
|
||||
|
||||
let point = match maybe_point {
|
||||
Ok(p) => Ok(p),
|
||||
Err(err) => {
|
||||
let now: DateTime<Utc> = Utc::now();
|
||||
// let debug_path = PathBuf::from(format!("debug-{}/", now.format("%Y-%m-%d")));
|
||||
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
|
||||
// let file_path = debug_path.join(format!("{}.html", nanoid!()));
|
||||
// tokio::fs::write(&file_path, &body).await.unwrap();
|
||||
// tracing::debug!(error=%err, "Failed to parse, saved body at {}",file_path.display());
|
||||
tracing::debug!(error=%err, "Failed to parse");
|
||||
Err(err)
|
||||
}
|
||||
}?;
|
||||
|
||||
Ok(point)
|
||||
pub fn anyhow_retry_if_wasnt_not_found(err: &anyhow::Error) -> bool {
|
||||
match err.downcast_ref::<reqwest::Error>() {
|
||||
Some(e) => retry_if_wasnt_not_found(e),
|
||||
None => true,
|
||||
}
|
||||
|
||||
get_parse_retry_policy()
|
||||
.retry_if(
|
||||
|| fetch_and_scrap(client, url.clone()),
|
||||
|err: &anyhow::Error| match err.downcast_ref::<reqwest::Error>() {
|
||||
Some(e) => !e.status().is_some_and(|s| s == StatusCode::NOT_FOUND),
|
||||
None => true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
|
||||
let file = tokio::fs::read_to_string(file_path).await?;
|
||||
|
||||
let client = build_client();
|
||||
let scraper = Scraper::new();
|
||||
|
||||
let url = {
|
||||
let dom = tl::parse(&file, tl::ParserOptions::default())?;
|
||||
|
@ -281,12 +237,13 @@ async fn parse_file_cli(file_path: String) -> anyhow::Result<()> {
|
|||
};
|
||||
|
||||
println!("URL: {}", &url);
|
||||
println!("{:?}", scrap_url(&client, url, &file).await);
|
||||
println!("{:?}", scraper.scrap_url(url, &file).await);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
|
||||
let urls = get_urls(&supermercado).await?;
|
||||
let scraper = Scraper::new();
|
||||
let urls = scraper.get_urls_for_supermercado(&supermercado).await?;
|
||||
urls.iter().for_each(|s| {
|
||||
println!("{}", s);
|
||||
});
|
||||
|
@ -294,137 +251,12 @@ async fn get_url_list_cli(supermercado: Supermercado) -> anyhow::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_urls(supermercado: &Supermercado) -> Result<Vec<String>, anyhow::Error> {
|
||||
Ok(match supermercado {
|
||||
Supermercado::Dia => sites::dia::get_urls().await?,
|
||||
Supermercado::Jumbo => sites::jumbo::get_urls().await?,
|
||||
Supermercado::Carrefour => sites::carrefour::get_urls().await?,
|
||||
Supermercado::Coto => sites::coto::get_urls().await?,
|
||||
Supermercado::Farmacity => sites::farmacity::get_urls().await?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn scrap_url(
|
||||
client: &reqwest::Client,
|
||||
url: String,
|
||||
body: &str,
|
||||
) -> anyhow::Result<PrecioPoint> {
|
||||
let url_p = Url::parse(&url).unwrap();
|
||||
match url_p.host_str().unwrap() {
|
||||
"www.carrefour.com.ar" => {
|
||||
sites::carrefour::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
|
||||
}
|
||||
"diaonline.supermercadosdia.com.ar" => {
|
||||
sites::dia::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
|
||||
}
|
||||
"www.cotodigital3.com.ar" => {
|
||||
sites::coto::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
|
||||
}
|
||||
"www.jumbo.com.ar" => sites::jumbo::scrap(client, url, body).await,
|
||||
"www.farmacity.com" => {
|
||||
sites::farmacity::parse(url, &tl::parse(body, tl::ParserOptions::default())?)
|
||||
}
|
||||
s => bail!("Unknown host {}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AutoTelegram {
|
||||
token: String,
|
||||
chat_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Auto {
|
||||
db: Db,
|
||||
telegram: Option<AutoTelegram>,
|
||||
args: AutoArgs,
|
||||
}
|
||||
impl Auto {
|
||||
async fn download_supermercado(self, supermercado: Supermercado) -> anyhow::Result<()> {
|
||||
{
|
||||
let t0 = now_sec();
|
||||
match self.get_and_save_urls(&supermercado).await {
|
||||
Ok(_) => {
|
||||
self.inform(&format!(
|
||||
"Downloaded url list {:?} (took {})",
|
||||
&supermercado,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await
|
||||
}
|
||||
Err(err) => {
|
||||
self.inform(&format!(
|
||||
"[{:?}] FAILED url list: {:?} (took {})",
|
||||
&supermercado,
|
||||
err,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
let links: Vec<String> = {
|
||||
let mut links = self.db.get_urls_by_domain(supermercado.host()).await?;
|
||||
if let Some(n) = self.args.n_products {
|
||||
links.truncate(n);
|
||||
}
|
||||
links
|
||||
};
|
||||
// {
|
||||
// let debug_path = PathBuf::from("debug/");
|
||||
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
|
||||
// let file_path = debug_path.join(format!("{}.txt", nanoid!()));
|
||||
// tokio::fs::write(&file_path, &links.join("\n"))
|
||||
// .await
|
||||
// .unwrap();
|
||||
// tracing::info!("Lista de {:?}: {:?}", &supermercado, file_path.display());
|
||||
// }
|
||||
{
|
||||
let t0 = now_sec();
|
||||
let counters = fetch_list(&self.db, links).await;
|
||||
self.inform(&format!(
|
||||
"Downloaded {:?}: {:?} (took {})",
|
||||
&supermercado,
|
||||
counters,
|
||||
now_sec() - t0
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn inform_time<T: Future<Output = R>, R>(&self, msg: &str, action: T) -> R {
|
||||
let t0 = now_sec();
|
||||
let res = action.await;
|
||||
self.inform(&format!("{} (took {})", msg, now_sec() - t0))
|
||||
.await;
|
||||
res
|
||||
}
|
||||
|
||||
async fn get_and_save_urls(&self, supermercado: &Supermercado) -> anyhow::Result<()> {
|
||||
let urls = get_urls(supermercado).await?;
|
||||
self.db.save_producto_urls(urls).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn inform(&self, msg: &str) {
|
||||
tracing::info!("{}", msg);
|
||||
if let Some(telegram) = &self.telegram {
|
||||
let u = Url::parse_with_params(
|
||||
&format!("https://api.telegram.org/bot{}/sendMessage", telegram.token),
|
||||
&[
|
||||
("chat_id", telegram.chat_id.clone()),
|
||||
("text", msg.to_string()),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
reqwest::get(u).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn auto_cli(args: AutoArgs) -> anyhow::Result<()> {
|
||||
let auto = {
|
||||
let db = Db::connect().await?;
|
||||
|
@ -440,7 +272,12 @@ async fn auto_cli(args: AutoArgs) -> anyhow::Result<()> {
|
|||
}
|
||||
}
|
||||
};
|
||||
Auto { db, telegram, args }
|
||||
Auto {
|
||||
db,
|
||||
telegram,
|
||||
args,
|
||||
scraper: Scraper::new(),
|
||||
}
|
||||
};
|
||||
auto.inform("[auto] Empezando scrap").await;
|
||||
|
||||
|
|
274
scraper-rs/src/proxy_client.rs
Normal file
274
scraper-rs/src/proxy_client.rs
Normal file
|
@ -0,0 +1,274 @@
|
|||
use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::{future::join_all, stream, FutureExt, StreamExt};
|
||||
use indicatif::ProgressBar;
|
||||
use itertools::Itertools;
|
||||
use proxy_scraper_checker::{Proxy, ProxyChecker};
|
||||
use rand::Rng;
|
||||
use reqwest::{IntoUrl, Url};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::{RwLock, Semaphore};
|
||||
|
||||
use crate::build_header_map;
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct ProxyClient {
|
||||
proxies: RwLock<Option<Vec<Url>>>,
|
||||
clients: RwLock<[Option<reqwest::Client>; 10]>,
|
||||
}
|
||||
|
||||
impl ProxyClient {
|
||||
pub async fn do_request(&self, url: impl IntoUrl + Clone) -> anyhow::Result<reqwest::Response> {
|
||||
loop {
|
||||
let client = {
|
||||
let mut client_ptr = self.clients.write().await;
|
||||
if let Some(client) = (*client_ptr).clone() {
|
||||
client
|
||||
} else {
|
||||
let proxies = self.get_proxies().await?;
|
||||
// let proxy = stream::iter(proxies)
|
||||
// .filter_map(|proxy| async {
|
||||
// println!("trying proxy {}", proxy);
|
||||
// check_proxy(
|
||||
// proxy.clone(),
|
||||
// "https://www.cotodigital3.com.ar/sitios/cdigi/".to_string(),
|
||||
// 3,
|
||||
// )
|
||||
// .map(|r| r.ok())
|
||||
// .await
|
||||
// })
|
||||
// .next()
|
||||
// .await
|
||||
// .unwrap()
|
||||
// .clone();
|
||||
let proxy = loop {
|
||||
let proxy = proxies[rand::thread_rng().gen_range(0..proxies.len())].clone();
|
||||
println!("trying proxy {}", proxy);
|
||||
match check_proxy(
|
||||
proxy,
|
||||
"https://www.cotodigital3.com.ar/sitios/cdigi/".to_string(),
|
||||
10,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(proxy) => break proxy,
|
||||
Err(_) => continue,
|
||||
}
|
||||
};
|
||||
|
||||
println!("chose proxy {}", proxy);
|
||||
let new_client = reqwest::ClientBuilder::default()
|
||||
.timeout(Duration::from_secs(300))
|
||||
.connect_timeout(Duration::from_secs(150))
|
||||
.default_headers(build_header_map())
|
||||
.proxy(reqwest::Proxy::all(proxy)?)
|
||||
.build()
|
||||
.unwrap();
|
||||
let ret = new_client.clone();
|
||||
*client_ptr = Some(new_client);
|
||||
ret
|
||||
}
|
||||
};
|
||||
let req = client.get(url.clone()).build()?;
|
||||
match client.execute(req).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(_) => {
|
||||
// possibly IP locked, reset client to get another IP
|
||||
{
|
||||
println!("request failed, resetting client");
|
||||
*(self.clients.write().await) = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_proxies(&self) -> anyhow::Result<Vec<Url>> {
|
||||
let mut proxies_ptr = self.proxies.write().await;
|
||||
if let Some(proxies) = (*proxies_ptr).clone() {
|
||||
Ok(proxies)
|
||||
} else {
|
||||
// let scraper = proxy_scraper_checker::ProxyScraper::default();
|
||||
|
||||
// let archive_urls = scraper.scrape_archive_urls().await?;
|
||||
// let futures: Vec<_> = archive_urls
|
||||
// .into_iter()
|
||||
// .map(|url| {
|
||||
// tokio::task::spawn({
|
||||
// let value = scraper.clone();
|
||||
// async move { value.scrape_proxies(url, true).await }
|
||||
// })
|
||||
// })
|
||||
// .collect();
|
||||
// let results: Vec<_> = join_all(futures).await.into_iter().try_collect()?;
|
||||
// let proxies: Vec<_> = results
|
||||
// .into_iter()
|
||||
// .filter_map(|res| if let Ok(res) = res { Some(res) } else { None })
|
||||
// .flatten()
|
||||
// .filter(|x| {
|
||||
// if let Proxy::Socks5(_) = x {
|
||||
// true
|
||||
// } else {
|
||||
// false
|
||||
// }
|
||||
// })
|
||||
// .collect();
|
||||
|
||||
let socks5_proxies = get_proxy_list_from_raw_list(
|
||||
"https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt",
|
||||
"socks5",
|
||||
)
|
||||
.await?;
|
||||
let http_proxies = get_proxy_list_from_raw_list(
|
||||
"https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt",
|
||||
"http",
|
||||
)
|
||||
.await?;
|
||||
let fosy_http_proxies =
|
||||
get_proxy_list_from_raw_list("https://fosy.club/api/free/list?type=http", "http")
|
||||
.await?;
|
||||
let fosy_socks5_proxies = get_proxy_list_from_raw_list(
|
||||
"https://fosy.club/api/free/list?type=socks5",
|
||||
"socks5",
|
||||
)
|
||||
.await?;
|
||||
let geonode_proxies = get_proxy_list_geonode()
|
||||
.await
|
||||
.inspect_err(|e| tracing::error!("getting proxy list ({error})", error = e))?;
|
||||
|
||||
// let proxies: Vec<_> = [
|
||||
// // socks5_proxies,
|
||||
// // http_proxies,
|
||||
// fosy_http_proxies,
|
||||
// fosy_socks5_proxies,
|
||||
// geonode_proxies,
|
||||
// ]
|
||||
// .into_iter()
|
||||
// .flatten()
|
||||
// .collect();
|
||||
|
||||
let checked_proxies: Vec<_> = {
|
||||
let proxiess: HashSet<_> = proxies
|
||||
.into_iter()
|
||||
.filter_map(|p| match p.scheme() {
|
||||
"socks5" => Some(Proxy::Socks5(p.host_str()?.to_string())),
|
||||
"http" => Some(Proxy::Http(p.host_str()?.to_string())),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
let checker = ProxyChecker::new(
|
||||
Arc::new(Semaphore::new(32)),
|
||||
ProgressBar::new(proxiess.len().try_into().unwrap()),
|
||||
);
|
||||
checker
|
||||
.check_proxies(proxiess.into(), "https://milei.nulo.in/".to_string(), 8)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|p| Url::from_str(&p.url()))
|
||||
.try_collect()?
|
||||
};
|
||||
|
||||
let ret = checked_proxies.clone();
|
||||
println!("got {} proxies", ret.len());
|
||||
*proxies_ptr = Some(checked_proxies);
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn check_proxy(proxy: Url, url: String, timeout: u64) -> anyhow::Result<Url> {
|
||||
let client = reqwest::Client::builder()
|
||||
.proxy(reqwest::Proxy::all(proxy.clone())?)
|
||||
.timeout(Duration::from_secs(timeout))
|
||||
.build()?;
|
||||
|
||||
client
|
||||
.get(url)
|
||||
.send()
|
||||
.await
|
||||
.context("Request failed")?
|
||||
.error_for_status()
|
||||
.context("Request returned an error status code")?;
|
||||
|
||||
Ok(proxy)
|
||||
}
|
||||
|
||||
// pub async fn find_first_working_proxy(proxies: Vec<String>) -> anyhow::Result<Url> {
|
||||
// let semaphore = Arc::new(Semaphore::new(64));
|
||||
// for proxy in proxies {
|
||||
// let semaphore = semaphore.clone();
|
||||
|
||||
// }
|
||||
|
||||
// let proxy = stream::iter(proxies)
|
||||
// .filter_map(|proxy| async {
|
||||
// println!("trying proxy {}", proxy);
|
||||
// check_proxy(
|
||||
// proxy.clone(),
|
||||
// "https://www.cotodigital3.com.ar/sitios/cdigi/".to_string(),
|
||||
// 3,
|
||||
// )
|
||||
// .map(|r| r.ok())
|
||||
// .await
|
||||
// }).concu
|
||||
// .next()
|
||||
// .await
|
||||
// .unwrap()
|
||||
// .clone();
|
||||
|
||||
// }
|
||||
|
||||
pub async fn get_proxy_list_from_raw_list<U: IntoUrl>(
|
||||
list_url: U,
|
||||
protocol: &str,
|
||||
) -> anyhow::Result<Vec<Url>> {
|
||||
let res = reqwest::get(list_url).await?;
|
||||
let text = res.text().await?;
|
||||
Ok(text
|
||||
.lines()
|
||||
.map(|l| Url::from_str(&format!("{}://{}", protocol, l)))
|
||||
.filter_map(|r| r.ok())
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Ips {
|
||||
data: Vec<Ip>,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct Ip {
|
||||
ip: String,
|
||||
port: String,
|
||||
protocols: Vec<String>,
|
||||
}
|
||||
pub async fn get_proxy_list_geonode() -> anyhow::Result<Vec<Url>> {
|
||||
let ips = reqwest::get("https://proxylist.geonode.com/api/proxy-list?protocols=socks5%2Chttp&filterUpTime=90&limit=500&page=1&sort_by=lastChecked&sort_type=asc").await?.json::<Ips>().await?;
|
||||
Ok(ips
|
||||
.data
|
||||
.into_iter()
|
||||
.map(|i| Url::from_str(&format!("{}://{}:{}", i.protocols[0], i.ip, i.port)))
|
||||
.filter_map(|r| r.ok())
|
||||
.collect())
|
||||
}
|
||||
pub async fn get_proxy_list_checkerproxy() -> anyhow::Result<Vec<Url>> {
|
||||
let scraper = proxy_scraper_checker::ProxyScraper::default();
|
||||
let archive_urls = scraper.scrape_archive_urls().await?;
|
||||
let futures: Vec<_> = archive_urls
|
||||
.into_iter()
|
||||
.map(|url| {
|
||||
tokio::task::spawn({
|
||||
let value = scraper.clone();
|
||||
async move { value.scrape_proxies(url, true).await }
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
let results: Vec<_> = join_all(futures).await.into_iter().try_collect()?;
|
||||
let proxies: Vec<_> = results
|
||||
.into_iter()
|
||||
.filter_map(|res| if let Ok(res) = res { Some(res) } else { None })
|
||||
.flatten()
|
||||
.map(|p| Url::from_str(&p.url()))
|
||||
.try_collect()?;
|
||||
Ok(proxies)
|
||||
}
|
116
scraper-rs/src/scraper.rs
Normal file
116
scraper-rs/src/scraper.rs
Normal file
|
@ -0,0 +1,116 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use reqwest::Url;
|
||||
use simple_error::bail;
|
||||
|
||||
use crate::{
|
||||
anyhow_retry_if_wasnt_not_found, build_client, get_fetch_retry_policy, get_parse_retry_policy,
|
||||
proxy_client::ProxyClient, sites, supermercado::Supermercado, PrecioPoint,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Scraper {
|
||||
default_client: reqwest::Client,
|
||||
proxy_client: Arc<ProxyClient>,
|
||||
}
|
||||
|
||||
impl Scraper {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
default_client: build_client(),
|
||||
proxy_client: ProxyClient::default().into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_urls_for_supermercado(
|
||||
&self,
|
||||
supermercado: &Supermercado,
|
||||
) -> anyhow::Result<Vec<String>> {
|
||||
match supermercado {
|
||||
Supermercado::Dia => sites::dia::get_urls().await,
|
||||
Supermercado::Jumbo => sites::jumbo::get_urls().await,
|
||||
Supermercado::Carrefour => sites::carrefour::get_urls().await,
|
||||
Supermercado::Coto => sites::coto::get_urls(&self.proxy_client).await,
|
||||
Supermercado::Farmacity => sites::farmacity::get_urls().await,
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub async fn fetch_and_parse(&self, url: String) -> Result<PrecioPoint, anyhow::Error> {
|
||||
async fn fetch_and_scrap(
|
||||
scraper: &Scraper,
|
||||
url: String,
|
||||
) -> Result<PrecioPoint, anyhow::Error> {
|
||||
let body = scraper.fetch_body(&url).await?;
|
||||
let maybe_point = { scraper.scrap_url(url, &body).await };
|
||||
|
||||
let point = match maybe_point {
|
||||
Ok(p) => Ok(p),
|
||||
Err(err) => {
|
||||
// let now: DateTime<Utc> = Utc::now();
|
||||
// let debug_path = PathBuf::from(format!("debug-{}/", now.format("%Y-%m-%d")));
|
||||
// tokio::fs::create_dir_all(&debug_path).await.unwrap();
|
||||
// let file_path = debug_path.join(format!("{}.html", nanoid!()));
|
||||
// tokio::fs::write(&file_path, &body).await.unwrap();
|
||||
// tracing::debug!(error=%err, "Failed to parse, saved body at {}",file_path.display());
|
||||
tracing::debug!(error=%err, "Failed to parse");
|
||||
Err(err)
|
||||
}
|
||||
}?;
|
||||
|
||||
Ok(point)
|
||||
}
|
||||
|
||||
get_parse_retry_policy()
|
||||
.retry_if(
|
||||
|| fetch_and_scrap(self, url.clone()),
|
||||
anyhow_retry_if_wasnt_not_found,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn fetch_body(&self, url_string: &str) -> anyhow::Result<String> {
|
||||
let url = Url::parse(url_string)?;
|
||||
|
||||
get_fetch_retry_policy()
|
||||
.retry_if(
|
||||
|| self.request_and_body(url.clone()),
|
||||
anyhow_retry_if_wasnt_not_found,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn request_and_body(&self, url: Url) -> anyhow::Result<String> {
|
||||
let res = match Supermercado::from_url(&url) {
|
||||
Some(Supermercado::Coto) => self.proxy_client.do_request(url).await?,
|
||||
_ => self
|
||||
.default_client
|
||||
.execute(self.default_client.get(url).build()?)
|
||||
.await?
|
||||
.error_for_status()?,
|
||||
};
|
||||
Ok(res.text().await?)
|
||||
}
|
||||
|
||||
pub async fn scrap_url(&self, url: String, res_body: &str) -> anyhow::Result<PrecioPoint> {
|
||||
let url_p = Url::parse(&url).unwrap();
|
||||
match Supermercado::from_url(&url_p) {
|
||||
Some(Supermercado::Carrefour) => {
|
||||
sites::carrefour::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
|
||||
}
|
||||
Some(Supermercado::Dia) => {
|
||||
sites::dia::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
|
||||
}
|
||||
Some(Supermercado::Coto) => {
|
||||
sites::coto::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
|
||||
}
|
||||
Some(Supermercado::Jumbo) => {
|
||||
sites::jumbo::scrap(&self.default_client, url, res_body).await
|
||||
}
|
||||
Some(Supermercado::Farmacity) => {
|
||||
sites::farmacity::parse(url, &tl::parse(res_body, tl::ParserOptions::default())?)
|
||||
}
|
||||
None => bail!("Unknown URL host {}", url),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,10 +1,12 @@
|
|||
use again::Task;
|
||||
use anyhow::{anyhow, Context};
|
||||
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||
use itertools::Itertools;
|
||||
use reqwest::Url;
|
||||
|
||||
use crate::{
|
||||
build_client, build_coto_client, do_request, get_fetch_retry_policy, retry_if_wasnt_not_found, PrecioPoint
|
||||
anyhow_retry_if_wasnt_not_found, get_fetch_retry_policy, proxy_client::ProxyClient,
|
||||
retry_if_wasnt_not_found, PrecioPoint,
|
||||
};
|
||||
|
||||
pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error> {
|
||||
|
@ -78,8 +80,7 @@ pub fn parse(url: String, dom: &tl::VDom) -> Result<PrecioPoint, anyhow::Error>
|
|||
})
|
||||
}
|
||||
|
||||
pub async fn get_urls() -> anyhow::Result<Vec<String>> {
|
||||
let client = build_coto_client();
|
||||
pub async fn get_urls(proxy_client: &ProxyClient) -> anyhow::Result<Vec<String>> {
|
||||
let initial = Url::parse("https://www.cotodigital3.com.ar/sitios/cdigi/browse?Nf=product.endDate%7CGTEQ+1.7032032E12%7C%7Cproduct.startDate%7CLTEQ+1.7032032E12&Nr=AND%28product.sDisp_200%3A1004%2Cproduct.language%3Aespa%C3%B1ol%2COR%28product.siteId%3ACotoDigital%29%29")?;
|
||||
|
||||
let page_size = 50;
|
||||
|
@ -90,12 +91,21 @@ pub async fn get_urls() -> anyhow::Result<Vec<String>> {
|
|||
.append_pair("No", &(i * page_size).to_string())
|
||||
.append_pair("Nrpp", &(page_size).to_string())
|
||||
.finish();
|
||||
let client = &client;
|
||||
async move {
|
||||
let text = get_fetch_retry_policy()
|
||||
let text: String = get_fetch_retry_policy()
|
||||
.retry_if(
|
||||
|| do_request(client, u.as_str()).and_then(|r| r.text()),
|
||||
retry_if_wasnt_not_found,
|
||||
|| {
|
||||
async fn asdf(
|
||||
proxy_client: &ProxyClient,
|
||||
url: Url,
|
||||
) -> anyhow::Result<String> {
|
||||
let res = proxy_client.do_request(url).await?.error_for_status()?;
|
||||
Ok(res.text().await?)
|
||||
}
|
||||
let url = u.clone();
|
||||
asdf(proxy_client, url)
|
||||
},
|
||||
anyhow_retry_if_wasnt_not_found,
|
||||
)
|
||||
.await?;
|
||||
let dom = tl::parse(&text, tl::ParserOptions::default())?;
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use clap::ValueEnum;
|
||||
use reqwest::Url;
|
||||
|
||||
#[derive(ValueEnum, Clone, Debug, Copy)]
|
||||
pub enum Supermercado {
|
||||
|
@ -18,4 +19,14 @@ impl Supermercado {
|
|||
Self::Farmacity => "www.farmacity.com",
|
||||
}
|
||||
}
|
||||
pub fn from_url(url: &Url) -> Option<Self> {
|
||||
match url.host_str().unwrap() {
|
||||
"www.carrefour.com.ar" => Some(Self::Carrefour),
|
||||
"diaonline.supermercadosdia.com.ar" => Some(Self::Dia),
|
||||
"www.cotodigital3.com.ar" => Some(Self::Coto),
|
||||
"www.jumbo.com.ar" => Some(Self::Jumbo),
|
||||
"www.farmacity.com" => Some(Self::Farmacity),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue