Key-based authentication & critical fix to server's async behaviour

* 8 character long alphanumeric key authentication (server-side)
* Modified integration test to skip the previously mentioned key authentication
* Fixed a previously made typo, which caused the server to await until a single
  client's connection was done
* Simplified the structure of client.rs & server.rs a lot, also improved comments
* Confirm client's connection request with SYN
* Version bump, v0.2.0 => v0.3.0
This commit is contained in:
einisto 2022-07-29 03:34:30 +03:00
parent 7810f6b38c
commit 35fb797c95
7 changed files with 507 additions and 295 deletions

2
Cargo.lock generated
View File

@ -111,7 +111,7 @@ checksum = "fb58b6451e8c2a812ad979ed1d83378caa5e927eef2622017a45f251457c2c9d"
[[package]] [[package]]
name = "fragilebyte" name = "fragilebyte"
version = "0.2.0" version = "0.3.0"
dependencies = [ dependencies = [
"clap", "clap",
"local-ip-address", "local-ip-address",

View File

@ -2,7 +2,7 @@
name = "fragilebyte" name = "fragilebyte"
authors = ["Arttu Einistö"] authors = ["Arttu Einistö"]
description = "TCP socket pair for file transfer, backend for https://github.com/einisto/leightbox" description = "TCP socket pair for file transfer, backend for https://github.com/einisto/leightbox"
version = "0.2.0" version = "0.3.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -11,8 +11,8 @@ edition = "2021"
tokio = { version = "1.19.2", features = ["full"] } tokio = { version = "1.19.2", features = ["full"] }
clap = { version = "3.2.8", features = ["derive"] } clap = { version = "3.2.8", features = ["derive"] }
local-ip-address = "0.4.4" local-ip-address = "0.4.4"
rand = "0.8.5"
[dev-dependencies] [dev-dependencies]
tokio-test = "0.4.2" tokio-test = "0.4.2"
rand = "0.8.5"
ntest = "0.8.1" ntest = "0.8.1"

View File

@ -1,18 +1,18 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" <!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<!-- Generated by graphviz version 4.0.0 (0) <!-- Generated by graphviz version 5.0.0 (0)
--> -->
<!-- Pages: 1 --> <!-- Pages: 1 -->
<svg width="338pt" height="116pt" <svg width="421pt" height="116pt"
viewBox="0.00 0.00 337.69 116.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink"> viewBox="0.00 0.00 420.69 116.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 112)"> <g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 112)">
<polygon fill="white" stroke="transparent" points="-4,4 -4,-112 333.69,-112 333.69,4 -4,4"/> <polygon fill="white" stroke="transparent" points="-4,4 -4,-112 416.69,-112 416.69,4 -4,4"/>
<!-- fragilebyte --> <!-- fragilebyte -->
<g id="node1" class="node"> <g id="node1" class="node">
<title>fragilebyte</title> <title>fragilebyte</title>
<ellipse fill="none" stroke="black" cx="160.9" cy="-90" rx="61.99" ry="18"/> <ellipse fill="none" stroke="black" cx="227.9" cy="-90" rx="61.99" ry="18"/>
<text text-anchor="middle" x="160.9" y="-86.3" font-family="Times,serif" font-size="14.00">fragilebyte</text> <text text-anchor="middle" x="227.9" y="-86.3" font-family="Times,serif" font-size="14.00">fragilebyte</text>
</g> </g>
<!-- clap --> <!-- clap -->
<g id="node2" class="node"> <g id="node2" class="node">
@ -23,8 +23,8 @@
<!-- fragilebyte&#45;&gt;clap --> <!-- fragilebyte&#45;&gt;clap -->
<g id="edge1" class="edge"> <g id="edge1" class="edge">
<title>fragilebyte&#45;&gt;clap</title> <title>fragilebyte&#45;&gt;clap</title>
<path fill="none" stroke="black" d="M132.47,-73.81C111.27,-62.48 82.34,-47.02 60.61,-35.41"/> <path fill="none" stroke="black" d="M187.02,-76.48C154.7,-66.32 108.56,-51.22 68.9,-36 66.82,-35.2 64.7,-34.36 62.57,-33.49"/>
<polygon fill="black" stroke="black" points="62.25,-32.32 51.78,-30.69 58.95,-38.49 62.25,-32.32"/> <polygon fill="black" stroke="black" points="63.84,-30.23 53.27,-29.59 61.13,-36.69 63.84,-30.23"/>
</g> </g>
<!-- local&#45;ip&#45;address --> <!-- local&#45;ip&#45;address -->
<g id="node3" class="node"> <g id="node3" class="node">
@ -35,20 +35,32 @@
<!-- fragilebyte&#45;&gt;local&#45;ip&#45;address --> <!-- fragilebyte&#45;&gt;local&#45;ip&#45;address -->
<g id="edge2" class="edge"> <g id="edge2" class="edge">
<title>fragilebyte&#45;&gt;local&#45;ip&#45;address</title> <title>fragilebyte&#45;&gt;local&#45;ip&#45;address</title>
<path fill="none" stroke="black" d="M160.9,-71.7C160.9,-63.98 160.9,-54.71 160.9,-46.11"/> <path fill="none" stroke="black" d="M212.02,-72.41C203.71,-63.73 193.39,-52.95 184.2,-43.34"/>
<polygon fill="black" stroke="black" points="164.4,-46.1 160.9,-36.1 157.4,-46.1 164.4,-46.1"/> <polygon fill="black" stroke="black" points="186.57,-40.76 177.13,-35.96 181.51,-45.6 186.57,-40.76"/>
</g>
<!-- rand -->
<g id="node4" class="node">
<title>rand</title>
<ellipse fill="none" stroke="black" cx="294.9" cy="-18" rx="32.49" ry="18"/>
<text text-anchor="middle" x="294.9" y="-14.3" font-family="Times,serif" font-size="14.00">rand</text>
</g>
<!-- fragilebyte&#45;&gt;rand -->
<g id="edge3" class="edge">
<title>fragilebyte&#45;&gt;rand</title>
<path fill="none" stroke="black" d="M243.78,-72.41C252.53,-63.26 263.54,-51.76 273.09,-41.78"/>
<polygon fill="black" stroke="black" points="275.67,-44.16 280.05,-34.51 270.61,-39.31 275.67,-44.16"/>
</g> </g>
<!-- tokio --> <!-- tokio -->
<g id="node4" class="node"> <g id="node5" class="node">
<title>tokio</title> <title>tokio</title>
<ellipse fill="none" stroke="black" cx="295.9" cy="-18" rx="33.6" ry="18"/> <ellipse fill="none" stroke="black" cx="378.9" cy="-18" rx="33.6" ry="18"/>
<text text-anchor="middle" x="295.9" y="-14.3" font-family="Times,serif" font-size="14.00">tokio</text> <text text-anchor="middle" x="378.9" y="-14.3" font-family="Times,serif" font-size="14.00">tokio</text>
</g> </g>
<!-- fragilebyte&#45;&gt;tokio --> <!-- fragilebyte&#45;&gt;tokio -->
<g id="edge3" class="edge"> <g id="edge4" class="edge">
<title>fragilebyte&#45;&gt;tokio</title> <title>fragilebyte&#45;&gt;tokio</title>
<path fill="none" stroke="black" d="M189.86,-73.98C211.42,-62.81 240.86,-47.54 263.25,-35.93"/> <path fill="none" stroke="black" d="M259.55,-74.33C284.55,-62.74 319.36,-46.6 344.94,-34.74"/>
<polygon fill="black" stroke="black" points="265.11,-38.91 272.37,-31.2 261.89,-32.69 265.11,-38.91"/> <polygon fill="black" stroke="black" points="346.65,-37.81 354.25,-30.43 343.71,-31.46 346.65,-37.81"/>
</g> </g>
</g> </g>
</svg> </svg>

Before

Width:  |  Height:  |  Size: 2.5 KiB

After

Width:  |  Height:  |  Size: 3.1 KiB

View File

@ -11,13 +11,17 @@ use std::{
use tokio::{ use tokio::{
fs::File, fs::File,
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::TcpStream, net::{
tcp::{ReadHalf, WriteHalf},
TcpStream,
},
time::sleep, time::sleep,
}; };
pub async fn connect( pub async fn connect(
addr: String, addr: String,
fileroot: PathBuf, fileroot: PathBuf,
access_key: String,
download_all: bool, download_all: bool,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel(); let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();
@ -32,174 +36,60 @@ pub async fn connect(
let mut buf = Vec::new(); let mut buf = Vec::new();
loop { // Authenticate connection
let bytes_read = reader.read_buf(&mut buf).await?; match authenticate_connection(&mut reader, &mut writer, &mut buf, &access_key).await? {
if bytes_read == 0 { None => println!("[+] Connection authenticated successfully"),
println!("[-] No more bytes received, closing connection"); Some(err_msg) => {
break; println!("{}", err_msg);
exit(0x0100);
} }
// Receive buffersize
let buffersize = String::from_utf8(buf.clone())?.parse::<usize>()?;
println!("[+] Selected buffersize: {}", buffersize);
buf.clear();
// ACK buffersize
writer.write_all(b"ACK\n").await.unwrap();
writer.flush().await?;
// Receive file amount (or termination request if the server does not have any files available)
let file_amount: usize;
let _bytes_read = reader.read_until(b'\n', &mut buf).await?;
let msg = String::from_utf8(buf.clone())?;
if msg.trim() == "FIN" {
println!("[-] Server does not have any files available, closing connection");
writer.write_all(b"FIN\n").await?;
writer.flush().await?;
break;
} else {
file_amount = msg.trim().parse::<usize>()?;
println!("[+] Total of {} files available", file_amount);
buf.clear();
// ACK file amount
writer.write_all(b"ACK\n").await?;
writer.flush().await?;
}
// Receive file metadata
println!("[+] Receiving file metadata");
let mut metadata = HashMap::new();
while metadata.len() < file_amount {
reader.read_until(b'\n', &mut buf).await?;
let msg = String::from_utf8(buf.clone())?;
buf.clear();
// Parse 'filesize:filename'
let split = msg.split(":").collect::<Vec<&str>>();
let filesize = split[0].trim().parse::<u64>()?;
let filename = split[1].trim().to_string();
metadata.insert(filename, filesize);
}
println!("[INFO] Metadata: {:?}", metadata);
// Send request for each file by filename
println!("[+] Requesting files individually\n");
let filenames = metadata.keys().collect::<Vec<&String>>();
let mut filenames_iter = filenames.iter();
let mut input = String::new();
loop {
input.clear();
if download_all {
match filenames_iter.next() {
Some(filename) => {
input.push_str(filename);
}
None => input.push_str("DISCONNECT"),
}
} else {
// Blocks the current thread until a message is readable
// Requests (messages) get queued if they can't be served immediately
let msg = rx.recv()?;
input.push_str(msg.trim());
}
if input == "DISCONNECT" {
break;
} else if !metadata.contains_key(input.as_str()) {
println!("[-] No file named '{}' available\n", input);
continue;
}
// Handle request based on input received from channel
println!("[+] Requesting file named '{}'", input);
let msg = input.to_string() + "\n";
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
// Create file locally
let mut output_path = fileroot.clone();
output_path.push(input.clone());
let output_file = File::create(output_path.clone()).await?;
println!("[+] New file: {:#?}", output_path);
let mut file_buf = BufWriter::new(output_file);
// Receive the file itself
let filesize = metadata.get(input.as_str()).unwrap().clone();
let mut remaining_data = filesize;
let mut buf = vec![0u8; buffersize];
while remaining_data != 0 {
if remaining_data >= buffersize as u64 {
let read_result = reader.read(&mut buf);
match read_result.await {
Ok(0) => {
println!("[-] Waiting for data to become available...");
sleep(Duration::from_secs(5)).await;
continue;
}
Ok(n) => {
file_buf.write_all(&mut buf).await?;
file_buf.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
}
} else {
let read_result = reader.read(&mut buf);
match read_result.await {
Ok(_) => {
let mut buf_slice = &buf[0..(remaining_data as usize)];
file_buf.write_all(&mut buf_slice).await?;
file_buf.flush().await?;
remaining_data = 0;
}
_ => {}
}
}
}
// ACK file
writer.write_all(b"ACK\n").await?;
writer.flush().await?;
println!(
"[+] Successfully wrote {} bytes to {:#?}\n",
filesize, output_path
);
}
println!("[+] Requesting connection termination");
writer.write_all(b"FIN\n").await?;
writer.flush().await?;
} }
// Receive buffersize
let buffersize = recv_msg_string(&mut reader, &mut buf)
.await?
.parse::<usize>()?;
println!("[+] Selected buffersize: {}", buffersize);
// ACK buffersize
send_msg(&mut writer, "ACK\n").await?;
// Receive metadata
let metadata = match receive_metadata(&mut reader, &mut writer, &mut buf).await? {
Some(metadata) => metadata,
None => exit(0x0100),
};
println!("[+] Received metadata: {:#?}", metadata);
// Send request for each file by filename
println!("[+] [<Filename> + Enter] to make a request\n");
handle_file_reqs(
&mut reader,
&mut writer,
rx,
&buffersize,
&metadata,
&fileroot,
&download_all,
)
.await?;
// Terminating connection
println!("[+] Requesting connection termination");
writer.write_all(b"FIN\n").await?;
writer.flush().await?;
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(()) Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
}); });
// Separate thread for blocking stdin // Separate thread for blocking stdin
let input_task = thread::spawn(move || { let input_task = thread::spawn(move || handle_stdin(tx));
let mut input_string = String::new();
while input_string.trim() != "DISCONNECT" {
input_string.clear();
stdin().read_line(&mut input_string)?;
print!("\n");
tx.send(input_string.clone())?;
}
Ok::<(), Box<dyn Error + Send + Sync>>(())
});
match connection_task.join().unwrap().await { match connection_task.join().unwrap().await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
eprintln!("[-] Error inside connection thread: {}", e); eprintln!("[ERROR] Error inside connection thread: {}", e);
exit(0x0100); exit(0x0100);
} }
} }
@ -208,7 +98,7 @@ pub async fn connect(
match input_task.join().unwrap() { match input_task.join().unwrap() {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
eprintln!("[-] Error inside input thread: {}", e); eprintln!("[ERROR] Error inside input thread: {}", e);
exit(0x0100); exit(0x0100);
} }
} }
@ -216,3 +106,220 @@ pub async fn connect(
Ok(()) Ok(())
} }
async fn send_msg(
writer: &mut BufWriter<WriteHalf<'_>>,
msg: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
Ok(())
}
async fn recv_msg_string(
reader: &mut BufReader<ReadHalf<'_>>,
buf: &mut Vec<u8>,
) -> Result<String, Box<dyn Error + Send + Sync>> {
let bytes_received = reader.read_until(b'\n', buf).await?;
if bytes_received == 0 {
let e: Box<dyn Error + Send + Sync> =
format!("No message received or server crashed").into();
return Err::<String, Box<dyn Error + Send + Sync>>(e);
}
let msg = String::from_utf8(buf.clone())?;
buf.clear();
Ok(msg.trim().to_string())
}
fn handle_stdin(tx: Sender<String>) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut input_string = String::new();
while input_string.trim() != "DISCONNECT" {
input_string.clear();
stdin().read_line(&mut input_string)?;
print!("\n");
tx.send(input_string.clone())?;
}
Ok::<(), Box<dyn Error + Send + Sync>>(())
}
async fn authenticate_connection(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
access_key: &String,
) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
// Receive ACK to indicate ready-to-receive status
if recv_msg_string(reader, buf).await? != "SYN" {
return Ok(Some(
"[-] Server is not ready to receive access key, terminating connection".to_string(),
));
}
// Send access key
send_msg(writer, (access_key.to_string() + "\n").as_str()).await?;
// Terminate connection if key is invalid
if recv_msg_string(reader, buf).await? == "FIN" {
return Ok(Some(
"[-] Incorrect access key, terminating connection".to_string(),
));
} else {
send_msg(writer, "ACK\n").await?;
Ok(None)
}
}
async fn receive_metadata(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
) -> Result<Option<HashMap<String, u64>>, Box<dyn Error + Send + Sync>> {
// Receive file amount or terminate if no files available
let msg = recv_msg_string(reader, buf).await?;
if msg == "FIN" {
println!("[-] Server does not have any files available, closing connection");
return Ok(None);
}
let file_amount = msg.parse::<usize>()?;
println!("[+] Total of {} files available", file_amount);
// ACK file amount
send_msg(writer, "ACK\n").await?;
// Receive file metadata
let mut metadata = HashMap::new();
while metadata.len() < file_amount {
let msg = recv_msg_string(reader, buf).await?;
// Parse 'filesize:filename'
let split = msg.split(":").collect::<Vec<&str>>();
let filesize = split[0].trim().parse::<u64>()?;
let filename = split[1].trim().to_string();
metadata.insert(filename, filesize);
}
Ok(Some(metadata))
}
async fn create_filehandle(
fileroot: &PathBuf,
filename: &String,
) -> Result<(BufWriter<File>, PathBuf), Box<dyn Error + Send + Sync>> {
let mut output_path = fileroot.clone();
output_path.push(&filename);
let output_file = File::create(output_path.clone()).await?;
println!("[+] New file: {:#?}", output_path);
Ok((BufWriter::new(output_file), output_path))
}
async fn handle_file_reqs(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
rx: Receiver<String>,
buffersize: &usize,
metadata: &HashMap<String, u64>,
fileroot: &PathBuf,
download_all: &bool,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let filenames = metadata.keys().collect::<Vec<&String>>();
let mut filenames_iter = filenames.iter();
let mut input_string = String::new();
loop {
input_string.clear();
if *download_all {
match filenames_iter.next() {
Some(filename) => {
input_string.push_str(filename);
}
None => input_string.push_str("DISCONNECT"),
}
} else {
// Blocks the current thread until a message is readable
// Requests (messages) get queued if they can't be served immediately
let msg = rx.recv()?;
input_string.push_str(msg.trim());
}
// Terminate connection on request
if input_string == "DISCONNECT" {
break;
} else if !metadata.contains_key(input_string.as_str()) {
println!("[-] No file named '{}' available\n", input_string);
continue;
}
// Handle request based on input received from channel
println!("[+] Requesting file named '{}'", input_string);
send_msg(writer, (input_string.to_string() + "\n").as_str()).await?;
// Create file locally
let (mut file_buf, output_path) = create_filehandle(&fileroot, &input_string).await?;
// Receive the file itself
let filesize = metadata.get(input_string.as_str()).unwrap().clone();
receive_file(reader, &mut file_buf, &filesize, buffersize).await?;
// ACK file
send_msg(writer, "ACK\n").await?;
println!(
"[+] Successfully wrote {} bytes to {:#?}\n",
filesize, output_path
);
}
Ok(())
}
async fn receive_file(
reader: &mut BufReader<ReadHalf<'_>>,
file_buf: &mut BufWriter<File>,
filesize: &u64,
buffersize: &usize,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut remaining_data = *filesize;
let mut buf = vec![0u8; *buffersize];
while remaining_data != 0 {
if remaining_data >= *buffersize as u64 {
let read_result = reader.read(&mut buf);
match read_result.await {
Ok(0) => {
println!("[-] Connection lost, trying again until [Ctrl + V]...");
sleep(Duration::from_secs(5)).await;
continue;
}
Ok(n) => {
file_buf.write_all(&mut buf).await?;
file_buf.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
}
} else {
let read_result = reader.read(&mut buf);
match read_result.await {
Ok(_) => {
let mut buf_slice = &buf[0..(remaining_data as usize)];
file_buf.write_all(&mut buf_slice).await?;
file_buf.flush().await?;
remaining_data = 0;
}
_ => {}
}
}
}
Ok(())
}

View File

@ -1,6 +1,6 @@
use clap::Parser; use clap::Parser;
use fragilebyte::{client, server}; use fragilebyte::{client, server};
use std::{path::PathBuf, str::FromStr}; use std::{error::Error, path::PathBuf, process::exit, str::FromStr};
use tokio; use tokio;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -9,6 +9,9 @@ struct Args {
#[clap(short = 't', long, value_parser)] #[clap(short = 't', long, value_parser)]
/// Server's address when connecting as a client /// Server's address when connecting as a client
target: Option<String>, target: Option<String>,
#[clap(short = 'k', long, value_parser)]
/// Alphanumeric 8 characters long key required to establish a connection to the host
key: Option<String>,
#[clap(default_value_t = 8080u16, short = 'p', long, value_parser = validate_arg::<u16>)] #[clap(default_value_t = 8080u16, short = 'p', long, value_parser = validate_arg::<u16>)]
/// Port where the service is hosted /// Port where the service is hosted
port: u16, port: u16,
@ -31,7 +34,7 @@ struct Args {
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse(); let args = Args::parse();
match args.target { match args.target {
@ -41,8 +44,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(n) => n, Some(n) => n,
None => PathBuf::from("./output"), None => PathBuf::from("./output"),
}; };
let access_key = match args.key {
Some(n) => n,
None => {
eprintln!("[-] Access key required as a client, please try again");
exit(0x0100);
}
};
client::connect(addr, fileroot, args.all) client::connect(addr, fileroot, access_key, args.all)
.await .await
.expect("Error initializing client"); .expect("Error initializing client");
} }
@ -59,6 +69,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
args.buffersize, args.buffersize,
args.localhost, args.localhost,
args.timeout, args.timeout,
false,
) )
.await .await
.expect("Error initializing server"); .expect("Error initializing server");

