Single binary structure & integration tests

* Moved server & client into single binary
* Integration tests
* Server timeout
* General argument parsing
* Version bump
This commit is contained in:
einisto 2022-07-15 16:18:03 +03:00
parent 32ed133e21
commit 44ef3d8db1
7 changed files with 411 additions and 133 deletions

169
Cargo.lock generated
View File

@ -2,6 +2,27 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "async-stream"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -90,11 +111,31 @@ checksum = "fb58b6451e8c2a812ad979ed1d83378caa5e927eef2622017a45f251457c2c9d"
[[package]]
name = "fragilebyte"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"clap",
"local-ip-address",
"ntest",
"rand",
"tokio",
"tokio-test",
]
[[package]]
name = "futures-core"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "getrandom"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
@ -200,6 +241,47 @@ dependencies = [
"libc",
]
[[package]]
name = "ntest"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e865500b46e35210765d62d549178c520badc018b2a71a827c29b305d680d1fb"
dependencies = [
"ntest_proc_macro_helper",
"ntest_test_cases",
"ntest_timeout",
]
[[package]]
name = "ntest_proc_macro_helper"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0e328d267a679d683b55222b3d06c2fb7358220857945bfc4e65a6b531e9994"
[[package]]
name = "ntest_test_cases"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f7caf063242bb66721e74515dc01a915901063fa1f994bee7a2b9136f13370e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "ntest_timeout"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bca6eaadc7c104fb2eb0c6d14782b9e33775aaf5584c3bcb0f87c89e3e6d6c07"
dependencies = [
"ntest_proc_macro_helper",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "num_cpus"
version = "1.13.1"
@ -251,6 +333,22 @@ version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "ppv-lite86"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro-crate"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a"
dependencies = [
"thiserror",
"toml",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -293,6 +391,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.2.13"
@ -308,6 +436,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.139"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -416,6 +550,39 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-test"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "toml"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7"
dependencies = [
"serde",
]
[[package]]
name = "unicode-ident"
version = "1.0.1"

View File

@ -1,19 +1,18 @@
[package]
name = "fragilebyte"
author = "Arttu Einistö"
about = "TCP socket pair for file transfer, backend for https://github.com/einisto/leightbox"
version = "0.1.0"
authors = ["Arttu Einistö"]
description = "TCP socket pair for file transfer, backend for https://github.com/einisto/leightbox"
version = "0.1.1"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "server"
[[bin]]
name = "client"
[dependencies]
tokio = { version = "1.19.2", features = ["full"] }
clap = { version = "3.2.8", features = ["derive"] }
local-ip-address = "0.4.4"
[dev-dependencies]
tokio-test = "0.4.2"
rand = "0.8.5"
ntest = "0.8.1"

View File

@ -1,5 +1,4 @@
use clap::Parser;
use std::time::Duration;
use std::{path::PathBuf, time::Duration};
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
@ -7,25 +6,9 @@ use tokio::{
time::sleep,
};
// TODO: Remove panics/unwraps & add proper error handling
#[derive(Debug, Parser)]
#[clap(author, about, version)]
struct Args {
#[clap(short = 't', long, value_parser)]
target: String,
#[clap(default_value = "./output/", short = 'f', long, value_parser)]
fileroot: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let addr = args.target;
let fileroot = args.fileroot;
let mut stream = TcpStream::connect(addr.clone()).await?;
pub async fn connect(addr: String, fileroot: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
println!("[+] Connecting to {}", addr);
let mut stream = TcpStream::connect(addr.clone()).await?;
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
@ -34,48 +17,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut buf = Vec::new();
loop {
let bytes_read = reader.read_buf(&mut buf).await.unwrap();
let bytes_read = reader.read_buf(&mut buf).await?;
if bytes_read == 0 {
println!("[-] No more bytes received, closing connection");
break;
}
// Receive buffersize
let buffersize = String::from_utf8(buf.clone())
.unwrap()
.parse::<usize>()
.unwrap();
let buffersize = String::from_utf8(buf.clone())?.parse::<usize>()?;
println!("[+] Selected buffersize: {}", buffersize);
buf.clear();
// ACK buffersize
writer.write_all(b"ACK").await.unwrap();
writer.flush().await.unwrap();
writer.flush().await?;
// Receive file amount
let _bytes_read = reader.read_buf(&mut buf).await.unwrap();
let file_amount = String::from_utf8(buf.clone())
.unwrap()
.parse::<usize>()
.unwrap();
let _bytes_read = reader.read_buf(&mut buf).await?;
let file_amount = String::from_utf8(buf.clone())?.parse::<usize>()?;
println!("[+] Total of {} files available", file_amount);
buf.clear();
// ACK file amount
writer.write_all(b"ACK").await.unwrap();
writer.flush().await.unwrap();
writer.write_all(b"ACK").await?;
writer.flush().await?;
// Receive file metadata
println!("[+] Receiving file metadata");
let mut metadata = Vec::<(String, u64)>::new();
while metadata.len() < file_amount {
reader.read_until(b'\n', &mut buf).await.unwrap();
let msg = String::from_utf8(buf.clone()).unwrap();
reader.read_until(b'\n', &mut buf).await?;
let msg = String::from_utf8(buf.clone())?;
buf.clear();
// Parse 'filesize:filename'
let split = msg.split(":").collect::<Vec<&str>>();
let filesize = split[0].trim().parse::<u64>().unwrap();
let filesize = split[0].trim().parse::<u64>()?;
let filename = split[1].trim().to_string();
metadata.push((filename, filesize));
@ -83,17 +60,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("[INFO] Metadata: {:?}", metadata);
// Send request for each file by filename
println!("[+] Requesting files individually"); // TODO: Choose files based on input
// TODO: Choose files based on input
println!("[+] Requesting files individually");
for file in &metadata {
println!("[INFO] Current request: [{:?}]", file);
writer.write_all(file.0.as_bytes()).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(file.0.as_bytes()).await?;
writer.flush().await?;
// Create file locally
let output_path = fileroot.clone() + file.0.as_str();
let mut output_path = fileroot.clone();
output_path.push(file.0.clone());
let output_file = File::create(output_path.clone()).await.unwrap();
println!("[+] New file: {}", output_path);
let output_file = File::create(output_path.clone()).await?;
println!("[+] New file: {:#?}", output_path);
let mut file_buf = BufWriter::new(output_file);
// Receive the file itself
@ -111,8 +90,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
continue;
}
Ok(n) => {
file_buf.write_all(&mut buf).await.unwrap();
file_buf.flush().await.unwrap();
file_buf.write_all(&mut buf).await?;
file_buf.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
@ -123,8 +102,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match read_result.await {
Ok(_) => {
let mut buf_slice = &buf[0..(remaining_data as usize)];
file_buf.write_all(&mut buf_slice).await.unwrap();
file_buf.flush().await.unwrap();
file_buf.write_all(&mut buf_slice).await?;
file_buf.flush().await?;
remaining_data = 0;
}
_ => {}
@ -133,17 +112,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
// ACK file
writer.write_all(b"ACK").await.unwrap();
writer.flush().await.unwrap();
writer.write_all(b"ACK").await?;
writer.flush().await?;
println!(
"[+] Successfully wrote {} bytes to {}\n",
"[+] Successfully wrote {} bytes to {:#?}\n",
file.1, output_path
);
}
println!("[+] All files finished, requesting connection termination");
writer.write_all(b"FIN").await.unwrap();
writer.flush().await.unwrap();
writer.write_all(b"FIN").await?;
writer.flush().await?;
}
Ok(())

2
src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod client;
pub mod server;

73
src/main.rs Normal file
View File

@ -0,0 +1,73 @@
use clap::Parser;
use fragilebyte::{client, server};
use std::{path::PathBuf, str::FromStr};
use tokio;
#[derive(Parser, Debug)]
#[clap(author, about, version, long_about = None)]
struct Args {
#[clap(short = 't', long, value_parser)]
/// Server's address when connecting as a client
target: Option<String>,
#[clap(default_value_t = 8080u16, short = 'p', long, value_parser = validate_arg::<u16>)]
/// Port where the service is hosted
port: u16,
#[clap(default_value_t = 8192usize, short = 'b', long, value_parser = validate_arg::<usize>)]
/// Buffersize used in the file transfer (bytes)
buffersize: usize,
#[clap(default_value_t = false, long, action)]
/// Run only in the local network
localhost: bool,
#[clap(default_value_t = 30, long, value_parser = validate_arg::<u64>)]
/// Seconds of inactivity after which the server closes itself
timeout: u64,
#[clap(short = 'f', long, value_parser)]
/// Path to the folder where the files are outputted as a client or
/// served from as a server [default: './output' / './data']
fileroot: Option<PathBuf>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
match args.target {
Some(addr) => {
// Client
let fileroot = match args.fileroot {
Some(n) => n,
None => PathBuf::from("./output"),
};
client::connect(addr, fileroot)
.await
.expect("Error initializing client");
}
None => {
// Server
let fileroot = match args.fileroot {
Some(n) => n,
None => PathBuf::from("./data"),
};
server::listen(
args.port,
fileroot,
args.buffersize,
args.localhost,
args.timeout,
)
.await
.expect("Error initializing server");
}
}
Ok(())
}
fn validate_arg<T: FromStr>(value: &str) -> Result<T, String> {
match value.parse::<T>() {
Ok(n) => Ok(n),
Err(_) => Err(format!("Invalid argument: {}", value)),
}
}

View File

@ -1,49 +1,45 @@
use clap::Parser;
use local_ip_address::local_ip;
use std::{
fs::read_dir,
net::{IpAddr, SocketAddr},
path::PathBuf,
str::FromStr,
time::Duration,
};
use tokio::{
self,
fs::File,
io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::TcpListener,
time::timeout,
};
// TODO: Remove panics/unwraps & add proper error handling
#[derive(Parser, Debug)]
#[clap(author, about, version)]
struct Args {
#[clap(default_value_t = 8080u16, short = 'p', long, value_parser = validate_port)]
pub async fn listen(
port: u16,
#[clap(default_value = "./data/", short = 'f', long, value_parser)]
fileroot: String,
#[clap(default_value_t = 8192usize, short = 'b', long, value_parser = validate_buffersize)]
fileroot: PathBuf,
buffersize: usize,
#[clap(default_value_t = false, short = 'l', long, action)]
local: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let addr = match args.local {
true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, args.port),
false => SocketAddr::new(local_ip()?, args.port),
localhost: bool,
timeout_duration: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let addr = match localhost {
true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, port),
false => SocketAddr::new(local_ip()?, port),
};
let listener = TcpListener::bind(addr).await?;
println!("[+] Listening on {}", addr);
loop {
let args = Args::parse();
let buffersize = args.buffersize;
let fileroot = args.fileroot;
let alt_fileroot = fileroot.clone();
let (mut socket, addr) = listener.accept().await?;
let (mut socket, addr) =
match timeout(Duration::from_secs(timeout_duration), listener.accept()).await {
Ok(n) => n?,
Err(_) => {
println!("\nConnection timed out after {} seconds", timeout_duration);
break;
}
};
println!("\n[+] New client: {}", addr);
tokio::spawn(async move {
@ -54,32 +50,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut vec_buf = Vec::new();
// Send buffersize
writer
.write_all(buffersize.to_string().as_bytes())
.await
.unwrap();
writer.flush().await.unwrap();
writer.write_all(buffersize.to_string().as_bytes()).await?;
writer.flush().await?;
// Read ACK
let _bytes_read = reader.read_buf(&mut vec_buf).await.unwrap();
if String::from_utf8(vec_buf.clone()).unwrap() != "ACK" {
let _bytes_read = reader.read_buf(&mut vec_buf).await?;
if String::from_utf8(vec_buf.clone())? != "ACK" {
panic!("ACK not received (buffersize)");
} else {
vec_buf.clear();
}
let (metadata_list, file_amount) = get_metadata().await;
let (metadata_list, file_amount) = get_metadata().await?;
// Send file amount
writer
.write_all(file_amount.to_string().as_bytes())
.await
.unwrap();
writer.flush().await.unwrap();
writer.write_all(file_amount.to_string().as_bytes()).await?;
writer.flush().await?;
// Read ACK
let _bytes_read = reader.read_buf(&mut vec_buf).await.unwrap();
if String::from_utf8(vec_buf.clone()).unwrap() != "ACK" {
let _bytes_read = reader.read_buf(&mut vec_buf).await?;
if String::from_utf8(vec_buf.clone())? != "ACK" {
panic!("ACK not received (amount)");
} else {
vec_buf.clear();
@ -89,40 +79,41 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
for file in &metadata_list {
// Newline as delimiter between instances
let msg = format!("{}:{}\n", file.1, file.0);
writer.write_all(msg.as_bytes()).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
}
// Handle file request(s)
println!("[+] Ready to serve files");
loop {
let bytes_read = reader.read_buf(&mut vec_buf).await.unwrap();
let bytes_read = reader.read_buf(&mut vec_buf).await?;
if bytes_read == 0 {
println!("File request never received");
break;
} else {
let msg = String::from_utf8(vec_buf.clone()).unwrap();
let msg = String::from_utf8(vec_buf.clone())?;
vec_buf.clear();
if msg == "FIN" {
println!("[+] FIN received, terminating connection...");
println!("[+] FIN received, terminating individual connection...");
break;
}
let input_path = fileroot.clone() + msg.as_str();
let mut input_path = alt_fileroot.clone();
input_path.push(msg);
println!("\n[+] File requested: {}", input_path);
let mut file = File::open(input_path.clone()).await.unwrap();
let mut remaining_data = file.metadata().await.unwrap().len();
println!("\n[+] File requested: {:#?}", input_path);
let mut file = File::open(input_path.clone()).await?;
let mut remaining_data = file.metadata().await?.len();
let mut filebuf = vec![0u8; buffersize];
while remaining_data != 0 {
let read_result = file.read(&mut filebuf);
match read_result.await {
Ok(n) => {
writer.write_all(&filebuf).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&filebuf).await?;
writer.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
@ -131,28 +122,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
// Read ACK
let _bytes_read = reader.read_buf(&mut vec_buf).await.unwrap();
if String::from_utf8(vec_buf.clone()).unwrap() != "ACK" {
let _bytes_read = reader.read_buf(&mut vec_buf).await?;
if String::from_utf8(vec_buf.clone())? != "ACK" {
panic!("ACK not received (amount)");
} else {
println!("[+] File transfer successfully done");
vec_buf.clear();
}
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
}
Ok(())
}
async fn get_metadata() -> (Vec<(String, u64)>, usize) {
async fn get_metadata(
) -> Result<(Vec<(String, u64)>, usize), Box<dyn std::error::Error + Send + Sync>> {
let mut metadata = Vec::<(String, u64)>::new();
let paths = read_dir("./data").unwrap();
let paths = read_dir("./data")?;
for filename in paths {
let filepath = filename.unwrap().path().display().to_string(); // ????
let filepath = filename?.path().display().to_string();
let split = filepath.split("/").collect::<Vec<&str>>();
let filename = split[split.len() - 1].to_string();
let file = File::open(filepath).await.unwrap();
let filesize = file.metadata().await.unwrap().len();
let file = File::open(filepath).await?;
let filesize = file.metadata().await?.len();
if filesize > 0 {
metadata.push((filename, filesize));
@ -161,19 +157,5 @@ async fn get_metadata() -> (Vec<(String, u64)>, usize) {
let amount = metadata.len();
(metadata, amount)
}
fn validate_buffersize(value: &str) -> Result<usize, String> {
match value.parse::<usize>() {
Ok(n) => Ok(n),
Err(_) => Err(format!("Invalid buffersize: {}", value)),
}
}
fn validate_port(value: &str) -> Result<u16, String> {
match value.parse::<u16>() {
Ok(n) => Ok(n),
Err(_) => Err(format!("Invalid port-number: {}", value)),
}
Ok((metadata, amount))
}

76
tests/integration_test.rs Normal file
View File

@ -0,0 +1,76 @@
use fragilebyte::{client, server};
use ntest::timeout;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{
fs::{read_to_string, remove_file, File},
io::{BufWriter, Write},
path::PathBuf,
thread::{self, sleep},
time::Duration,
};
use tokio_test::block_on;
#[test]
#[timeout(10000)]
/// Syncs three textfiles from ./data to ./output and checks
/// that their contents match.
fn sync_txt_files() {
let data = vec![
("1.txt", create_data()),
("2.txt", create_data()),
("3.txt", create_data()),
];
for file in &data {
let filepath = String::from("./data/") + file.0;
let mut writer = BufWriter::new(File::create(filepath).unwrap());
writer.write_all(file.1.as_bytes()).unwrap();
}
let server_handle = thread::spawn(|| {
block_on(server::listen(
8080u16,
PathBuf::from("./data"),
8192usize,
true,
5,
))
.unwrap();
});
// Sleep to give server time to start up
sleep(Duration::from_millis(500));
let client_handle = thread::spawn(|| {
block_on(client::connect(
String::from("127.0.0.1:8080"),
PathBuf::from("./output"),
))
.unwrap();
});
client_handle.join().unwrap();
server_handle.join().unwrap();
for file in data {
let filepath = String::from("./output/") + file.0;
let content = read_to_string(filepath).unwrap();
assert_eq!(
content, file.1,
"Output [{}] does not match input [{}]",
content, file.1
);
remove_file(String::from("./output/") + file.0).unwrap();
remove_file(String::from("./data/") + file.0).unwrap();
}
}
fn create_data() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect::<String>()
}