Buffer for request-futures

This commit is contained in:
einisto 2022-06-04 01:05:27 +03:00
parent 351a534ce1
commit a101ec3344

View File

@@ -1,14 +1,12 @@
use clap::{Arg, ArgGroup, Command}; use clap::{Arg, ArgGroup, Command};
use futures::{stream, StreamExt}; use futures::{stream, StreamExt};
//use image::io::Reader;
use regex::Regex; use regex::Regex;
use reqwest::Client; use reqwest::Client;
use serde_json::Value; use serde_json::Value;
//use std::io::Cursor;
use std::{path::PathBuf, process::exit}; use std::{path::PathBuf, process::exit};
use tokio; use tokio::{fs::File, io::AsyncWriteExt};
// General error type to make error handling easier // General type to make error handling easier
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@@ -122,51 +120,39 @@ async fn get_imagelist(
} }
async fn download_images(input_output_data: &Vec<(String, PathBuf)>) -> Result<u16> { async fn download_images(input_output_data: &Vec<(String, PathBuf)>) -> Result<u16> {
let client = Client::new(); let client = Client::builder().build()?;
let bodies = stream::iter(input_output_data.iter()) let futures = stream::iter(input_output_data.iter().map(|data| async {
.map(|data| { let (url, path) = data;
let client = &client; let send_fut = client.get(url).send();
async move {
let res = client.get(&data.0).send().await?;
res.bytes().await
}
})
.buffer_unordered(input_output_data.len());
bodies match send_fut.await {
.for_each(|body| async move { Ok(res) => match res.bytes().await {
println!("{:?}", body);
match body {
Ok(bytes) => { Ok(bytes) => {
//let img = Reader::new(Cursor::new(bytes)) let mut file = File::create(path).await.unwrap();
// .with_guessed_format() file.write_all(&bytes).await.unwrap();
// .map(|guess| guess.decode())
// .map(|decode| decode);
// TODO: Write 'img' to file asynchronously println!("Got {} bytes from {:?} to {:?}", bytes.len(), &url, &path)
// Requires access to pathnames (stored alongside urls in tuples inside input_output_data-vec) }
Err(_) => eprintln!("Error reading bytes from {}", url),
},
Err(_) => eprintln!("Error downloading {}", url),
}
}))
.buffer_unordered(100)
.collect::<Vec<()>>();
println!("Got {} bytes", bytes.len()); futures.await;
}
Err(e) => {
eprintln!("Error during request: {e}");
}
}
})
.await;
Ok(0) Ok(0)
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
// TODO: add possible config-file for default output path (similar to wgdl.py)
let (path, target, mode) = parse_cli_args()?; let (path, target, mode) = parse_cli_args()?;
println!( println!(
"\nCONFIG:\n\tPATH: {:?}\n\tTARGET: {}\n\tMODE: {:?}\n", "\nDownload configuration:\n\tOUTPUT: {:?}\n\tURL: {}\n\tDOWNLOAD-MODE: {:?}\n",
path, target, mode path, target, mode
); );
@@ -174,7 +160,6 @@ async fn main() -> Result<()> {
Mode::Thread => { Mode::Thread => {
let (json_url, board_name) = create_thread_url(target); let (json_url, board_name) = create_thread_url(target);
let targets = get_imagelist(&json_url.as_str(), &board_name.as_str(), &path).await?; let targets = get_imagelist(&json_url.as_str(), &board_name.as_str(), &path).await?;
println!("LIST of URLs:\n{:?}", targets);
let _total_file_amount = download_images(&targets).await?; let _total_file_amount = download_images(&targets).await?;
//println!("Total files downloaded: {}", total_file_amount); //println!("Total files downloaded: {}", total_file_amount);