major rewrite implementing new features

This commit is contained in:
17ms 2023-04-01 20:02:13 +03:00
parent 44b44df132
commit 845cb46881
13 changed files with 3056 additions and 938 deletions

2577
Cargo.lock generated Normal file → Executable file

File diff suppressed because it is too large Load Diff

9
Cargo.toml Normal file → Executable file
View File

@ -9,9 +9,14 @@ edition = "2021"
[dependencies]
tokio = { version = "1.20.4", features = ["full"] }
clap = { version = "3.2.8", features = ["derive"] }
local-ip-address = "0.4.4"
rand = "0.8.5"
rand = "0.7.0"
eframe = "0.21.3"
egui = "0.21.0"
x25519-dalek = "1.2.0"
aes-gcm = "0.10.1"
base64 = "0.21.0"
sha256 = "1.1.2"
[dev-dependencies]
tokio-test = "0.4.2"

View File

@ -1,326 +0,0 @@
use std::{
collections::HashMap,
error::Error,
io::stdin,
path::PathBuf,
process::exit,
sync::mpsc::{self, Receiver, Sender},
thread,
time::Duration,
};
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::{
tcp::{ReadHalf, WriteHalf},
TcpStream,
},
time::sleep,
};
pub async fn connect(
addr: String,
fileroot: PathBuf,
access_key: String,
download_all: bool,
) -> Result<(), Box<dyn Error>> {
let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();
let connection_task = thread::spawn(move || async move {
println!("[+] Connecting to {}", addr);
let mut stream = TcpStream::connect(addr.clone()).await?;
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
let mut buf = Vec::new();
// Authenticate connection
match authenticate_connection(&mut reader, &mut writer, &mut buf, &access_key).await? {
None => println!("[+] Connection authenticated successfully"),
Some(err_msg) => {
println!("{}", err_msg);
exit(0x0100);
}
}
// Receive chunksize
let chunksize = recv_msg_string(&mut reader, &mut buf)
.await?
.parse::<usize>()?;
println!("[+] Selected chunksize: {}", chunksize);
// ACK chunksize
send_msg(&mut writer, "ACK\n").await?;
// Receive metadata
let metadata = match receive_metadata(&mut reader, &mut writer, &mut buf).await? {
Some(metadata) => metadata,
None => exit(0x0100),
};
println!("[+] Received metadata: {:#?}", metadata);
// Send request for each file by filename
println!("\n[+] [<Filename> + Enter] to make a request\n");
println!("[+] \"DISCONNECT\" to disconnect");
handle_file_reqs(
&mut reader,
&mut writer,
rx,
&chunksize,
&metadata,
&fileroot,
&download_all,
)
.await?;
// Terminating connection
println!("[+] Requesting connection termination");
writer.write_all(b"FIN\n").await?;
writer.flush().await?;
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
// Separate thread for blocking stdin
let input_task = thread::spawn(move || handle_stdin(tx));
match connection_task.join().unwrap().await {
Ok(_) => {}
Err(e) => {
eprintln!("[ERROR] Error inside connection thread: {}", e);
exit(0x0100);
}
}
if !download_all {
match input_task.join().unwrap() {
Ok(_) => {}
Err(e) => {
eprintln!("[ERROR] Error inside input thread: {}", e);
exit(0x0100);
}
}
}
Ok(())
}
async fn send_msg(
writer: &mut BufWriter<WriteHalf<'_>>,
msg: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
Ok(())
}
async fn recv_msg_string(
reader: &mut BufReader<ReadHalf<'_>>,
buf: &mut Vec<u8>,
) -> Result<String, Box<dyn Error + Send + Sync>> {
let bytes_received = reader.read_until(b'\n', buf).await?;
if bytes_received == 0 {
let e: Box<dyn Error + Send + Sync> =
format!("No message received or server crashed").into();
return Err::<String, Box<dyn Error + Send + Sync>>(e);
}
let msg = String::from_utf8(buf.clone())?;
buf.clear();
Ok(msg.trim().to_string())
}
fn handle_stdin(tx: Sender<String>) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut input_string = String::new();
while input_string.trim() != "DISCONNECT" {
input_string.clear();
stdin().read_line(&mut input_string)?;
print!("\n");
tx.send(input_string.clone())?;
}
Ok::<(), Box<dyn Error + Send + Sync>>(())
}
async fn authenticate_connection(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
access_key: &String,
) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
// Receive ACK to indicate ready-to-receive status
if recv_msg_string(reader, buf).await? != "SYN" {
return Ok(Some(
"[-] Server is not ready to receive access key, terminating connection".to_string(),
));
}
// Send access key
send_msg(writer, (access_key.to_string() + "\n").as_str()).await?;
// Terminate connection if key is invalid
if recv_msg_string(reader, buf).await? == "FIN" {
return Ok(Some(
"[-] Incorrect access key, terminating connection".to_string(),
));
} else {
send_msg(writer, "ACK\n").await?;
Ok(None)
}
}
async fn receive_metadata(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
) -> Result<Option<HashMap<String, u64>>, Box<dyn Error + Send + Sync>> {
// Receive file amount or terminate if no files available
let msg = recv_msg_string(reader, buf).await?;
if msg == "FIN" {
println!("[-] Server does not have any files available, closing connection");
return Ok(None);
}
let file_amount = msg.parse::<usize>()?;
println!("[+] Total of {} files available", file_amount);
// ACK file amount
send_msg(writer, "ACK\n").await?;
// Receive file metadata
let mut metadata = HashMap::new();
while metadata.len() < file_amount {
let msg = recv_msg_string(reader, buf).await?;
// Parse 'filesize:filename'
let split = msg.split(":").collect::<Vec<&str>>();
let filesize = split[0].trim().parse::<u64>()?;
let filename = split[1].trim().to_string();
metadata.insert(filename, filesize);
}
Ok(Some(metadata))
}
async fn create_filehandle(
fileroot: &PathBuf,
filename: &String,
) -> Result<(BufWriter<File>, PathBuf), Box<dyn Error + Send + Sync>> {
let mut output_path = fileroot.clone();
output_path.push(&filename);
let output_file = File::create(output_path.clone()).await?;
println!("[+] New file: {:#?}", output_path);
Ok((BufWriter::new(output_file), output_path))
}
async fn handle_file_reqs(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
rx: Receiver<String>,
chunksize: &usize,
metadata: &HashMap<String, u64>,
fileroot: &PathBuf,
download_all: &bool,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let filenames = metadata.keys().collect::<Vec<&String>>();
let mut filenames_iter = filenames.iter();
let mut input_string = String::new();
loop {
input_string.clear();
if *download_all {
match filenames_iter.next() {
Some(filename) => {
input_string.push_str(filename);
}
None => input_string.push_str("DISCONNECT"),
}
} else {
// Blocks the current thread until a message is readable
// Requests (messages) get queued if they can't be served immediately
let msg = rx.recv()?;
input_string.push_str(msg.trim());
}
// Terminate connection on request
if input_string == "DISCONNECT" {
break;
} else if !metadata.contains_key(input_string.as_str()) {
println!("[-] No file named '{}' available\n", input_string);
continue;
}
// Handle request based on input received from channel
println!("[+] Requesting file named '{}'", input_string);
send_msg(writer, (input_string.to_string() + "\n").as_str()).await?;
// Create file locally
let (mut file_buf, output_path) = create_filehandle(&fileroot, &input_string).await?;
// Receive the file itself
let filesize = metadata.get(input_string.as_str()).unwrap().clone();
receive_file(reader, &mut file_buf, &filesize, chunksize).await?;
// ACK file
send_msg(writer, "ACK\n").await?;
println!(
"[+] Successfully wrote {} bytes to {:#?}\n",
filesize, output_path
);
}
Ok(())
}
async fn receive_file(
reader: &mut BufReader<ReadHalf<'_>>,
file_buf: &mut BufWriter<File>,
filesize: &u64,
chunksize: &usize,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut remaining_data = *filesize;
let mut buf = vec![0u8; *chunksize];
while remaining_data != 0 {
if remaining_data >= *chunksize as u64 {
let read_result = reader.read(&mut buf);
match read_result.await {
Ok(0) => {
println!("[-] Connection lost, trying again until [Ctrl + C]...");
sleep(Duration::from_secs(5)).await;
continue;
}
Ok(n) => {
file_buf.write_all(&mut buf).await?;
file_buf.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
}
} else {
let read_result = reader.read(&mut buf);
match read_result.await {
Ok(_) => {
let mut buf_slice = &buf[0..(remaining_data as usize)];
file_buf.write_all(&mut buf_slice).await?;
file_buf.flush().await?;
remaining_data = 0;
}
_ => {}
}
}
}
Ok(())
}

50
src/common.rs Normal file
View File

@ -0,0 +1,50 @@
use crate::crypto;
use aes_gcm::{aead::consts::U12, aes::Aes256, AesGcm};
use rand::rngs::OsRng;
use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf};
use tokio::{
io::{BufReader, BufWriter},
net::{
tcp::{ReadHalf, WriteHalf},
TcpStream,
},
};
#[derive(Debug, PartialEq, Eq)]
pub enum Message {
ErrorMsg(String),
Files(Vec<PathBuf>),
Metadata(HashMap<String, (u64, String)>),
Chunksize(usize),
ClientConnect(SocketAddr),
ClientDisconnect(SocketAddr),
ClientReq(String),
ClientReqAll,
ConnectionReady,
Shutdown,
}
pub struct Connection<'a> {
pub reader: BufReader<ReadHalf<'a>>,
pub writer: BufWriter<WriteHalf<'a>>,
pub cipher: AesGcm<Aes256, U12>,
pub rng: OsRng,
}
impl<'a> Connection<'a> {
pub async fn new(
socket: &'a mut TcpStream,
) -> Result<Connection<'a>, Box<dyn Error + Send + Sync>> {
let (reader, writer) = socket.split();
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
let cipher = crypto::aes_cipher(&mut reader, &mut writer, true).await?;
let rng = OsRng;
Ok(Self {
reader,
writer,
cipher,
rng,
})
}
}

