File selection based on stdin
* Spawn to threads as a client to handle stdin (blocking) & connection/transfer * Input from stdin gets transfered to connection-thread via MPSC-channel * Included a new flag (-a) to skip stdin and instead download all available files from the host * Removed earlier typos
This commit is contained in:
parent
5224e24725
commit
c933339771
317
src/client.rs
317
src/client.rs
@ -1,4 +1,13 @@
|
||||
use std::{path::PathBuf, time::Duration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
error::Error,
|
||||
io::stdin,
|
||||
path::PathBuf,
|
||||
process::exit,
|
||||
sync::mpsc::{self, Receiver, Sender},
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
|
||||
@ -6,133 +15,203 @@ use tokio::{
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
pub async fn connect(addr: String, fileroot: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("[+] Connecting to {}", addr);
|
||||
let mut stream = TcpStream::connect(addr.clone()).await?;
|
||||
pub async fn connect(
|
||||
addr: String,
|
||||
fileroot: PathBuf,
|
||||
download_all: bool,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();
|
||||
|
||||
let (reader, writer) = stream.split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut writer = BufWriter::new(writer);
|
||||
let connection_task = thread::spawn(move || async move {
|
||||
println!("[+] Connecting to {}", addr);
|
||||
let mut stream = TcpStream::connect(addr.clone()).await?;
|
||||
|
||||
let mut buf = Vec::new();
|
||||
let (reader, writer) = stream.split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut writer = BufWriter::new(writer);
|
||||
|
||||
loop {
|
||||
let bytes_read = reader.read_buf(&mut buf).await?;
|
||||
if bytes_read == 0 {
|
||||
println!("[-] No more bytes received, closing connection");
|
||||
break;
|
||||
}
|
||||
let mut buf = Vec::new();
|
||||
|
||||
// Receive buffersize
|
||||
let buffersize = String::from_utf8(buf.clone())?.parse::<usize>()?;
|
||||
println!("[+] Selected buffersize: {}", buffersize);
|
||||
buf.clear();
|
||||
|
||||
// ACK buffersize
|
||||
writer.write_all(b"ACK\n").await.unwrap();
|
||||
writer.flush().await?;
|
||||
|
||||
// Receive file amount (or termination request if the server does not have any files available)
|
||||
let file_amount: usize;
|
||||
let _bytes_read = reader.read_until(b'\n', &mut buf).await?;
|
||||
let msg = String::from_utf8(buf.clone())?;
|
||||
if msg.trim() == "FIN" {
|
||||
println!("[-] Server does not have any files available, closing connection");
|
||||
writer.write_all(b"FIN\n").await?;
|
||||
writer.flush().await?;
|
||||
break;
|
||||
} else {
|
||||
file_amount = msg.trim().parse::<usize>()?;
|
||||
println!("[+] Total of {} files available", file_amount);
|
||||
buf.clear();
|
||||
|
||||
// ACK file amount
|
||||
writer.write_all(b"ACK\n").await?;
|
||||
writer.flush().await?;
|
||||
}
|
||||
|
||||
// Receive file metadata
|
||||
println!("[+] Receiving file metadata");
|
||||
let mut metadata = Vec::<(String, u64)>::new();
|
||||
while metadata.len() < file_amount {
|
||||
reader.read_until(b'\n', &mut buf).await?;
|
||||
let msg = String::from_utf8(buf.clone())?;
|
||||
buf.clear();
|
||||
|
||||
// Parse 'filesize:filename'
|
||||
let split = msg.split(":").collect::<Vec<&str>>();
|
||||
let filesize = split[0].trim().parse::<u64>()?;
|
||||
let filename = split[1].trim().to_string();
|
||||
|
||||
metadata.push((filename, filesize));
|
||||
}
|
||||
println!("[INFO] Metadata: {:?}", metadata);
|
||||
|
||||
// Send request for each file by filename
|
||||
// TODO: Choose files based on input
|
||||
println!("[+] Requesting files individually");
|
||||
for file in &metadata {
|
||||
println!("[INFO] Current request: [{:?}]", file);
|
||||
let msg = file.0.to_string() + "\n";
|
||||
writer.write_all(msg.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
|
||||
// Create file locally
|
||||
let mut output_path = fileroot.clone();
|
||||
output_path.push(file.0.clone());
|
||||
|
||||
let output_file = File::create(output_path.clone()).await?;
|
||||
println!("[+] New file: {:#?}", output_path);
|
||||
let mut file_buf = BufWriter::new(output_file);
|
||||
|
||||
// Receive the file itself
|
||||
let mut remaining_data = file.1;
|
||||
let mut buf = vec![0u8; buffersize];
|
||||
|
||||
while remaining_data != 0 {
|
||||
if remaining_data >= buffersize as u64 {
|
||||
let read_result = reader.read(&mut buf);
|
||||
|
||||
match read_result.await {
|
||||
Ok(0) => {
|
||||
println!("[-] Waiting for data to become available...");
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
Ok(n) => {
|
||||
file_buf.write_all(&mut buf).await?;
|
||||
file_buf.flush().await?;
|
||||
remaining_data = remaining_data - n as u64;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
} else {
|
||||
let read_result = reader.read(&mut buf);
|
||||
|
||||
match read_result.await {
|
||||
Ok(_) => {
|
||||
let mut buf_slice = &buf[0..(remaining_data as usize)];
|
||||
file_buf.write_all(&mut buf_slice).await?;
|
||||
file_buf.flush().await?;
|
||||
remaining_data = 0;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let bytes_read = reader.read_buf(&mut buf).await?;
|
||||
if bytes_read == 0 {
|
||||
println!("[-] No more bytes received, closing connection");
|
||||
break;
|
||||
}
|
||||
|
||||
// ACK file
|
||||
writer.write_all(b"ACK\n").await?;
|
||||
// Receive buffersize
|
||||
let buffersize = String::from_utf8(buf.clone())?.parse::<usize>()?;
|
||||
println!("[+] Selected buffersize: {}", buffersize);
|
||||
buf.clear();
|
||||
|
||||
// ACK buffersize
|
||||
writer.write_all(b"ACK\n").await.unwrap();
|
||||
writer.flush().await?;
|
||||
|
||||
// Receive file amount (or termination request if the server does not have any files available)
|
||||
let file_amount: usize;
|
||||
let _bytes_read = reader.read_until(b'\n', &mut buf).await?;
|
||||
let msg = String::from_utf8(buf.clone())?;
|
||||
if msg.trim() == "FIN" {
|
||||
println!("[-] Server does not have any files available, closing connection");
|
||||
writer.write_all(b"FIN\n").await?;
|
||||
writer.flush().await?;
|
||||
break;
|
||||
} else {
|
||||
file_amount = msg.trim().parse::<usize>()?;
|
||||
println!("[+] Total of {} files available", file_amount);
|
||||
buf.clear();
|
||||
|
||||
// ACK file amount
|
||||
writer.write_all(b"ACK\n").await?;
|
||||
writer.flush().await?;
|
||||
}
|
||||
|
||||
// Receive file metadata
|
||||
println!("[+] Receiving file metadata");
|
||||
let mut metadata = HashMap::new();
|
||||
while metadata.len() < file_amount {
|
||||
reader.read_until(b'\n', &mut buf).await?;
|
||||
let msg = String::from_utf8(buf.clone())?;
|
||||
buf.clear();
|
||||
|
||||
// Parse 'filesize:filename'
|
||||
let split = msg.split(":").collect::<Vec<&str>>();
|
||||
let filesize = split[0].trim().parse::<u64>()?;
|
||||
let filename = split[1].trim().to_string();
|
||||
|
||||
metadata.insert(filename, filesize);
|
||||
}
|
||||
println!("[INFO] Metadata: {:?}", metadata);
|
||||
|
||||
// Send request for each file by filename
|
||||
println!("[+] Requesting files individually\n");
|
||||
let filenames = metadata.keys().collect::<Vec<&String>>();
|
||||
let mut filenames_iter = filenames.iter();
|
||||
|
||||
let mut input = String::new();
|
||||
loop {
|
||||
input.clear();
|
||||
|
||||
if download_all {
|
||||
match filenames_iter.next() {
|
||||
Some(filename) => {
|
||||
input.push_str(filename);
|
||||
}
|
||||
None => input.push_str("DISCONNECT"),
|
||||
}
|
||||
} else {
|
||||
// Blocks the current thread until a message is readable
|
||||
// Requests (messages) get queued if they can't be served immediately
|
||||
let msg = rx.recv()?;
|
||||
|
||||
input.push_str(msg.trim());
|
||||
}
|
||||
|
||||
if input == "DISCONNECT" {
|
||||
break;
|
||||
} else if !metadata.contains_key(input.as_str()) {
|
||||
println!("[-] No file named '{}' available\n", input);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle request based on input received from channel
|
||||
println!("[+] Requesting file named '{}'", input);
|
||||
let msg = input.to_string() + "\n";
|
||||
writer.write_all(msg.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
|
||||
// Create file locally
|
||||
let mut output_path = fileroot.clone();
|
||||
output_path.push(input.clone());
|
||||
|
||||
let output_file = File::create(output_path.clone()).await?;
|
||||
println!("[+] New file: {:#?}", output_path);
|
||||
let mut file_buf = BufWriter::new(output_file);
|
||||
|
||||
// Receive the file itself
|
||||
let filesize = metadata.get(input.as_str()).unwrap().clone();
|
||||
let mut remaining_data = filesize;
|
||||
let mut buf = vec![0u8; buffersize];
|
||||
|
||||
while remaining_data != 0 {
|
||||
if remaining_data >= buffersize as u64 {
|
||||
let read_result = reader.read(&mut buf);
|
||||
|
||||
match read_result.await {
|
||||
Ok(0) => {
|
||||
println!("[-] Waiting for data to become available...");
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
Ok(n) => {
|
||||
file_buf.write_all(&mut buf).await?;
|
||||
file_buf.flush().await?;
|
||||
remaining_data = remaining_data - n as u64;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
} else {
|
||||
let read_result = reader.read(&mut buf);
|
||||
|
||||
match read_result.await {
|
||||
Ok(_) => {
|
||||
let mut buf_slice = &buf[0..(remaining_data as usize)];
|
||||
file_buf.write_all(&mut buf_slice).await?;
|
||||
file_buf.flush().await?;
|
||||
remaining_data = 0;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ACK file
|
||||
writer.write_all(b"ACK\n").await?;
|
||||
writer.flush().await?;
|
||||
println!(
|
||||
"[+] Successfully wrote {} bytes to {:#?}\n",
|
||||
filesize, output_path
|
||||
);
|
||||
}
|
||||
|
||||
println!("[+] Requesting connection termination");
|
||||
writer.write_all(b"FIN\n").await?;
|
||||
writer.flush().await?;
|
||||
println!(
|
||||
"[+] Successfully wrote {} bytes to {:#?}\n",
|
||||
file.1, output_path
|
||||
);
|
||||
}
|
||||
|
||||
println!("[+] All files finished, requesting connection termination");
|
||||
writer.write_all(b"FIN\n").await?;
|
||||
writer.flush().await?;
|
||||
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
|
||||
});
|
||||
|
||||
// Separate thread for blocking stdin
|
||||
let input_task = thread::spawn(move || {
|
||||
let mut input_string = String::new();
|
||||
while input_string.trim() != "DISCONNECT" {
|
||||
input_string.clear();
|
||||
stdin().read_line(&mut input_string)?;
|
||||
print!("\n");
|
||||
tx.send(input_string.clone())?;
|
||||
}
|
||||
|
||||
Ok::<(), Box<dyn Error + Send + Sync>>(())
|
||||
});
|
||||
|
||||
match connection_task.join().unwrap().await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
eprintln!("[-] Error inside connection thread: {}", e);
|
||||
exit(0x0100);
|
||||
}
|
||||
}
|
||||
|
||||
if !download_all {
|
||||
match input_task.join().unwrap() {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
eprintln!("[-] Error inside input thread: {}", e);
|
||||
exit(0x0100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -25,6 +25,9 @@ struct Args {
|
||||
/// Path to the folder where the files are outputted as a client or
|
||||
/// served from as a server [default: './output' / './data']
|
||||
fileroot: Option<PathBuf>,
|
||||
#[clap(default_value_t = false, short = 'a', long, action)]
|
||||
/// Automatically download every available file from the host (skips stdin)
|
||||
all: bool,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@ -39,7 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
None => PathBuf::from("./output"),
|
||||
};
|
||||
|
||||
client::connect(addr, fileroot)
|
||||
client::connect(addr, fileroot, args.all)
|
||||
.await
|
||||
.expect("Error initializing client");
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use local_ip_address::local_ip;
|
||||
use std::{
|
||||
error::Error,
|
||||
fs::read_dir,
|
||||
net::{IpAddr, SocketAddr},
|
||||
path::PathBuf,
|
||||
@ -20,7 +21,7 @@ pub async fn listen(
|
||||
buffersize: usize,
|
||||
localhost: bool,
|
||||
timeout_duration: u64,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
let addr = match localhost {
|
||||
true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, port),
|
||||
false => SocketAddr::new(local_ip()?, port),
|
||||
@ -57,13 +58,12 @@ pub async fn listen(
|
||||
writer.flush().await?;
|
||||
|
||||
// Read ACK
|
||||
//let _bytes_read = reader.read_buf(&mut vec_buf).await?;
|
||||
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
|
||||
let msg = String::from_utf8(vec_buf.clone())?;
|
||||
if msg.trim() != "ACK" {
|
||||
let e: Box<dyn std::error::Error + Send + Sync> =
|
||||
let e: Box<dyn Error + Send + Sync> =
|
||||
format!("ACK not received (buffersize)").into();
|
||||
return Err::<(), Box<dyn std::error::Error + Send + Sync>>(e);
|
||||
return Err::<(), Box<dyn Error + Send + Sync>>(e);
|
||||
} else {
|
||||
vec_buf.clear();
|
||||
}
|
||||
@ -79,17 +79,16 @@ pub async fn listen(
|
||||
writer.flush().await?;
|
||||
|
||||
// Read FIN
|
||||
//let _bytes_read = reader.read_buf(&mut vec_buf).await?;
|
||||
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
|
||||
let msg = String::from_utf8(vec_buf.clone())?;
|
||||
if msg.trim() != "FIN" {
|
||||
let e: Box<dyn std::error::Error + Send + Sync> =
|
||||
let e: Box<dyn Error + Send + Sync> =
|
||||
format!("ACK not received (server-side termination)").into();
|
||||
return Err::<(), Box<dyn std::error::Error + Send + Sync>>(e);
|
||||
return Err::<(), Box<dyn Error + Send + Sync>>(e);
|
||||
} else {
|
||||
// Empty error as error's reason is already logged with println
|
||||
let e: Box<dyn std::error::Error + Send + Sync> = "".into();
|
||||
return Err::<(), Box<dyn std::error::Error + Send + Sync>>(e);
|
||||
let e: Box<dyn Error + Send + Sync> = "".into();
|
||||
return Err::<(), Box<dyn Error + Send + Sync>>(e);
|
||||
}
|
||||
} else {
|
||||
// Send file amount
|
||||
@ -99,13 +98,12 @@ pub async fn listen(
|
||||
}
|
||||
|
||||
// Read ACK
|
||||
//let _bytes_read = reader.read_buf(&mut vec_buf).await?;
|
||||
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
|
||||
let msg = String::from_utf8(vec_buf.clone())?;
|
||||
if msg.trim() != "ACK" {
|
||||
let e: Box<dyn std::error::Error + Send + Sync> =
|
||||
let e: Box<dyn Error + Send + Sync> =
|
||||
format!("ACK not received (file amount)").into();
|
||||
return Err::<(), Box<dyn std::error::Error + Send + Sync>>(e);
|
||||
return Err::<(), Box<dyn Error + Send + Sync>>(e);
|
||||
} else {
|
||||
vec_buf.clear();
|
||||
}
|
||||
@ -121,11 +119,10 @@ pub async fn listen(
|
||||
// Handle file request(s)
|
||||
println!("[+] Ready to serve files");
|
||||
loop {
|
||||
//let bytes_read = reader.read_buf(&mut vec_buf).await?;
|
||||
let bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
|
||||
|
||||
if bytes_read == 0 {
|
||||
println!("[-] File request never received");
|
||||
println!("[-] File request never received or client crashed");
|
||||
vec_buf.clear();
|
||||
break;
|
||||
} else {
|
||||
@ -133,7 +130,7 @@ pub async fn listen(
|
||||
vec_buf.clear();
|
||||
|
||||
if msg.trim() == "FIN" {
|
||||
println!("[+] FIN received, terminating individual connection...");
|
||||
println!("[+] FIN received, terminating connection with {}", addr);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -159,20 +156,19 @@ pub async fn listen(
|
||||
}
|
||||
|
||||
// Read ACK
|
||||
//let _bytes_read = reader.read_buf(&mut vec_buf).await?;
|
||||
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
|
||||
let msg = String::from_utf8(vec_buf.clone())?;
|
||||
if msg.trim() != "ACK" {
|
||||
let e: Box<dyn std::error::Error + Send + Sync> =
|
||||
let e: Box<dyn Error + Send + Sync> =
|
||||
format!("ACK not received (single file's data)").into();
|
||||
return Err::<(), Box<dyn std::error::Error + Send + Sync>>(e);
|
||||
return Err::<(), Box<dyn Error + Send + Sync>>(e);
|
||||
} else {
|
||||
println!("[+] File transfer successfully done");
|
||||
vec_buf.clear();
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
|
||||
Ok::<(), Box<dyn Error + Send + Sync>>(())
|
||||
});
|
||||
|
||||
match task.await? {
|
||||
@ -186,7 +182,7 @@ pub async fn listen(
|
||||
|
||||
async fn get_metadata(
|
||||
fileroot: PathBuf,
|
||||
) -> Result<(Vec<(String, u64)>, usize), Box<dyn std::error::Error + Send + Sync>> {
|
||||
) -> Result<(Vec<(String, u64)>, usize), Box<dyn Error + Send + Sync>> {
|
||||
let mut metadata = Vec::<(String, u64)>::new();
|
||||
let paths = read_dir(fileroot)?;
|
||||
|
||||
|
@ -11,10 +11,9 @@ use std::{
|
||||
use tokio_test::block_on;
|
||||
|
||||
#[test]
|
||||
#[timeout(10000)]
|
||||
/// Syncs three textfiles from ./data to ./output and checks
|
||||
/// that their contents match.
|
||||
fn sync_txt_files() {
|
||||
#[timeout(8000)]
|
||||
/// Syncs three textfiles from ./data to ./output and checks that their contents match
|
||||
fn inputless_filesync_test() {
|
||||
let data = vec![
|
||||
("1.txt", create_data()),
|
||||
("2.txt", create_data()),
|
||||
@ -28,6 +27,7 @@ fn sync_txt_files() {
|
||||
}
|
||||
|
||||
let server_handle = thread::spawn(|| {
|
||||
// Start the server in the local network, timeouts after 5 secs of inactivity
|
||||
block_on(server::listen(
|
||||
8080u16,
|
||||
PathBuf::from("./data"),
|
||||
@ -38,18 +38,21 @@ fn sync_txt_files() {
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
// Sleep to give server time to start up
|
||||
sleep(Duration::from_millis(500));
|
||||
|
||||
let client_handle = thread::spawn(|| {
|
||||
// Run the client inputless
|
||||
block_on(client::connect(
|
||||
String::from("127.0.0.1:8080"),
|
||||
PathBuf::from("./output"),
|
||||
true,
|
||||
))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
client_handle.join().unwrap();
|
||||
|
||||
// Sleep to give server time to start up
|
||||
sleep(Duration::from_millis(500));
|
||||
|
||||
server_handle.join().unwrap();
|
||||
|
||||
for file in data {
|
||||
|
Loading…
x
Reference in New Issue
Block a user