safely unwrap socket & concurrent to

This commit is contained in:
Cat /dev/Nulo 2023-12-31 19:59:11 -03:00
parent 50873cca8c
commit aedc9c1ed0

View file

@ -3,7 +3,7 @@ use std::{env::args, fs, io::stdout, net::SocketAddr};
use warc::{RecordBuilder, WarcHeader, WarcWriter}; use warc::{RecordBuilder, WarcHeader, WarcWriter};
struct FullExchange { struct FullExchange {
socket_addr: SocketAddr, socket_addr: Option<SocketAddr>,
request: http::Request<&'static str>, request: http::Request<&'static str>,
response: http::Response<Vec<u8>>, response: http::Response<Vec<u8>>,
} }
@ -57,7 +57,7 @@ async fn worker(rx: Receiver<String>, tx: Sender<FullExchange>) {
} }
let response = client.execute(request).await.unwrap(); let response = client.execute(request).await.unwrap();
let ip_address = response.remote_addr().unwrap(); let ip_address = response.remote_addr();
let http_request = { let http_request = {
http_request_builder http_request_builder
@ -103,35 +103,41 @@ async fn warc_writer(rx: Receiver<FullExchange>) {
.unwrap(); .unwrap();
while let Ok(res) = rx.recv().await { while let Ok(res) = rx.recv().await {
let uri = res.request.uri().to_string(); let uri = res.request.uri().to_string();
writer let req_record = {
.write( let mut builder = RecordBuilder::default()
&RecordBuilder::default()
.version("1.0".to_owned()) .version("1.0".to_owned())
.warc_type(warc::RecordType::Request) .warc_type(warc::RecordType::Request)
.header(WarcHeader::TargetURI, uri.clone()) .header(WarcHeader::TargetURI, uri.clone())
.header(WarcHeader::IPAddress, res.socket_addr.ip().to_string())
.header(WarcHeader::ContentType, "application/http;msgtype=request") .header(WarcHeader::ContentType, "application/http;msgtype=request")
.header( .header(
WarcHeader::Unknown("X-Warcificator-Lying".to_string()), WarcHeader::Unknown("X-Warcificator-Lying".to_string()),
"the request contains other headers not included here", "the request contains other headers not included here",
) );
if let Some(addr) = res.socket_addr {
builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
}
builder
.body(format_http11_request(res.request).into_bytes()) .body(format_http11_request(res.request).into_bytes())
.build() .build()
.unwrap(), .unwrap()
) };
.unwrap(); writer.write(&req_record).unwrap();
writer writer
.write( .write(&{
&RecordBuilder::default() let mut builder = RecordBuilder::default()
.version("1.0".to_owned()) .version("1.0".to_owned())
.warc_type(warc::RecordType::Response) .warc_type(warc::RecordType::Response)
.header(WarcHeader::TargetURI, uri) .header(WarcHeader::TargetURI, uri)
.header(WarcHeader::IPAddress, res.socket_addr.ip().to_string()) .header(WarcHeader::ConcurrentTo, req_record.warc_id())
.header(WarcHeader::ContentType, "application/http;msgtype=response") .header(WarcHeader::ContentType, "application/http;msgtype=response");
if let Some(addr) = res.socket_addr {
builder = builder.header(WarcHeader::IPAddress, addr.ip().to_string());
}
builder
.body(format_http11_response(res.response)) .body(format_http11_response(res.response))
.build() .build()
.unwrap(), .unwrap()
) })
.unwrap(); .unwrap();
} }
} }