View File

@ -15,6 +15,7 @@ pub async fn send(
data: &Vec<u8>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let enc: Vec<u8>;
if let (Some(cipher), Some(rng)) = (cipher, rng) {
enc = crypto::aes_encrypt(data, cipher, rng)?;
} else {
@ -26,7 +27,6 @@ pub async fn send(
.as_bytes()
.to_vec();
encoded.push(b':');
writer.write_all(&encoded).await?;
writer.flush().await?;
@ -36,22 +36,22 @@ pub async fn send(
pub async fn recv(
reader: &mut BufReader<ReadHalf<'_>>,
cipher: Option<&mut AesGcm<Aes256, U12>>,
buf: &mut Vec<u8>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let n = reader.read_until(b':', buf).await?;
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let mut buf = Vec::new();
let n = reader.read_until(b':', &mut buf).await?;
if n == 0 {
todo!("error: connection closed unexpectedly");
todo!("maybe error handling :)");
}
buf.pop();
*buf = general_purpose::STANDARD_NO_PAD.decode(&buf)?.to_vec();
buf = general_purpose::STANDARD_NO_PAD.decode(&buf)?.to_vec();
if let Some(cipher) = cipher {
*buf = crypto::aes_decrypt(&buf, cipher)?;
buf = crypto::aes_decrypt(&buf, cipher)?;
} else {
*buf = buf.clone();
buf = buf.clone();
}
Ok(())
Ok(buf)
}

219
src/connector.rs Executable file
View File

@ -0,0 +1,219 @@
use crate::{
common::{Connection, Message},
comms, crypto,
};
use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf};
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
net::TcpStream,
sync::mpsc,
};
#[derive(Debug)]
pub struct Request {
pub name: String,
pub size: u64,
pub hash: String,
}
impl Request {
pub fn new(name: String, metadata: &HashMap<String, (u64, String)>) -> Option<Self> {
let (size, hash) = metadata.get(&name)?.clone();
Some(Self { name, size, hash })
}
}
#[derive(Debug, Clone)]
pub struct Connector {
target_addr: SocketAddr,
access_key: &'static str,
output_path: PathBuf,
}
impl Connector {
pub fn new(target_addr: SocketAddr, access_key: &'static str, output_path: PathBuf) -> Self {
Self {
target_addr,
access_key,
output_path,
}
}
pub async fn connect(
self,
tx: mpsc::Sender<Message>,
mut rx: mpsc::Receiver<Message>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut socket = TcpStream::connect(self.target_addr).await?;
let mut connection = Connection::new(&mut socket).await?;
self.authorize(&mut connection).await?;
let metadata = self.metadata(&mut connection).await?;
tx.send(Message::Metadata(metadata.clone())).await?;
self.request_handler(&mut connection, &mut rx, &metadata)
.await?;
let msg = b"FIN".to_vec();
comms::send(
&mut connection.writer,
Some(&mut connection.cipher),
Some(&mut connection.rng),
&msg,
)
.await?;
Ok(())
}
async fn authorize(
&self,
conn: &mut Connection<'_>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let msg = self.access_key.to_string().as_bytes().to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
if msg == "FIN" {
todo!("maybe error handling :)");
}
Ok(())
}
async fn metadata(
&self,
conn: &mut Connection<'_>,
) -> Result<HashMap<String, (u64, String)>, Box<dyn Error + Send + Sync>> {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let amt: usize = String::from_utf8(buf)?.parse()?;
let msg = b"AMT".to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let mut metadata = HashMap::new();
while metadata.len() < amt {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
let split: Vec<&str> = msg.split(":").collect();
let name = split[0].trim().to_string();
let size: u64 = split[1].trim().parse()?;
let hash = split[2].trim().to_string();
metadata.insert(name, (size, hash));
}
Ok(metadata)
}
async fn new_handle(
&self,
filename: &str,
) -> Result<(BufWriter<File>, String), Box<dyn Error + Send + Sync>> {
let mut dir_path = self.output_path.clone();
dir_path.push(filename);
let str_path = dir_path.to_str().unwrap().to_string();
let filehandle = File::create(dir_path).await?;
Ok((BufWriter::new(filehandle), str_path))
}
async fn request(
&self,
conn: &mut Connection<'_>,
req: Request,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
let (mut handle, path) = self.new_handle(&req.name).await?;
let msg = req.hash.as_bytes().to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let mut remaining = req.size.clone();
while remaining != 0 {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
handle.write_all(&buf).await?;
handle.flush().await?;
remaining -= buf.len() as u64;
}
let msg: Vec<u8>;
let new_hash = crypto::try_hash(&path)?;
if new_hash == req.hash {
msg = b"OK".to_vec();
} else {
msg = b"ERROR".to_vec();
}
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
Ok(true)
}
async fn request_handler(
&self,
conn: &mut Connection<'_>,
rx: &mut mpsc::Receiver<Message>,
metadata: &HashMap<String, (u64, String)>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
let rx_msg = rx.recv().await;
match rx_msg.unwrap() {
Message::ClientReq(name) => {
let req = Request::new(name, metadata).unwrap(); // TODO: handle
self.request(conn, req).await?;
}
Message::ClientReqAll => {
for name in metadata.keys() {
let req = Request::new(name.clone(), metadata).unwrap(); // TODO: handle
self.request(conn, req).await?;
}
}
Message::Shutdown => {
let msg = b"DISCONNECT".to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
break;
}
_ => continue,
}
}
Ok(())
}
}

View File

@ -5,7 +5,7 @@ use aes_gcm::{
Aes256Gcm, AesGcm, KeyInit, Nonce,
};
use rand::{rngs::OsRng, RngCore};
use std::error::Error;
use std::{error::Error, path::Path};
use tokio::{
io::{BufReader, BufWriter},
net::tcp::{ReadHalf, WriteHalf},
@ -15,34 +15,36 @@ use x25519_dalek::{EphemeralSecret, PublicKey, SharedSecret};
const AES_NONCE_SIZE: usize = 12;
const DH_PBK_SIZE: usize = 32;
pub async fn edh(
async fn edh(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
go_first: bool,
) -> Result<SharedSecret, Box<dyn Error + Send + Sync>> {
let buf: Vec<u8>;
let own_sec = EphemeralSecret::new(OsRng);
let own_pbk = PublicKey::from(&own_sec);
let msg = own_pbk.as_bytes().to_vec();
if go_first {
comms::send(writer, None, None, &msg).await?;
comms::recv(reader, None, buf).await?;
buf = comms::recv(reader, None).await?;
} else {
comms::recv(reader, None, buf).await?;
buf = comms::recv(reader, None).await?;
comms::send(writer, None, None, &msg).await?;
}
let slice: [u8; DH_PBK_SIZE] = buf[..DH_PBK_SIZE].try_into()?;
buf.clear();
let recv_pbk = PublicKey::from(slice);
Ok(own_sec.diffie_hellman(&recv_pbk))
}
pub fn aes_cipher(
secret: SharedSecret,
pub async fn aes_cipher(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
go_first: bool,
) -> Result<AesGcm<Aes256, U12>, Box<dyn Error + Sync + Send>> {
let secret = edh(reader, writer, go_first).await?;
Ok(Aes256Gcm::new(secret.as_bytes().into()))
}
@ -78,12 +80,19 @@ pub fn aes_decrypt(
Ok(decrypted)
}
pub fn try_hash(path: &String) -> Result<String, Box<dyn Error + Send + Sync>> {
let path = Path::new(path);
let hash = sha256::try_digest(path)?;
Ok(hash)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_aes() {
fn aes_implementations() {
use aes_gcm::aead;
let mut gen_rng = aead::OsRng;

8
src/lib.rs Normal file → Executable file
View File

@ -1,2 +1,6 @@
pub mod client;
pub mod server;
pub mod common;
pub mod comms;
pub mod connector;
pub mod crypto;
//pub mod gui;
pub mod listener;

228
src/listener.rs Executable file
View File

@ -0,0 +1,228 @@
use crate::{
common::{Connection, Message},
comms, crypto,
};
use rand::{distributions::Alphanumeric, Rng};
use std::{collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, str::FromStr};
use tokio::{
fs::File,
io::AsyncReadExt,
net::{TcpListener, TcpStream},
sync::mpsc::{self},
};
#[derive(Debug, Clone, Copy)]
pub struct Listener {
host_addr: SocketAddr,
access_key: &'static str,
chunksize: usize,
}
// TODO: impl Drop (?)
impl Listener {
pub fn new(host_addr: SocketAddr, access_key: &'static str, chunksize: usize) -> Self {
Self {
host_addr,
access_key,
chunksize,
}
}
pub async fn start(
self,
tx: mpsc::Sender<Message>,
mut kill: mpsc::Receiver<Message>,
files: Vec<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
tokio::select! {
_ = self.listen(tx, files) => Ok(()),
_ = kill.recv() => Ok(()),
}
}
async fn listen(
self,
tx: mpsc::Sender<Message>,
files: Vec<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let listener = TcpListener::bind(self.host_addr).await?;
loop {
let files = files.clone();
let (mut socket, addr) = listener.accept().await?;
tx.send(Message::ClientConnect(addr)).await?;
let this_tx = tx.clone();
tokio::spawn(async move {
self.connection(&mut socket, addr, this_tx, &files).await?;
Ok::<(), Box<dyn Error + Send + Sync>>(())
});
}
}
async fn connection(
&self,
socket: &mut TcpStream,
addr: SocketAddr,
tx: mpsc::Sender<Message>,
files: &Vec<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut connection = Connection::new(socket).await?;
if !self.authorize(&mut connection).await? {
return Ok::<(), Box<dyn Error + Send + Sync>>(());
}
let index = self.metadata_handler(&mut connection, &files).await?;
tx.send(Message::ConnectionReady).await?;
self.request_handler(&mut connection, &index).await?;
tx.send(Message::ClientDisconnect(addr)).await?;
Ok::<(), Box<dyn Error + Send + Sync>>(())
}
async fn authorize(
&self,
conn: &mut Connection<'_>,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let key = String::from_utf8(buf)?;
let msg: Vec<u8>;
let res: bool;
if key != self.access_key {
res = false;
msg = b"DISCONNECT".to_vec();
} else {
res = true;
msg = b"OK".to_vec();
}
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
Ok(res)
}
async fn metadata(
&self,
files: &Vec<String>,
) -> Result<
(usize, Vec<(String, u64, String)>, HashMap<String, String>),
Box<dyn Error + Send + Sync>,
> {
let mut metadata: Vec<(String, u64, String)> = Vec::new();
let mut index = HashMap::new();
for path in files {
let split: Vec<&str> = path.split("/").collect(); // TODO: different path delimiters?
let name = split[split.len() - 1].to_string();
let handle = File::open(PathBuf::from_str(path)?).await?;
let size = handle.metadata().await?.len();
let hash = crypto::try_hash(path)?;
if size > 0 {
metadata.push((name, size, hash.clone()));
index.insert(hash, path.clone());
}
}
Ok((metadata.len(), metadata, index))
}
async fn metadata_handler(
&self,
conn: &mut Connection<'_>,
files: &Vec<String>,
) -> Result<HashMap<String, String>, Box<dyn Error + Send + Sync>> {
let (amt, metadata, index) = self.metadata(files).await?;
let msg = amt.to_string().as_bytes().to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
if msg != "AMT" {
todo!("maybe error handling :)");
}
for file in metadata {
let msg = format!("{}:{}:{}", file.0, file.1, file.2)
.as_bytes()
.to_vec();
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&msg,
)
.await?;
}
Ok(index)
}
async fn request_handler(
&self,
conn: &mut Connection<'_>,
index: &HashMap<String, String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let cmd = String::from_utf8(buf)?;
if cmd == "DISCONNECT" {
break;
}
let mut file = File::open(index[&cmd].clone()).await?;
let mut remaining = file.metadata().await?.len();
let mut send_buf = vec![0u8; self.chunksize];
while remaining != 0 {
let n = file.read(&mut send_buf).await?;
comms::send(
&mut conn.writer,
Some(&mut conn.cipher),
Some(&mut conn.rng),
&send_buf[..n].to_vec(),
)
.await?;
remaining = remaining - n as u64;
}
let buf = comms::recv(&mut conn.reader, Some(&mut conn.cipher)).await?;
let msg = String::from_utf8(buf)?;
if msg == "ERROR" {
todo!("maybe error handling :)");
}
}
Ok(())
}
}
pub fn keygen() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect::<String>()
}

83
src/main.rs Normal file → Executable file
View File

@ -1,87 +1,8 @@
use clap::Parser;
use fragilebyte::{client, server};
use std::{error::Error, path::PathBuf, process::exit, str::FromStr};
//use fragilebyte::gui;
use std::error::Error;
use tokio;
#[derive(Parser, Debug)]
#[clap(author, about, version, long_about = None)]
struct Args {
#[clap(short = 't', long, value_parser)]
/// Server's address when connecting as a client
target: Option<String>,
#[clap(short = 'k', long, value_parser)]
/// Alphanumeric 8 characters long key required to establish a connection to the host
key: Option<String>,
#[clap(default_value_t = 8080u16, short = 'p', long, value_parser = validate_arg::<u16>)]
/// Port where the service is hosted
port: u16,
#[clap(default_value_t = 8192usize, short = 'b', long, value_parser = validate_arg::<usize>)]
/// Chunksize used in the file transfer (bytes)
chunksize: usize,
#[clap(default_value_t = false, long, action)]
/// Run only in the local network
localhost: bool,
#[clap(default_value_t = 30, long, value_parser = validate_arg::<u64>)]
/// Seconds of inactivity after which the server closes itself
timeout: u64,
#[clap(short = 'f', long, value_parser)]
/// Path to the folder where the files are outputted as a client or
/// served from as a server [default: './output' / './data']
fileroot: Option<PathBuf>,
#[clap(default_value_t = false, short = 'a', long, action)]
/// Automatically download every available file from the host (skips stdin)
all: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
match args.target {
Some(addr) => {
// Client
let fileroot = match args.fileroot {
Some(n) => n,
None => PathBuf::from("./output"),
};
let access_key = match args.key {
Some(n) => n,
None => {
eprintln!("[-] Access key required as a client, please try again");
exit(0x0100);
}
};
client::connect(addr, fileroot, access_key, args.all)
.await
.expect("Error initializing client");
}
None => {
// Server
let fileroot = match args.fileroot {
Some(n) => n,
None => PathBuf::from("./data"),
};
server::listen(
args.port,
fileroot,
args.chunksize,
args.localhost,
args.timeout,
false,
)
.await
.expect("Error initializing server");
}
}
Ok(())
}
fn validate_arg<T: FromStr>(value: &str) -> Result<T, String> {
match value.parse::<T>() {
Ok(n) => Ok(n),
Err(_) => Err(format!("Invalid argument: {}", value)),
}
}

View File

@ -1,286 +0,0 @@
use local_ip_address::local_ip;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{
error::Error,
fs::read_dir,
net::{IpAddr, SocketAddr},
path::PathBuf,
process::exit,
str::FromStr,
time::Duration,
};
use tokio::{
self,
fs::File,
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::{
tcp::{ReadHalf, WriteHalf},
TcpListener,
},
time::timeout,
};
pub async fn listen(
port: u16,
fileroot: PathBuf,
chunksize: usize,
localhost: bool,
timeout_duration: u64,
use_testing_key: bool,
) -> Result<(), Box<dyn Error>> {
let addr = match localhost {
true => SocketAddr::new(IpAddr::from_str("127.0.0.1")?, port),
false => {
println!("[+] Listening on {}:{}", local_ip()?, port);
SocketAddr::new(IpAddr::from_str("0.0.0.0")?, port)
}
};
// Use weak access key for integration testing, otherwise 8 char alphanumeric
let access_key = match use_testing_key {
true => "test".to_string(),
false => generate_key(),
};
let listener = TcpListener::bind(addr).await?;
println!("[+] Access key: {}", access_key);
loop {
// The first loop iteration would take the ownership without cloning
let alt_fileroot = fileroot.clone();
let alt_access_key = access_key.clone();
let (mut socket, addr) =
match timeout(Duration::from_secs(timeout_duration), listener.accept()).await {
Ok(n) => n?,
Err(_) => {
println!(
"\n[-] Connection timed out after {} seconds",
timeout_duration
);
break;
}
};
println!("\n[NEW] {}: Connected", addr);
tokio::spawn(async move {
let (reader, writer) = socket.split();
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
let mut vec_buf = Vec::new();
// ACK ready-to-receive status
send_msg(&mut writer, "SYN\n").await?;
// Check access key
if !check_access_key(&mut reader, &mut writer, &mut vec_buf, &alt_access_key).await? {
println!("[FIN] {}: Incorrect access key", addr);
return Ok::<(), Box<dyn Error + Send + Sync>>(());
}
// Send chunksize
send_msg(&mut writer, (chunksize.to_string() + "\n").as_str()).await?;
// ACK chunksize
if recv_msg_string(&mut reader, &mut vec_buf).await? != "ACK" {
return Ok::<(), Box<dyn Error + Send + Sync>>(());
}
// Send metadata
match handle_metadata(&mut reader, &mut writer, &mut vec_buf, &alt_fileroot, &addr)
.await?
{
None => println!("[DATA] {}: Ready to serve files", addr),
Some(err_msg) => {
println!("{}", err_msg);
exit(0x0100);
}
}
// Send filedata
match handle_file_reqs(
&mut reader,
&mut writer,
&mut vec_buf,
&alt_fileroot,
&chunksize,
&addr,
)
.await?
{
None => println!("[FIN] {}: Disconnected", addr),
Some(err_msg) => {
println!("{}", err_msg);
exit(0x0100);
}
}
Ok::<(), Box<dyn Error + Send + Sync>>(())
});
}
Ok(())
}
async fn get_metadata(
fileroot: &PathBuf,
) -> Result<(Vec<(String, u64)>, usize), Box<dyn Error + Send + Sync>> {
let mut metadata = Vec::<(String, u64)>::new();
let paths = read_dir(fileroot)?;
for filename in paths {
let filepath = filename?.path().display().to_string();
let split = filepath.split("/").collect::<Vec<&str>>();
let filename = split[split.len() - 1].to_string();
let file = File::open(filepath).await?;
let filesize = file.metadata().await?.len();
if filesize > 0 {
metadata.push((filename, filesize));
}
}
let amount = metadata.len();
Ok((metadata, amount))
}
async fn handle_metadata(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
fileroot: &PathBuf,
addr: &SocketAddr,
) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
let (metadata_list, file_amount) = get_metadata(fileroot).await?;
// Terminate if fileroot is empty
if file_amount == 0 {
send_msg(writer, "FIN\n").await?;
return Ok(Some(format!(
"[-] No files inside {:#?}, shutting host down",
fileroot
)));
}
// Send metadata amount
send_msg(writer, (file_amount.to_string() + "\n").as_str()).await?;
// ACK metadata amount
if recv_msg_string(reader, buf).await? != "ACK" {
return Ok(Some(format!(
"[ERROR] {}: No confirmation of metadata amount",
addr
)));
}
// Send metadata
for file in &metadata_list {
send_msg(writer, format!("{}:{}\n", file.1, file.0).as_str()).await?;
}
Ok(None)
}
fn generate_key() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect::<String>()
}
async fn send_msg(
writer: &mut BufWriter<WriteHalf<'_>>,
msg: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
writer.write_all(msg.as_bytes()).await?;
writer.flush().await?;
Ok(())
}
async fn recv_msg_string(
reader: &mut BufReader<ReadHalf<'_>>,
buf: &mut Vec<u8>,
) -> Result<String, Box<dyn Error + Send + Sync>> {
let bytes_received = reader.read_until(b'\n', buf).await?;
if bytes_received == 0 {
let e: Box<dyn Error + Send + Sync> =
format!("No message received or client crashed").into();
return Err::<String, Box<dyn Error + Send + Sync>>(e);
}
let msg = String::from_utf8(buf.clone())?;
buf.clear();
Ok(msg.trim().to_string())
}
async fn check_access_key(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
access_key: &String,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
if recv_msg_string(reader, buf).await? != *access_key {
send_msg(writer, "FIN\n").await?;
return Ok(false);
} else {
send_msg(writer, "ACK\n").await?;
recv_msg_string(reader, buf).await?; // Might be a bit unnecessary ACK
return Ok(true);
}
}
async fn handle_file_reqs(
reader: &mut BufReader<ReadHalf<'_>>,
writer: &mut BufWriter<WriteHalf<'_>>,
buf: &mut Vec<u8>,
fileroot: &PathBuf,
chunksize: &usize,
addr: &SocketAddr,
) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
loop {
// Receive filename or termination request
let req = recv_msg_string(reader, buf).await?;
if req == "FIN" {
break;
}
let mut input_path = fileroot.clone();
input_path.push(req);
println!("\n[REQ] {}: {:#?}", addr, input_path);
let mut file = File::open(input_path.clone()).await?;
let mut remaining_data = file.metadata().await?.len();
let mut filebuf = vec![0u8; *chunksize];
// Serve the file itself
while remaining_data != 0 {
let read_result = file.read(&mut filebuf);
match read_result.await {
Ok(n) => {
writer.write_all(&filebuf).await?;
writer.flush().await?;
remaining_data = remaining_data - n as u64;
}
_ => {}
}
}
// ACK file
if recv_msg_string(reader, buf).await? != "ACK" {
return Ok(Some(format!(
"[ERROR] {}: No confirmation of file {:#?}",
addr, input_path
)));
} else {
println!("[ACK] {}: File finished successfully", addr);
}
}
Ok(None)
}

View File

@ -1,81 +0,0 @@
use fragilebyte::{client, server};
use ntest::timeout;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{
fs::{read_to_string, remove_file, File},
io::{BufWriter, Write},
path::PathBuf,
thread::{self, sleep},
time::Duration,
};
use tokio_test::block_on;
#[test]
#[timeout(8000)]
/// Syncs three textfiles from ./data to ./output and checks that their contents match
fn inputless_filesync_test() {
let data = vec![
("1.txt", create_data()),
("2.txt", create_data()),
("3.txt", create_data()),
];
for file in &data {
let filepath = String::from("./data/") + file.0;
let mut writer = BufWriter::new(File::create(filepath).unwrap());
writer.write_all(file.1.as_bytes()).unwrap();
}
let server_handle = thread::spawn(|| {
// Start the server in the local network, timeouts after 5 secs of inactivity
block_on(server::listen(
8080u16,
PathBuf::from("./data"),
8192usize,
true,
5,
true,
))
.unwrap();
});
let client_handle = thread::spawn(|| {
// Run the client inputless
block_on(client::connect(
String::from("127.0.0.1:8080"),
PathBuf::from("./output"),
"test".to_string(),
true,
))
.unwrap();
});
client_handle.join().unwrap();
// Sleep to give server time to start up
sleep(Duration::from_millis(500));
server_handle.join().unwrap();
for file in data {
let filepath = String::from("./output/") + file.0;
let content = read_to_string(filepath).unwrap();
assert_eq!(
content, file.1,
"Output [{}] does not match input [{}]",
content, file.1
);
remove_file(String::from("./output/") + file.0).unwrap();
remove_file(String::from("./data/") + file.0).unwrap();
}
}
fn create_data() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect::<String>()
}

