diff --git a/Cargo.lock b/Cargo.lock index 630a7ca..415cf2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,7 +111,7 @@ checksum = "fb58b6451e8c2a812ad979ed1d83378caa5e927eef2622017a45f251457c2c9d" [[package]] name = "fragilebyte" -version = "0.2.0" +version = "0.3.0" dependencies = [ "clap", "local-ip-address", diff --git a/Cargo.toml b/Cargo.toml index c67bf52..3642469 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "fragilebyte" authors = ["Arttu Einistö"] description = "TCP socket pair for file transfer, backend for https://github.com/einisto/leightbox" -version = "0.2.0" +version = "0.3.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -11,8 +11,8 @@ edition = "2021" tokio = { version = "1.19.2", features = ["full"] } clap = { version = "3.2.8", features = ["derive"] } local-ip-address = "0.4.4" +rand = "0.8.5" [dev-dependencies] tokio-test = "0.4.2" -rand = "0.8.5" ntest = "0.8.1" diff --git a/doc/structure.svg b/doc/structure.svg index 8b8ce3c..29be261 100644 --- a/doc/structure.svg +++ b/doc/structure.svg @@ -1,18 +1,18 @@ - - + - + fragilebyte - -fragilebyte + +fragilebyte @@ -23,8 +23,8 @@ fragilebyte->clap - - + + @@ -35,20 +35,32 @@ fragilebyte->local-ip-address - - + + + + + +rand + +rand + + + +fragilebyte->rand + + - + tokio - -tokio + +tokio - + fragilebyte->tokio - - + + diff --git a/src/client.rs b/src/client.rs index 2bea755..ed22987 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,13 +11,17 @@ use std::{ use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::TcpStream, + net::{ + tcp::{ReadHalf, WriteHalf}, + TcpStream, + }, time::sleep, }; pub async fn connect( addr: String, fileroot: PathBuf, + access_key: String, download_all: bool, ) -> Result<(), Box> { let (tx, rx): (Sender, Receiver) = mpsc::channel(); @@ -32,174 +36,60 @@ pub async fn connect( let mut buf = Vec::new(); - loop { - let bytes_read = reader.read_buf(&mut buf).await?; - if bytes_read == 0 { - println!("[-] No more bytes received, closing connection"); - break; + // Authenticate connection + match authenticate_connection(&mut reader, &mut writer, &mut buf, &access_key).await? { + None => println!("[+] Connection authenticated successfully"), + Some(err_msg) => { + println!("{}", err_msg); + exit(0x0100); } - - // Receive buffersize - let buffersize = String::from_utf8(buf.clone())?.parse::()?; - println!("[+] Selected buffersize: {}", buffersize); - buf.clear(); - - // ACK buffersize - writer.write_all(b"ACK\n").await.unwrap(); - writer.flush().await?; - - // Receive file amount (or termination request if the server does not have any files available) - let file_amount: usize; - let _bytes_read = reader.read_until(b'\n', &mut buf).await?; - let msg = String::from_utf8(buf.clone())?; - if msg.trim() == "FIN" { - println!("[-] Server does not have any files available, closing connection"); - writer.write_all(b"FIN\n").await?; - writer.flush().await?; - break; - } else { - file_amount = msg.trim().parse::()?; - println!("[+] Total of {} files available", file_amount); - buf.clear(); - - // ACK file amount - writer.write_all(b"ACK\n").await?; - writer.flush().await?; - } - - // Receive file metadata - println!("[+] Receiving file metadata"); - let mut metadata = HashMap::new(); - while metadata.len() < file_amount { - reader.read_until(b'\n', &mut buf).await?; - let msg = String::from_utf8(buf.clone())?; - buf.clear(); - - // Parse 'filesize:filename' - let split = msg.split(":").collect::>(); - let filesize = split[0].trim().parse::()?; - let filename = split[1].trim().to_string(); - - metadata.insert(filename, filesize); - } - println!("[INFO] Metadata: {:?}", metadata); - - // Send request for each file by filename - println!("[+] Requesting files individually\n"); - let filenames = metadata.keys().collect::>(); - let mut filenames_iter = filenames.iter(); - - let mut input = String::new(); - loop { - input.clear(); - - if download_all { - match filenames_iter.next() { - Some(filename) => { - input.push_str(filename); - } - None => input.push_str("DISCONNECT"), - } - } else { - // Blocks the current thread until a message is readable - // Requests (messages) get queued if they can't be served immediately - let msg = rx.recv()?; - - input.push_str(msg.trim()); - } - - if input == "DISCONNECT" { - break; - } else if !metadata.contains_key(input.as_str()) { - println!("[-] No file named '{}' available\n", input); - continue; - } - - // Handle request based on input received from channel - println!("[+] Requesting file named '{}'", input); - let msg = input.to_string() + "\n"; - writer.write_all(msg.as_bytes()).await?; - writer.flush().await?; - - // Create file locally - let mut output_path = fileroot.clone(); - output_path.push(input.clone()); - - let output_file = File::create(output_path.clone()).await?; - println!("[+] New file: {:#?}", output_path); - let mut file_buf = BufWriter::new(output_file); - - // Receive the file itself - let filesize = metadata.get(input.as_str()).unwrap().clone(); - let mut remaining_data = filesize; - let mut buf = vec![0u8; buffersize]; - - while remaining_data != 0 { - if remaining_data >= buffersize as u64 { - let read_result = reader.read(&mut buf); - - match read_result.await { - Ok(0) => { - println!("[-] Waiting for data to become available..."); - sleep(Duration::from_secs(5)).await; - continue; - } - Ok(n) => { - file_buf.write_all(&mut buf).await?; - file_buf.flush().await?; - remaining_data = remaining_data - n as u64; - } - _ => {} - } - } else { - let read_result = reader.read(&mut buf); - - match read_result.await { - Ok(_) => { - let mut buf_slice = &buf[0..(remaining_data as usize)]; - file_buf.write_all(&mut buf_slice).await?; - file_buf.flush().await?; - remaining_data = 0; - } - _ => {} - } - } - } - - // ACK file - writer.write_all(b"ACK\n").await?; - writer.flush().await?; - println!( - "[+] Successfully wrote {} bytes to {:#?}\n", - filesize, output_path - ); - } - - println!("[+] Requesting connection termination"); - writer.write_all(b"FIN\n").await?; - writer.flush().await?; } + // Receive buffersize + let buffersize = recv_msg_string(&mut reader, &mut buf) + .await? + .parse::()?; + println!("[+] Selected buffersize: {}", buffersize); + + // ACK buffersize + send_msg(&mut writer, "ACK\n").await?; + + // Receive metadata + let metadata = match receive_metadata(&mut reader, &mut writer, &mut buf).await? { + Some(metadata) => metadata, + None => exit(0x0100), + }; + + println!("[+] Received metadata: {:#?}", metadata); + + // Send request for each file by filename + println!("[+] [ + Enter] to make a request\n"); + handle_file_reqs( + &mut reader, + &mut writer, + rx, + &buffersize, + &metadata, + &fileroot, + &download_all, + ) + .await?; + + // Terminating connection + println!("[+] Requesting connection termination"); + writer.write_all(b"FIN\n").await?; + writer.flush().await?; + Ok::<(), Box>(()) }); // Separate thread for blocking stdin - let input_task = thread::spawn(move || { - let mut input_string = String::new(); - while input_string.trim() != "DISCONNECT" { - input_string.clear(); - stdin().read_line(&mut input_string)?; - print!("\n"); - tx.send(input_string.clone())?; - } - - Ok::<(), Box>(()) - }); + let input_task = thread::spawn(move || handle_stdin(tx)); match connection_task.join().unwrap().await { Ok(_) => {} Err(e) => { - eprintln!("[-] Error inside connection thread: {}", e); + eprintln!("[ERROR] Error inside connection thread: {}", e); exit(0x0100); } } @@ -208,7 +98,7 @@ pub async fn connect( match input_task.join().unwrap() { Ok(_) => {} Err(e) => { - eprintln!("[-] Error inside input thread: {}", e); + eprintln!("[ERROR] Error inside input thread: {}", e); exit(0x0100); } } @@ -216,3 +106,220 @@ pub async fn connect( Ok(()) } + +async fn send_msg( + writer: &mut BufWriter>, + msg: &str, +) -> Result<(), Box> { + writer.write_all(msg.as_bytes()).await?; + writer.flush().await?; + Ok(()) +} + +async fn recv_msg_string( + reader: &mut BufReader>, + buf: &mut Vec, +) -> Result> { + let bytes_received = reader.read_until(b'\n', buf).await?; + + if bytes_received == 0 { + let e: Box = + format!("No message received or server crashed").into(); + return Err::>(e); + } + + let msg = String::from_utf8(buf.clone())?; + buf.clear(); + + Ok(msg.trim().to_string()) +} + +fn handle_stdin(tx: Sender) -> Result<(), Box> { + let mut input_string = String::new(); + while input_string.trim() != "DISCONNECT" { + input_string.clear(); + stdin().read_line(&mut input_string)?; + print!("\n"); + tx.send(input_string.clone())?; + } + + Ok::<(), Box>(()) +} + +async fn authenticate_connection( + reader: &mut BufReader>, + writer: &mut BufWriter>, + buf: &mut Vec, + access_key: &String, +) -> Result, Box> { + // Receive ACK to indicate ready-to-receive status + if recv_msg_string(reader, buf).await? != "SYN" { + return Ok(Some( + "[-] Server is not ready to receive access key, terminating connection".to_string(), + )); + } + + // Send access key + send_msg(writer, (access_key.to_string() + "\n").as_str()).await?; + + // Terminate connection if key is invalid + if recv_msg_string(reader, buf).await? == "FIN" { + return Ok(Some( + "[-] Incorrect access key, terminating connection".to_string(), + )); + } else { + send_msg(writer, "ACK\n").await?; + Ok(None) + } +} + +async fn receive_metadata( + reader: &mut BufReader>, + writer: &mut BufWriter>, + buf: &mut Vec, +) -> Result>, Box> { + // Receive file amount or terminate if no files available + let msg = recv_msg_string(reader, buf).await?; + if msg == "FIN" { + println!("[-] Server does not have any files available, closing connection"); + return Ok(None); + } + + let file_amount = msg.parse::()?; + println!("[+] Total of {} files available", file_amount); + + // ACK file amount + send_msg(writer, "ACK\n").await?; + + // Receive file metadata + let mut metadata = HashMap::new(); + while metadata.len() < file_amount { + let msg = recv_msg_string(reader, buf).await?; + + // Parse 'filesize:filename' + let split = msg.split(":").collect::>(); + let filesize = split[0].trim().parse::()?; + let filename = split[1].trim().to_string(); + + metadata.insert(filename, filesize); + } + + Ok(Some(metadata)) +} + +async fn create_filehandle( + fileroot: &PathBuf, + filename: &String, +) -> Result<(BufWriter, PathBuf), Box> { + let mut output_path = fileroot.clone(); + output_path.push(&filename); + let output_file = File::create(output_path.clone()).await?; + println!("[+] New file: {:#?}", output_path); + + Ok((BufWriter::new(output_file), output_path)) +} + +async fn handle_file_reqs( + reader: &mut BufReader>, + writer: &mut BufWriter>, + rx: Receiver, + buffersize: &usize, + metadata: &HashMap, + fileroot: &PathBuf, + download_all: &bool, +) -> Result<(), Box> { + let filenames = metadata.keys().collect::>(); + let mut filenames_iter = filenames.iter(); + + let mut input_string = String::new(); + + loop { + input_string.clear(); + + if *download_all { + match filenames_iter.next() { + Some(filename) => { + input_string.push_str(filename); + } + None => input_string.push_str("DISCONNECT"), + } + } else { + // Blocks the current thread until a message is readable + // Requests (messages) get queued if they can't be served immediately + let msg = rx.recv()?; + input_string.push_str(msg.trim()); + } + + // Terminate connection on request + if input_string == "DISCONNECT" { + break; + } else if !metadata.contains_key(input_string.as_str()) { + println!("[-] No file named '{}' available\n", input_string); + continue; + } + + // Handle request based on input received from channel + println!("[+] Requesting file named '{}'", input_string); + send_msg(writer, (input_string.to_string() + "\n").as_str()).await?; + + // Create file locally + let (mut file_buf, output_path) = create_filehandle(&fileroot, &input_string).await?; + + // Receive the file itself + let filesize = metadata.get(input_string.as_str()).unwrap().clone(); + receive_file(reader, &mut file_buf, &filesize, buffersize).await?; + + // ACK file + send_msg(writer, "ACK\n").await?; + println!( + "[+] Successfully wrote {} bytes to {:#?}\n", + filesize, output_path + ); + } + + Ok(()) +} + +async fn receive_file( + reader: &mut BufReader>, + file_buf: &mut BufWriter, + filesize: &u64, + buffersize: &usize, +) -> Result<(), Box> { + let mut remaining_data = *filesize; + let mut buf = vec![0u8; *buffersize]; + + while remaining_data != 0 { + if remaining_data >= *buffersize as u64 { + let read_result = reader.read(&mut buf); + + match read_result.await { + Ok(0) => { + println!("[-] Connection lost, trying again until [Ctrl + V]..."); + sleep(Duration::from_secs(5)).await; + continue; + } + Ok(n) => { + file_buf.write_all(&mut buf).await?; + file_buf.flush().await?; + remaining_data = remaining_data - n as u64; + } + _ => {} + } + } else { + let read_result = reader.read(&mut buf); + + match read_result.await { + Ok(_) => { + let mut buf_slice = &buf[0..(remaining_data as usize)]; + file_buf.write_all(&mut buf_slice).await?; + file_buf.flush().await?; + remaining_data = 0; + } + _ => {} + } + } + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 8ef3b31..6ff33fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use clap::Parser; use fragilebyte::{client, server}; -use std::{path::PathBuf, str::FromStr}; +use std::{error::Error, path::PathBuf, process::exit, str::FromStr}; use tokio; #[derive(Parser, Debug)] @@ -9,6 +9,9 @@ struct Args { #[clap(short = 't', long, value_parser)] /// Server's address when connecting as a client target: Option, + #[clap(short = 'k', long, value_parser)] + /// Alphanumeric 8 characters long key required to establish a connection to the host + key: Option, #[clap(default_value_t = 8080u16, short = 'p', long, value_parser = validate_arg::)] /// Port where the service is hosted port: u16, @@ -31,7 +34,7 @@ struct Args { } #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { let args = Args::parse(); match args.target { @@ -41,8 +44,15 @@ async fn main() -> Result<(), Box> { Some(n) => n, None => PathBuf::from("./output"), }; + let access_key = match args.key { + Some(n) => n, + None => { + eprintln!("[-] Access key required as a client, please try again"); + exit(0x0100); + } + }; - client::connect(addr, fileroot, args.all) + client::connect(addr, fileroot, access_key, args.all) .await .expect("Error initializing client"); } @@ -59,6 +69,7 @@ async fn main() -> Result<(), Box> { args.buffersize, args.localhost, args.timeout, + false, ) .await .expect("Error initializing server"); diff --git a/src/server.rs b/src/server.rs index 374f61a..3abdfa3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,9 +1,11 @@ use local_ip_address::local_ip; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; use std::{ error::Error, fs::read_dir, net::{IpAddr, SocketAddr}, path::PathBuf, + process::exit, str::FromStr, time::Duration, }; @@ -11,7 +13,10 @@ use tokio::{ self, fs::File, io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::TcpListener, + net::{ + tcp::{ReadHalf, WriteHalf}, + TcpListener, + }, time::timeout, }; @@ -21,17 +26,26 @@ pub async fn listen( buffersize: usize, localhost: bool, timeout_duration: u64, + use_testing_key: bool, ) -> Result<(), Box> { let addr = match localhost { true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, port), false => SocketAddr::new(local_ip()?, port), }; + // Use weak access key for integration testing, otherwise 8 char alphanumeric + let access_key = match use_testing_key { + true => "test".to_string(), + false => generate_key(), + }; let listener = TcpListener::bind(addr).await?; println!("[+] Listening on {}", addr); + println!("[+] Access key: {}", access_key); loop { + // The first loop iteration would take the ownership without cloning let alt_fileroot = fileroot.clone(); + let alt_access_key = access_key.clone(); let (mut socket, addr) = match timeout(Duration::from_secs(timeout_duration), listener.accept()).await { @@ -44,144 +58,71 @@ pub async fn listen( break; } }; - println!("\n[+] New client: {}", addr); - let task = tokio::spawn(async move { + println!("\n[NEW] {}: Connected", addr); + + tokio::spawn(async move { let (reader, writer) = socket.split(); let mut reader = BufReader::new(reader); let mut writer = BufWriter::new(writer); let mut vec_buf = Vec::new(); + // ACK ready-to-receive status + send_msg(&mut writer, "SYN\n").await?; + + // Check access key + if !check_access_key(&mut reader, &mut writer, &mut vec_buf, &alt_access_key).await? { + println!("[FIN] {}: Incorrect access key", addr); + return Ok::<(), Box>(()); + } + // Send buffersize - writer.write_all(buffersize.to_string().as_bytes()).await?; - writer.flush().await?; + send_msg(&mut writer, (buffersize.to_string() + "\n").as_str()).await?; - // Read ACK - let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; - let msg = String::from_utf8(vec_buf.clone())?; - if msg.trim() != "ACK" { - let e: Box = - format!("ACK not received (buffersize)").into(); - return Err::<(), Box>(e); - } else { - vec_buf.clear(); + // ACK buffersize + if recv_msg_string(&mut reader, &mut vec_buf).await? != "ACK" { + return Ok::<(), Box>(()); } - let (metadata_list, file_amount) = get_metadata(alt_fileroot.clone()).await?; - if file_amount == 0 { - println!( - "[-] No files available inside fileroot {:#?}, shutting down", - alt_fileroot - ); - // No files available, request connection termination - writer.write_all(b"FIN\n").await?; - writer.flush().await?; - - // Read FIN - let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; - let msg = String::from_utf8(vec_buf.clone())?; - if msg.trim() != "FIN" { - let e: Box = - format!("ACK not received (server-side termination)").into(); - return Err::<(), Box>(e); - } else { - // Empty error as error's reason is already logged with println - let e: Box = "".into(); - return Err::<(), Box>(e); + // Send metadata + match handle_metadata(&mut reader, &mut writer, &mut vec_buf, &alt_fileroot, &addr) + .await? + { + None => println!("[DATA] {}: Ready to serve files", addr), + Some(err_msg) => { + println!("{}", err_msg); + exit(0x0100); } - } else { - // Send file amount - let msg = file_amount.to_string() + "\n"; - writer.write_all(msg.as_bytes()).await?; - writer.flush().await?; } - // Read ACK - let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; - let msg = String::from_utf8(vec_buf.clone())?; - if msg.trim() != "ACK" { - let e: Box = - format!("ACK not received (file amount)").into(); - return Err::<(), Box>(e); - } else { - vec_buf.clear(); - } - - // Send file metadata - for file in &metadata_list { - // Newline as delimiter between instances - let msg = format!("{}:{}\n", file.1, file.0); - writer.write_all(msg.as_bytes()).await?; - writer.flush().await?; - } - - // Handle file request(s) - println!("[+] Ready to serve files"); - loop { - let bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; - - if bytes_read == 0 { - println!("[-] File request never received or client crashed"); - vec_buf.clear(); - break; - } else { - let msg = String::from_utf8(vec_buf.clone())?; - vec_buf.clear(); - - if msg.trim() == "FIN" { - println!("[+] FIN received, terminating connection with {}", addr); - break; - } - - let mut input_path = alt_fileroot.clone(); - input_path.push(msg.trim()); - - println!("\n[+] File requested: {:#?}", input_path); - let mut file = File::open(input_path.clone()).await?; - let mut remaining_data = file.metadata().await?.len(); - let mut filebuf = vec![0u8; buffersize]; - - while remaining_data != 0 { - let read_result = file.read(&mut filebuf); - match read_result.await { - Ok(n) => { - writer.write_all(&filebuf).await?; - writer.flush().await?; - remaining_data = remaining_data - n as u64; - } - _ => {} - } - } - } - - // Read ACK - let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; - let msg = String::from_utf8(vec_buf.clone())?; - if msg.trim() != "ACK" { - let e: Box = - format!("ACK not received (single file's data)").into(); - return Err::<(), Box>(e); - } else { - println!("[+] File transfer successfully done"); - vec_buf.clear(); + // Send filedata + match handle_file_reqs( + &mut reader, + &mut writer, + &mut vec_buf, + &alt_fileroot, + &buffersize, + &addr, + ) + .await? + { + None => println!("[FIN] {}: Disconnected", addr), + Some(err_msg) => { + println!("{}", err_msg); + exit(0x0100); } } Ok::<(), Box>(()) }); - - match task.await? { - Ok(_) => continue, - Err(_) => break, - } } Ok(()) } async fn get_metadata( - fileroot: PathBuf, + fileroot: &PathBuf, ) -> Result<(Vec<(String, u64)>, usize), Box> { let mut metadata = Vec::<(String, u64)>::new(); let paths = read_dir(fileroot)?; @@ -202,3 +143,142 @@ async fn get_metadata( Ok((metadata, amount)) } + +async fn handle_metadata( + reader: &mut BufReader>, + writer: &mut BufWriter>, + buf: &mut Vec, + fileroot: &PathBuf, + addr: &SocketAddr, +) -> Result, Box> { + let (metadata_list, file_amount) = get_metadata(fileroot).await?; + + // Terminate if fileroot is empty + if file_amount == 0 { + send_msg(writer, "FIN\n").await?; + return Ok(Some(format!( + "[-] No files inside {:#?}, shutting host down", + fileroot + ))); + } + + // Send metadata amount + send_msg(writer, (file_amount.to_string() + "\n").as_str()).await?; + + // ACK metadata amount + if recv_msg_string(reader, buf).await? != "ACK" { + return Ok(Some(format!( + "[ERROR] {}: No confirmation of metadata amount", + addr + ))); + } + + // Send metadata + for file in &metadata_list { + send_msg(writer, format!("{}:{}\n", file.1, file.0).as_str()).await?; + } + + Ok(None) +} + +fn generate_key() -> String { + thread_rng() + .sample_iter(&Alphanumeric) + .take(8) + .map(char::from) + .collect::() +} + +async fn send_msg( + writer: &mut BufWriter>, + msg: &str, +) -> Result<(), Box> { + writer.write_all(msg.as_bytes()).await?; + writer.flush().await?; + Ok(()) +} + +async fn recv_msg_string( + reader: &mut BufReader>, + buf: &mut Vec, +) -> Result> { + let bytes_received = reader.read_until(b'\n', buf).await?; + + if bytes_received == 0 { + let e: Box = + format!("No message received or client crashed").into(); + return Err::>(e); + } + + let msg = String::from_utf8(buf.clone())?; + buf.clear(); + + Ok(msg.trim().to_string()) +} + +async fn check_access_key( + reader: &mut BufReader>, + writer: &mut BufWriter>, + buf: &mut Vec, + access_key: &String, +) -> Result> { + if recv_msg_string(reader, buf).await? != *access_key { + send_msg(writer, "FIN\n").await?; + return Ok(false); + } else { + send_msg(writer, "ACK\n").await?; + recv_msg_string(reader, buf).await?; // Might be a bit unnecessary ACK + return Ok(true); + } +} + +async fn handle_file_reqs( + reader: &mut BufReader>, + writer: &mut BufWriter>, + buf: &mut Vec, + fileroot: &PathBuf, + buffersize: &usize, + addr: &SocketAddr, +) -> Result, Box> { + loop { + // Receive filename or termination request + let req = recv_msg_string(reader, buf).await?; + + if req == "FIN" { + break; + } + + let mut input_path = fileroot.clone(); + input_path.push(req); + + println!("\n[REQ] {}: {:#?}", addr, input_path); + let mut file = File::open(input_path.clone()).await?; + let mut remaining_data = file.metadata().await?.len(); + let mut filebuf = vec![0u8; *buffersize]; + + // Serve the file itself + while remaining_data != 0 { + let read_result = file.read(&mut filebuf); + match read_result.await { + Ok(n) => { + writer.write_all(&filebuf).await?; + writer.flush().await?; + remaining_data = remaining_data - n as u64; + } + _ => {} + } + } + + // ACK file + if recv_msg_string(reader, buf).await? != "ACK" { + return Ok(Some(format!( + "[ERROR] {}: No confirmation of file {:#?}", + addr, input_path + ))); + } else { + println!("[ACK] {}: File finished successfully", addr); + } + } + + Ok(None) +} diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 0cb7259..b8cb29c 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -34,6 +34,7 @@ fn inputless_filesync_test() { 8192usize, true, 5, + true, )) .unwrap(); }); @@ -43,6 +44,7 @@ fn inputless_filesync_test() { block_on(client::connect( String::from("127.0.0.1:8080"), PathBuf::from("./output"), + "test".to_string(), true, )) .unwrap();