diff --git a/warcificator/src/main.rs b/warcificator/src/main.rs index 0450ef4..a2368e7 100644 --- a/warcificator/src/main.rs +++ b/warcificator/src/main.rs @@ -1,5 +1,6 @@ use async_channel::{Receiver, Sender}; use std::{env::args, fs, io::stdout, net::SocketAddr}; +use tokio::io::{stderr, AsyncWriteExt}; use warc::{RecordBuilder, WarcHeader, WarcWriter}; struct FullExchange { @@ -18,6 +19,7 @@ async fn main() { .filter(|s| s.len() > 0) .map(|s| s.to_owned()) .collect::>(); + let handle = { let (sender, receiver) = async_channel::bounded::(1); let (res_sender, res_receiver) = async_channel::unbounded::(); @@ -48,55 +50,67 @@ async fn main() { async fn worker(rx: Receiver, tx: Sender) { let client = reqwest::ClientBuilder::default().build().unwrap(); while let Ok(url) = rx.recv().await { - let request = client.get(url).build().unwrap(); - let mut http_request_builder = http::Request::builder() - .method(request.method()) - .uri(request.url().as_str()); - for (key, val) in request.headers() { - http_request_builder = http_request_builder.header(key, val); - } - let response = client.execute(request).await.unwrap(); - - let ip_address = response.remote_addr(); - - let http_request = { - http_request_builder - .version(response.version()) - .body("") - .unwrap() - }; - - let http_response = { - let mut http_response_builder = http::Response::<()>::builder() - .status(response.status()) - .version(response.version()); - for (key, val) in response.headers() { - http_response_builder = http_response_builder.header(key, val); + let res = fetch(&client, url.clone()).await; + match res { + Ok(ex) => { + tx.send(ex).await.unwrap(); } - let body = response.bytes().await.unwrap(); - http_response_builder.body(body.to_vec()).unwrap() - }; - - tx.send(FullExchange { - socket_addr: ip_address, - request: http_request, - response: http_response, - }) - .await - .unwrap(); + Err(err) => { + stderr() + .write_all(format!("Failed to fetch {}: {:#?}", url.as_str(), err).as_bytes()) + .await + .unwrap(); + } + } } } +async fn fetch(client: &reqwest::Client, url: String) -> Result { + let request = client.get(url).build().unwrap(); + let mut http_request_builder = http::Request::builder() + .method(request.method()) + .uri(request.url().as_str()); + for (key, val) in request.headers() { + http_request_builder = http_request_builder.header(key, val); + } + let response = client.execute(request).await?; + + let ip_address = response.remote_addr(); + + let http_request = { + http_request_builder + .version(response.version()) + .body("") + .unwrap() + }; + + let http_response = { + let mut http_response_builder = http::Response::<()>::builder() + .status(response.status()) + .version(response.version()); + for (key, val) in response.headers() { + http_response_builder = http_response_builder.header(key, val); + } + let body = response.bytes().await?; + http_response_builder.body(body.to_vec()).unwrap() + }; + + Ok(FullExchange { + socket_addr: ip_address, + request: http_request, + response: http_response, + }) +} + async fn warc_writer(rx: Receiver) { let mut writer = WarcWriter::new(stdout()); - let warc_fields = format!("software: preciazo-warcificator/0.0.0\nformat: WARC file version 1.0\nconformsTo: http://www.archive.org/documents/WarcFileFormat-1.0.html"); writer .write( &RecordBuilder::default() .version("1.0".to_owned()) .warc_type(warc::RecordType::WarcInfo) .header(WarcHeader::ContentType, "application/warc-fields") - .body(warc_fields.into()) + .body(format!("software: preciazo-warcificator/0.0.0\nformat: WARC file version 1.0\nconformsTo: http://www.archive.org/documents/WarcFileFormat-1.0.html").into()) .build() .unwrap(), )