compress+other changes

This commit is contained in:
Cat /dev/Nulo 2023-12-31 20:27:33 -03:00
parent 4535054415
commit c08a6c4a3b

View file

@ -1,5 +1,10 @@
use async_channel::{Receiver, Sender}; use async_channel::{Receiver, Sender};
use std::{env::args, fs, io::stdout, net::SocketAddr}; use std::{
env::args,
fs,
net::SocketAddr,
process::{Command, Stdio},
};
use tokio::io::{stderr, AsyncWriteExt}; use tokio::io::{stderr, AsyncWriteExt};
use warc::{RecordBuilder, WarcHeader, WarcWriter}; use warc::{RecordBuilder, WarcHeader, WarcWriter};
@ -11,7 +16,9 @@ struct FullExchange {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let links_list_path = args().skip(1).next().unwrap(); let mut args = args().skip(1);
let links_list_path = args.next().unwrap();
let output_zstd_path = args.next().unwrap();
let links_str = fs::read_to_string(links_list_path).unwrap(); let links_str = fs::read_to_string(links_list_path).unwrap();
let links = links_str let links = links_str
.split("\n") .split("\n")
@ -31,7 +38,7 @@ async fn main() {
handles.push(tokio::spawn(worker(rx, tx))); handles.push(tokio::spawn(worker(rx, tx)));
} }
let warc_writer_handle = tokio::spawn(warc_writer(res_receiver)); let warc_writer_handle = tokio::spawn(warc_writer(res_receiver, output_zstd_path));
for link in links { for link in links {
sender.send_blocking(link).unwrap(); sender.send_blocking(link).unwrap();
@ -102,8 +109,16 @@ async fn fetch(client: &reqwest::Client, url: String) -> Result<FullExchange, re
}) })
} }
async fn warc_writer(rx: Receiver<FullExchange>) { async fn warc_writer(rx: Receiver<FullExchange>, output_zstd_path: String) {
let mut writer = WarcWriter::new(stdout()); let zstd_proc = Command::new("zstd")
.args(&["-T0", "-15", "--long", "-o", &output_zstd_path])
.stdin(Stdio::piped())
.stderr(Stdio::null())
.stdout(Stdio::null())
.spawn()
.unwrap();
let mut writer = WarcWriter::new(zstd_proc.stdin.unwrap());
writer writer
.write( .write(
&RecordBuilder::default() &RecordBuilder::default()