View File

@ -0,0 +1,82 @@
use fragilebyte::{common::Message, connector::Connector, listener::Listener};
use ntest::timeout;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{
fs::{self, File},
io::{BufWriter, Write},
net::SocketAddr,
path::PathBuf,
thread,
};
use tokio::sync::mpsc;
use tokio_test::block_on;
#[test]
#[timeout(2000)]
/// Tests communication between GUI and individual handlers by mocking GUI signals.
fn filesync_signals() {
let testdata = vec![
("1.txt", generate_data()),
("2.txt", generate_data()),
("3.txt", generate_data()),
];
let mut paths = Vec::new();
for file in &testdata {
let filepath = String::from("./tests/data/") + file.0;
let mut writer = BufWriter::new(File::create(filepath.clone()).unwrap());
paths.push(filepath);
writer.write_all(file.1.as_bytes()).unwrap();
}
let output_path = PathBuf::from("./tests/output/");
let server_addr = SocketAddr::from(([127, 0, 0, 1], 9191));
let (kill_server_tx, server_rx) = mpsc::channel::<Message>(2);
let (server_tx, mut local_server_rx) = mpsc::channel::<Message>(2);
let (local_client_tx, client_rx) = mpsc::channel::<Message>(2);
let (client_tx, mut local_client_rx) = mpsc::channel::<Message>(2);
let server_handle = thread::spawn(move || {
let listener = Listener::new(server_addr, "xyz", 8192usize);
block_on(listener.start(server_tx, server_rx, paths)).unwrap();
});
let server_channel_handle = thread::spawn(move || {
block_on(local_server_rx.recv()).unwrap(); // ClientConnect
block_on(local_server_rx.recv()).unwrap(); // ConnectionReady
block_on(local_server_rx.recv()).unwrap(); // ClientDisconnect
block_on(kill_server_tx.send(Message::Shutdown)).unwrap();
});
let client_handle = thread::spawn(move || {
let output_path = output_path.clone();
let connector = Connector::new(server_addr, "xyz", output_path);
block_on(connector.connect(client_tx, client_rx)).unwrap()
});
let client_channel_handle = thread::spawn(move || {
block_on(local_client_rx.recv()).unwrap(); // Metadata(HashMap)
block_on(local_client_tx.send(Message::ClientReqAll)).unwrap();
block_on(local_client_tx.send(Message::Shutdown)).unwrap();
});
client_handle.join().unwrap();
client_channel_handle.join().unwrap();
server_handle.join().unwrap();
server_channel_handle.join().unwrap();
for file in testdata {
fs::remove_file(String::from("./tests/output/") + file.0).unwrap();
fs::remove_file(String::from("./tests/data/") + file.0).unwrap();
}
}
fn generate_data() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect::<String>()
}