View File

@ -1,9 +1,11 @@
use local_ip_address::local_ip; use local_ip_address::local_ip;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{ use std::{
error::Error, error::Error,
fs::read_dir, fs::read_dir,
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
path::PathBuf, path::PathBuf,
process::exit,
str::FromStr, str::FromStr,
time::Duration, time::Duration,
}; };
@ -11,7 +13,10 @@ use tokio::{
self, self,
fs::File, fs::File,
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::TcpListener, net::{
tcp::{ReadHalf, WriteHalf},
TcpListener,
},
time::timeout, time::timeout,
}; };
@ -21,17 +26,26 @@ pub async fn listen(
buffersize: usize, buffersize: usize,
localhost: bool, localhost: bool,
timeout_duration: u64, timeout_duration: u64,
use_testing_key: bool,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
let addr = match localhost { let addr = match localhost {
true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, port), true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, port),
false => SocketAddr::new(local_ip()?, port), false => SocketAddr::new(local_ip()?, port),
}; };
// Use weak access key for integration testing, otherwise 8 char alphanumeric
let access_key = match use_testing_key {
true => "test".to_string(),
false => generate_key(),
};
let listener = TcpListener::bind(addr).await?; let listener = TcpListener::bind(addr).await?;
println!("[+] Listening on {}", addr); println!("[+] Listening on {}", addr);
println!("[+] Access key: {}", access_key);
loop { loop {
// The first loop iteration would take the ownership without cloning
let alt_fileroot = fileroot.clone(); let alt_fileroot = fileroot.clone();
let alt_access_key = access_key.clone();
let (mut socket, addr) = let (mut socket, addr) =
match timeout(Duration::from_secs(timeout_duration), listener.accept()).await { match timeout(Duration::from_secs(timeout_duration), listener.accept()).await {
@ -44,144 +58,71 @@ pub async fn listen(
break; break;
} }
}; };
println!("\n[+] New client: {}", addr);
let task = tokio::spawn(async move { println!("\n[NEW] {}: Connected", addr);
tokio::spawn(async move {
let (reader, writer) = socket.split(); let (reader, writer) = socket.split();
let mut reader = BufReader::new(reader); let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer); let mut writer = BufWriter::new(writer);
let mut vec_buf = Vec::new(); let mut vec_buf = Vec::new();
// ACK ready-to-receive status
send_msg(&mut writer, "SYN\n").await?;
// Check access key
if !check_access_key(&mut reader, &mut writer, &mut vec_buf, &alt_access_key).await? {
println!("[FIN] {}: Incorrect access key", addr);
return Ok::<(), Box<dyn Error + Send + Sync>>(());
}
// Send buffersize // Send buffersize
writer.write_all(buffersize.to_string().as_bytes()).await?; send_msg(&mut writer, (buffersize.to_string() + "\n").as_str()).await?;
writer.flush().await?;
// Read ACK // ACK buffersize
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; if recv_msg_string(&mut reader, &mut vec_buf).await? != "ACK" {
let msg = String::from_utf8(vec_buf.clone())?; return Ok::<(), Box<dyn Error + Send + Sync>>(());
if msg.trim() != "ACK" {
let e: Box<dyn Error + Send + Sync> =
format!("ACK not received (buffersize)").into();
return Err::<(), Box<dyn Error + Send + Sync>>(e);
} else {
vec_buf.clear();
} }
let (metadata_list, file_amount) = get_metadata(alt_fileroot.clone()).await?; // Send metadata
if file_amount == 0 { match handle_metadata(&mut reader, &mut writer, &mut vec_buf, &alt_fileroot, &addr)
println!( .await?
"[-] No files available inside fileroot {:#?}, shutting down", {
alt_fileroot None => println!("[DATA] {}: Ready to serve files", addr),
); Some(err_msg) => {
// No files available, request connection termination println!("{}", err_msg);
writer.write_all(b"FIN\n").await?; exit(0x0100);
writer.flush().await?;
// Read FIN
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
let msg = String::from_utf8(vec_buf.clone())?;
if msg.trim() != "FIN" {
let e: Box<dyn Error + Send + Sync> =
format!("ACK not received (server-side termination)").into();
return Err::<(), Box<dyn Error + Send + Sync>>(e);
} else {
// Empty error as error's reason is already logged with println
let e: Box<dyn Error + Send + Sync> = "".into();
return Err::<(), Box<dyn Error + Send + Sync>>(e);
} }
} else {
// Send file amount
let msg = file_amount.to_string() + "\n";
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
} }
// Read ACK // Send filedata
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?; match handle_file_reqs(
let msg = String::from_utf8(vec_buf.clone())?; &mut reader,
if msg.trim() != "ACK" { &mut writer,
let e: Box<dyn Error + Send + Sync> = &mut vec_buf,
format!("ACK not received (file amount)").into(); &alt_fileroot,
return Err::<(), Box<dyn Error + Send + Sync>>(e); &buffersize,
} else { &addr,
vec_buf.clear(); )
} .await?
{
// Send file metadata None => println!("[FIN] {}: Disconnected", addr),
for file in &metadata_list { Some(err_msg) => {
// Newline as delimiter between instances println!("{}", err_msg);
let msg = format!("{}:{}\n", file.1, file.0); exit(0x0100);
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
}
// Handle file request(s)
println!("[+] Ready to serve files");
loop {
let bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
if bytes_read == 0 {
println!("[-] File request never received or client crashed");
vec_buf.clear();
break;
} else {
let msg = String::from_utf8(vec_buf.clone())?;
vec_buf.clear();
if msg.trim() == "FIN" {
println!("[+] FIN received, terminating connection with {}", addr);
break;
}
let mut input_path = alt_fileroot.clone();
input_path.push(msg.trim());
println!("\n[+] File requested: {:#?}", input_path);
let mut file = File::open(input_path.clone()).await?;
let mut remaining_data = file.metadata().await?.len();
let mut filebuf = vec![0u8; buffersize];
while remaining_data != 0 {
let read_result = file.read(&mut filebuf);
match read_result.await {
Ok(n) => {
writer.write_all(&filebuf).await?;
writer.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
}
}
}
// Read ACK
let _bytes_read = reader.read_until(b'\n', &mut vec_buf).await?;
let msg = String::from_utf8(vec_buf.clone())?;
if msg.trim() != "ACK" {
let e: Box<dyn Error + Send + Sync> =
format!("ACK not received (single file's data)").into();
return Err::<(), Box<dyn Error + Send + Sync>>(e);
} else {
println!("[+] File transfer successfully done");
vec_buf.clear();
} }
} }
Ok::<(), Box<dyn Error + Send + Sync>>(()) Ok::<(), Box<dyn Error + Send + Sync>>(())
}); });
match task.await? {
Ok(_) => continue,
Err(_) => break,
}
} }
Ok(()) Ok(())
} }
async fn get_metadata( async fn get_metadata(
fileroot: PathBuf, fileroot: &PathBuf,
) -> Result<(Vec<(String, u64)>, usize), Box<dyn Error + Send + Sync>> { ) -> Result<(Vec<(String, u64)>, usize), Box<dyn Error + Send + Sync>> {
let mut metadata = Vec::<(String, u64)>::new(); let mut metadata = Vec::<(String, u64)>::new();
let paths = read_dir(fileroot)?; let paths = read_dir(fileroot)?;
@ -202,3 +143,142 @@ async fn get_metadata(
Ok((metadata, amount)) Ok((metadata, amount))
} }
async fn handle_metadata(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
fileroot: &PathBuf,
addr: &SocketAddr,
) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
let (metadata_list, file_amount) = get_metadata(fileroot).await?;
// Terminate if fileroot is empty
if file_amount == 0 {
send_msg(writer, "FIN\n").await?;
return Ok(Some(format!(
"[-] No files inside {:#?}, shutting host down",
fileroot
)));
}
// Send metadata amount
send_msg(writer, (file_amount.to_string() + "\n").as_str()).await?;
// ACK metadata amount
if recv_msg_string(reader, buf).await? != "ACK" {
return Ok(Some(format!(
"[ERROR] {}: No confirmation of metadata amount",
addr
)));
}
// Send metadata
for file in &metadata_list {
send_msg(writer, format!("{}:{}\n", file.1, file.0).as_str()).await?;
}
Ok(None)
}
fn generate_key() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect::<String>()
}
async fn send_msg(
writer: &mut BufWriter<WriteHalf<'_>>,
msg: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
Ok(())
}
async fn recv_msg_string(
reader: &mut BufReader<ReadHalf<'_>>,
buf: &mut Vec<u8>,
) -> Result<String, Box<dyn Error + Send + Sync>> {
let bytes_received = reader.read_until(b'\n', buf).await?;
if bytes_received == 0 {
let e: Box<dyn Error + Send + Sync> =
format!("No message received or client crashed").into();
return Err::<String, Box<dyn Error + Send + Sync>>(e);
}
let msg = String::from_utf8(buf.clone())?;
buf.clear();
Ok(msg.trim().to_string())
}
async fn check_access_key(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
access_key: &String,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
if recv_msg_string(reader, buf).await? != *access_key {
send_msg(writer, "FIN\n").await?;
return Ok(false);
} else {
send_msg(writer, "ACK\n").await?;
recv_msg_string(reader, buf).await?; // Might be a bit unnecessary ACK
return Ok(true);
}
}
async fn handle_file_reqs(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
fileroot: &PathBuf,
buffersize: &usize,
addr: &SocketAddr,
) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
loop {
// Receive filename or termination request
let req = recv_msg_string(reader, buf).await?;
if req == "FIN" {
break;
}
let mut input_path = fileroot.clone();
input_path.push(req);
println!("\n[REQ] {}: {:#?}", addr, input_path);
let mut file = File::open(input_path.clone()).await?;
let mut remaining_data = file.metadata().await?.len();
let mut filebuf = vec![0u8; *buffersize];
// Serve the file itself
while remaining_data != 0 {
let read_result = file.read(&mut filebuf);
match read_result.await {
Ok(n) => {
writer.write_all(&filebuf).await?;
writer.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
}
}
// ACK file
if recv_msg_string(reader, buf).await? != "ACK" {
return Ok(Some(format!(
"[ERROR] {}: No confirmation of file {:#?}",
addr, input_path
)));
} else {
println!("[ACK] {}: File finished successfully", addr);
}
}
Ok(None)
}

View File

@ -34,6 +34,7 @@ fn inputless_filesync_test() {
8192usize, 8192usize,
true, true,
5, 5,
true,
)) ))
.unwrap(); .unwrap();
}); });
@ -43,6 +44,7 @@ fn inputless_filesync_test() {
block_on(client::connect( block_on(client::connect(
String::from("127.0.0.1:8080"), String::from("127.0.0.1:8080"),
PathBuf::from("./output"), PathBuf::from("./output"),
"test".to_string(),
true, true,
)) ))
.unwrap(); .unwrap();