From c933339771a71cc4b4e24344010100d7177cc535 Mon Sep 17 00:00:00 2001 From: einisto <79069176+einisto@users.noreply.github.com> Date: Tue, 26 Jul 2022 22:08:31 +0300 Subject: [PATCH] File selection based on stdin * Spawn to threads as a client to handle stdin (blocking) & connection/transfer * Input from stdin gets transfered to connection-thread via MPSC-channel * Included a new flag (-a) to skip stdin and instead download all available files from the host * Removed earlier typos --- src/client.rs | 317 ++++++++++++++++++++++++-------------- src/main.rs | 5 +- src/server.rs | 36 ++--- tests/integration_test.rs | 17 +- 4 files changed, 228 insertions(+), 147 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4d667ac..2bea755 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,13 @@ -use std::{path::PathBuf, time::Duration}; +use std::{ + collections::HashMap, + error::Error, + io::stdin, + path::PathBuf, + process::exit, + sync::mpsc::{self, Receiver, Sender}, + thread, + time::Duration, +}; use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, @@ -6,133 +15,203 @@ use tokio::{ time::sleep, }; -pub async fn connect(addr: String, fileroot: PathBuf) -> Result<(), Box> { - println!("[+] Connecting to {}", addr); - let mut stream = TcpStream::connect(addr.clone()).await?; +pub async fn connect( + addr: String, + fileroot: PathBuf, + download_all: bool, +) -> Result<(), Box> { + let (tx, rx): (Sender, Receiver) = mpsc::channel(); - let (reader, writer) = stream.split(); - let mut reader = BufReader::new(reader); - let mut writer = BufWriter::new(writer); + let connection_task = thread::spawn(move || async move { + println!("[+] Connecting to {}", addr); + let mut stream = TcpStream::connect(addr.clone()).await?; - let mut buf = Vec::new(); + let (reader, writer) = stream.split(); + let mut reader = BufReader::new(reader); + let mut writer = BufWriter::new(writer); - loop { - let bytes_read = reader.read_buf(&mut buf).await?; - if bytes_read == 0 { - println!("[-] No more bytes received, closing connection"); - break; - } + let mut buf = Vec::new(); - // Receive buffersize - let buffersize = String::from_utf8(buf.clone())?.parse::()?; - println!("[+] Selected buffersize: {}", buffersize); - buf.clear(); - - // ACK buffersize - writer.write_all(b"ACK\n").await.unwrap(); - writer.flush().await?; - - // Receive file amount (or termination request if the server does not have any files available) - let file_amount: usize; - let _bytes_read = reader.read_until(b'\n', &mut buf).await?; - let msg = String::from_utf8(buf.clone())?; - if msg.trim() == "FIN" { - println!("[-] Server does not have any files available, closing connection"); - writer.write_all(b"FIN\n").await?; - writer.flush().await?; - break; - } else { - file_amount = msg.trim().parse::()?; - println!("[+] Total of {} files available", file_amount); - buf.clear(); - - // ACK file amount - writer.write_all(b"ACK\n").await?; - writer.flush().await?; - } - - // Receive file metadata - println!("[+] Receiving file metadata"); - let mut metadata = Vec::<(String, u64)>::new(); - while metadata.len() < file_amount { - reader.read_until(b'\n', &mut buf).await?; - let msg = String::from_utf8(buf.clone())?; - buf.clear(); - - // Parse 'filesize:filename' - let split = msg.split(":").collect::>(); - let filesize = split[0].trim().parse::()?; - let filename = split[1].trim().to_string(); - - metadata.push((filename, filesize)); - } - println!("[INFO] Metadata: {:?}", metadata); - - // Send request for each file by filename - // TODO: Choose files based on input - println!("[+] Requesting files individually"); - for file in &metadata { - println!("[INFO] Current request: [{:?}]", file); - let msg = file.0.to_string() + "\n"; - writer.write_all(msg.as_bytes()).await?; - writer.flush().await?; - - // Create file locally - let mut output_path = fileroot.clone(); - output_path.push(file.0.clone()); - - let output_file = File::create(output_path.clone()).await?; - println!("[+] New file: {:#?}", output_path); - let mut file_buf = BufWriter::new(output_file); - - // Receive the file itself - let mut remaining_data = file.1; - let mut buf = vec![0u8; buffersize]; - - while remaining_data != 0 { - if remaining_data >= buffersize as u64 { - let read_result = reader.read(&mut buf); - - match read_result.await { - Ok(0) => { - println!("[-] Waiting for data to become available..."); - sleep(Duration::from_secs(5)).await; - continue; - } - Ok(n) => { - file_buf.write_all(&mut buf).await?; - file_buf.flush().await?; - remaining_data = remaining_data - n as u64; - } - _ => {} - } - } else { - let read_result = reader.read(&mut buf); - - match read_result.await { - Ok(_) => { - let mut buf_slice = &buf[0..(remaining_data as usize)]; - file_buf.write_all(&mut buf_slice).await?; - file_buf.flush().await?; - remaining_data = 0; - } - _ => {} - } - } + loop { + let bytes_read = reader.read_buf(&mut buf).await?; + if bytes_read == 0 { + println!("[-] No more bytes received, closing connection"); + break; } - // ACK file - writer.write_all(b"ACK\n").await?; + // Receive buffersize + let buffersize = String::from_utf8(buf.clone())?.parse::()?; + println!("[+] Selected buffersize: {}", buffersize); + buf.clear(); + + // ACK buffersize + writer.write_all(b"ACK\n").await.unwrap(); + writer.flush().await?; + + // Receive file amount (or termination request if the server does not have any files available) + let file_amount: usize; + let _bytes_read = reader.read_until(b'\n', &mut buf).await?; + let msg = String::from_utf8(buf.clone())?; + if msg.trim() == "FIN" { + println!("[-] Server does not have any files available, closing connection"); + writer.write_all(b"FIN\n").await?; + writer.flush().await?; + break; + } else { + file_amount = msg.trim().parse::()?; + println!("[+] Total of {} files available", file_amount); + buf.clear(); + + // ACK file amount + writer.write_all(b"ACK\n").await?; + writer.flush().await?; + } + + // Receive file metadata + println!("[+] Receiving file metadata"); + let mut metadata = HashMap::new(); + while metadata.len() < file_amount { + reader.read_until(b'\n', &mut buf).await?; + let msg = String::from_utf8(buf.clone())?; + buf.clear(); + + // Parse 'filesize:filename' + let split = msg.split(":").collect::>(); + let filesize = split[0].trim().parse::()?; + let filename = split[1].trim().to_string(); + + metadata.insert(filename, filesize); + } + println!("[INFO] Metadata: {:?}", metadata); + + // Send request for each file by filename + println!("[+] Requesting files individually\n"); + let filenames = metadata.keys().collect::>(); + let mut filenames_iter = filenames.iter(); + + let mut input = String::new(); + loop { + input.clear(); + + if download_all { + match filenames_iter.next() { + Some(filename) => { + input.push_str(filename); + } + None => input.push_str("DISCONNECT"), + } + } else { + // Blocks the current thread until a message is readable + // Requests (messages) get queued if they can't be served immediately + let msg = rx.recv()?; + + input.push_str(msg.trim()); + } + + if input == "DISCONNECT" { + break; + } else if !metadata.contains_key(input.as_str()) { + println!("[-] No file named '{}' available\n", input); + continue; + } + + // Handle request based on input received from channel + println!("[+] Requesting file named '{}'", input); + let msg = input.to_string() + "\n"; + writer.write_all(msg.as_bytes()).await?; + writer.flush().await?; + + // Create file locally + let mut output_path = fileroot.clone(); + output_path.push(input.clone()); + + let output_file = File::create(output_path.clone()).await?; + println!("[+] New file: {:#?}", output_path); + let mut file_buf = BufWriter::new(output_file); + + // Receive the file itself + let filesize = metadata.get(input.as_str()).unwrap().clone(); + let mut remaining_data = filesize; + let mut buf = vec![0u8; buffersize]; + + while remaining_data != 0 { + if remaining_data >= buffersize as u64 { + let read_result = reader.read(&mut buf); + + match read_result.await { + Ok(0) => { + println!("[-] Waiting for data to become available..."); + sleep(Duration::from_secs(5)).await; + continue; + } + Ok(n) => { + file_buf.write_all(&mut buf).await?; + file_buf.flush().await?; + remaining_data = remaining_data - n as u64; + } + _ => {} + } + } else { + let read_result = reader.read(&mut buf); + + match read_result.await { + Ok(_) => { + let mut buf_slice = &buf[0..(remaining_data as usize)]; + file_buf.write_all(&mut buf_slice).await?; + file_buf.flush().await?; + remaining_data = 0; + } + _ => {} + } + } + } + + // ACK file + writer.write_all(b"ACK\n").await?; + writer.flush().await?; + println!( + "[+] Successfully wrote {} bytes to {:#?}\n", + filesize, output_path + ); + } + + println!("[+] Requesting connection termination"); + writer.write_all(b"FIN\n").await?; writer.flush().await?; - println!( - "[+] Successfully wrote {} bytes to {:#?}\n", - file.1, output_path - ); } - println!("[+] All files finished, requesting connection termination"); - writer.write_all(b"FIN\n").await?; - writer.flush().await?; + Ok::<(), Box>(()) + }); + + // Separate thread for blocking stdin + let input_task = thread::spawn(move || { + let mut input_string = String::new(); + while input_string.trim() != "DISCONNECT" { + input_string.clear(); + stdin().read_line(&mut input_string)?; + print!("\n"); + tx.send(input_string.clone())?; + } + + Ok::<(), Box>(()) + }); + + match connection_task.join().unwrap().await { + Ok(_) => {} + Err(e) => { + eprintln!("[-] Error inside connection thread: {}", e); + exit(0x0100); + } + } + + if !download_all { + match input_task.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("[-] Error inside input thread: {}", e); + exit(0x0100); + } + } } Ok(()) diff --git a/src/main.rs b/src/main.rs index 9b8fdf0..8ef3b31 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,9 @@ struct Args { /// Path to the folder where the files are outputted as a client or /// served from as a server [default: './output' / './data'] fileroot: Option, + #[clap(default_value_t = false, short = 'a', long, action)] + /// Automatically download every available file from the host (skips stdin) + all: bool, } #[tokio::main] @@ -39,7 +42,7 @@ async fn main() -> Result<(), Box> { None => PathBuf::from("./output"), }; - client::connect(addr, fileroot) + client::connect(addr, fileroot, args.all) .await .expect("Error initializing client"); } diff --git a/src/server.rs b/src/server.rs index 5d9a794..374f61a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,6 @@ use local_ip_address::local_ip; use std::{ + error::Error, fs::read_dir, net::{IpAddr, SocketAddr}, path::PathBuf, @@ -20,7 +21,7 @@ pub async fn listen( buffersize: usize, localhost: bool, timeout_duration: u64, -) -> Result<(), Box> { +) -> Result<(), Box> { let addr = match localhost { true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, port), false => SocketAddr::new(local_ip()?, port), @@ -57,13 +58,12 @@ pub async fn listen( writer.flush().await?; // Read ACK - //let _bytes_read = reader.read_buf(&mut vec_buf).await?; let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; let msg = String::from_utf8(vec_buf.clone())?; if msg.trim() != "ACK" { - let e: Box = + let e: Box = format!("ACK not received (buffersize)").into(); - return Err::<(), Box>(e); + return Err::<(), Box>(e); } else { vec_buf.clear(); } @@ -79,17 +79,16 @@ pub async fn listen( writer.flush().await?; // Read FIN - //let _bytes_read = reader.read_buf(&mut vec_buf).await?; let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; let msg = String::from_utf8(vec_buf.clone())?; if msg.trim() != "FIN" { - let e: Box = + let e: Box = format!("ACK not received (server-side termination)").into(); - return Err::<(), Box>(e); + return Err::<(), Box>(e); } else { // Empty error as error's reason is already logged with println - let e: Box = "".into(); - return Err::<(), Box>(e); + let e: Box = "".into(); + return Err::<(), Box>(e); } } else { // Send file amount @@ -99,13 +98,12 @@ pub async fn listen( } // Read ACK - //let _bytes_read = reader.read_buf(&mut vec_buf).await?; let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; let msg = String::from_utf8(vec_buf.clone())?; if msg.trim() != "ACK" { - let e: Box = + let e: Box = format!("ACK not received (file amount)").into(); - return Err::<(), Box>(e); + return Err::<(), Box>(e); } else { vec_buf.clear(); } @@ -121,11 +119,10 @@ pub async fn listen( // Handle file request(s) println!("[+] Ready to serve files"); loop { - //let bytes_read = reader.read_buf(&mut vec_buf).await?; let bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; if bytes_read == 0 { - println!("[-] File request never received"); + println!("[-] File request never received or client crashed"); vec_buf.clear(); break; } else { @@ -133,7 +130,7 @@ pub async fn listen( vec_buf.clear(); if msg.trim() == "FIN" { - println!("[+] FIN received, terminating individual connection..."); + println!("[+] FIN received, terminating connection with {}", addr); break; } @@ -159,20 +156,19 @@ pub async fn listen( } // Read ACK - //let _bytes_read = reader.read_buf(&mut vec_buf).await?; let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; let msg = String::from_utf8(vec_buf.clone())?; if msg.trim() != "ACK" { - let e: Box = + let e: Box = format!("ACK not received (single file's data)").into(); - return Err::<(), Box>(e); + return Err::<(), Box>(e); } else { println!("[+] File transfer successfully done"); vec_buf.clear(); } } - Ok::<(), Box>(()) + Ok::<(), Box>(()) }); match task.await? { @@ -186,7 +182,7 @@ pub async fn listen( async fn get_metadata( fileroot: PathBuf, -) -> Result<(Vec<(String, u64)>, usize), Box> { +) -> Result<(Vec<(String, u64)>, usize), Box> { let mut metadata = Vec::<(String, u64)>::new(); let paths = read_dir(fileroot)?; diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 4b61f79..0cb7259 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -11,10 +11,9 @@ use std::{ use tokio_test::block_on; #[test] -#[timeout(10000)] -/// Syncs three textfiles from ./data to ./output and checks -/// that their contents match. -fn sync_txt_files() { +#[timeout(8000)] +/// Syncs three textfiles from ./data to ./output and checks that their contents match +fn inputless_filesync_test() { let data = vec![ ("1.txt", create_data()), ("2.txt", create_data()), @@ -28,6 +27,7 @@ fn sync_txt_files() { } let server_handle = thread::spawn(|| { + // Start the server in the local network, timeouts after 5 secs of inactivity block_on(server::listen( 8080u16, PathBuf::from("./data"), @@ -38,18 +38,21 @@ fn sync_txt_files() { .unwrap(); }); - // Sleep to give server time to start up - sleep(Duration::from_millis(500)); - let client_handle = thread::spawn(|| { + // Run the client inputless block_on(client::connect( String::from("127.0.0.1:8080"), PathBuf::from("./output"), + true, )) .unwrap(); }); client_handle.join().unwrap(); + + // Sleep to give server time to start up + sleep(Duration::from_millis(500)); + server_handle.join().unwrap(); for file in data {